#You need to install the following libraries#pip install azure-servicebus#pip install aiohttp#pip install azure-identityimport asynciofrom azure.servicebus.aio import ServiceBusClientfrom azure.servicebus import ServiceBusMessage# ConstantsNAMESPACE_CONNECTION_STR ="<PrimaryConnectionString>"TOPIC_NAME ="<TOPIC_NAME>"# Function to send a single message to a Service Bus topicasyncdefsend_individual_message(publisher):# Prepare a single message with updated contentsingle_message =ServiceBusMessage("Hacktricks-Training: Single Item")# Send the message to the topicawait publisher.send_messages(single_message)print("Sent a single message containing 'Hacktricks-Training'")# Function to send multiple messages to a Service Bus topicasyncdefsend_multiple_messages(publisher):# Generate a collection of messages with updated contentmessage_list = [ServiceBusMessage(f"Hacktricks-Training: Item {i+1} in list")for i inrange(5)]# Send the entire collection of messages to the topicawait publisher.send_messages(message_list)print("Sent a list of 5 messages containing 'Hacktricks-Training'")# Function to send a grouped batch of messages to a Service Bus topicasyncdefsend_grouped_messages(publisher):# Send a grouped batch of messages with updated contentasyncwith publisher:grouped_message_batch =await publisher.create_message_batch()for i inrange(10):try:# Append a message to the batch with updated contentgrouped_message_batch.add_message(ServiceBusMessage(f"Hacktricks-Training: Item {i+1}"))exceptValueError:# If batch reaches its size limit, handle by creating another batchbreak# Dispatch the batch of messages to the topicawait publisher.send_messages(grouped_message_batch)print("Sent a batch of 10 messages containing 'Hacktricks-Training'")# Main function to execute all tasksasyncdefexecute():# Instantiate the Service Bus client with the connection stringasyncwith ServiceBusClient.from_connection_string(conn_str=NAMESPACE_CONNECTION_STR,logging_enable=True)as sb_client:# Create a topic sender for dispatching messages to the topicpublisher = sb_client.get_topic_sender(topic_name=TOPIC_NAME)asyncwith publisher:# Send a single messageawaitsend_individual_message(publisher)# Send multiple messagesawaitsend_multiple_messages(publisher)# Send a batch of messagesawaitsend_grouped_messages(publisher)# Run the asynchronous executionasyncio.run(execute())print("Messages Sent")print("----------------------------")
#You need to install the following libraries#pip install azure-servicebus#pip install aiohttp#pip install azure-identityimport asynciofrom azure.servicebus.aio import ServiceBusClientNAMESPACE_CONNECTION_STR ="<PrimaryConnectionString>"TOPIC_NAME ="<TOPIC_NAME>"SUBSCRIPTION_NAME ="<TOPIC_SUBSCRIPTION_NAME>"#Topic Subscription# Function to receive and process messages from a Service Bus subscriptionasyncdefreceive_and_process_messages():# Create a Service Bus client using the connection stringasyncwith ServiceBusClient.from_connection_string(conn_str=NAMESPACE_CONNECTION_STR,logging_enable=True)as servicebus_client:# Get the Subscription Receiver object for the specified topic and subscriptionreceiver = servicebus_client.get_subscription_receiver(topic_name=TOPIC_NAME,subscription_name=SUBSCRIPTION_NAME,max_wait_time=5)asyncwith receiver:# Receive messages with a defined maximum wait time and countreceived_msgs =await receiver.receive_messages(max_wait_time=5,max_message_count=20)for msg in received_msgs:print("Received: "+str(msg))# Complete the message to remove it from the subscriptionawait receiver.complete_message(msg)# Run the asynchronous message processing functionasyncio.run(receive_and_process_messages())print("Message Receiving Completed")print("----------------------------")