|
| 1 | +# -------------------------------------------------------------------------------------------- |
| 2 | +# Copyright (c) Microsoft Corporation. All rights reserved. |
| 3 | +# Licensed under the MIT License. See License.txt in the project root for license information. |
| 4 | +# -------------------------------------------------------------------------------------------- |
| 5 | + |
| 6 | +""" |
| 7 | +This sample demonstrates how to use the Microsoft Azure Event Hubs Client for Python async API to |
| 8 | +read messages sent from a device. Please see the documentation for @azure/event-hubs package |
| 9 | +for more details at https://pypi.org/project/azure-eventhub/ |
| 10 | +
|
| 11 | +For an example that uses checkpointing, follow up this sample with the sample in the |
| 12 | +azure-eventhub-checkpointstoreblob package on GitHub at the following link: |
| 13 | +
|
| 14 | +https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/samples/receive_events_using_checkpoint_store_async.py |
| 15 | +""" |
| 16 | + |
| 17 | + |
| 18 | +import asyncio |
| 19 | +from azure.eventhub import TransportType |
| 20 | +from azure.eventhub.aio import EventHubConsumerClient |
| 21 | + |
| 22 | + |
| 23 | +# Event Hub-compatible endpoint |
| 24 | +# az iot hub show --query properties.eventHubEndpoints.events.endpoint --name {your IoT Hub name} |
| 25 | +EVENTHUB_COMPATIBLE_ENDPOINT = "{your Event Hubs compatible endpoint}" |
| 26 | + |
| 27 | +# Event Hub-compatible name |
| 28 | +# az iot hub show --query properties.eventHubEndpoints.events.path --name {your IoT Hub name} |
| 29 | +EVENTHUB_COMPATIBLE_PATH = "{your Event Hubs compatible name}" |
| 30 | + |
| 31 | +# Primary key for the "service" policy to read messages |
| 32 | +# az iot hub policy show --name service --query primaryKey --hub-name {your IoT Hub name} |
| 33 | +IOTHUB_SAS_KEY = "{your service primary key}" |
| 34 | + |
| 35 | +# If you have access to the Event Hub-compatible connection string from the Azure portal, then |
| 36 | +# you can skip the Azure CLI commands above, and assign the connection string directly here. |
| 37 | +CONNECTION_STR = f'Endpoint={EVENTHUB_COMPATIBLE_ENDPOINT}/;SharedAccessKeyName=service;SharedAccessKey={IOTHUB_SAS_KEY};EntityPath={EVENTHUB_COMPATIBLE_PATH}' |
| 38 | + |
| 39 | +# Define callbacks to process events |
| 40 | +async def on_event_batch(partition_context, events): |
| 41 | + for event in events: |
| 42 | + print("Received event from partition: {}.".format(partition_context.partition_id)) |
| 43 | + print("Telemetry received: ", event.body_as_str()) |
| 44 | + print("Properties (set by device): ", event.properties) |
| 45 | + print("System properties (set by IoT Hub): ", event.system_properties) |
| 46 | + print() |
| 47 | + await partition_context.update_checkpoint() |
| 48 | + |
| 49 | +async def on_error(partition_context, error): |
| 50 | + # Put your code here. partition_context can be None in the on_error callback. |
| 51 | + if partition_context: |
| 52 | + print("An exception: {} occurred during receiving from Partition: {}.".format( |
| 53 | + partition_context.partition_id, |
| 54 | + error |
| 55 | + )) |
| 56 | + else: |
| 57 | + print("An exception: {} occurred during the load balance process.".format(error)) |
| 58 | + |
| 59 | + |
| 60 | +def main(): |
| 61 | + loop = asyncio.get_event_loop() |
| 62 | + client = EventHubConsumerClient.from_connection_string( |
| 63 | + conn_str=CONNECTION_STR, |
| 64 | + consumer_group="$default", |
| 65 | + # transport_type=TransportType.AmqpOverWebsocket, # uncomment it if you want to use web socket |
| 66 | + # http_proxy={ # uncomment if you want to use proxy |
| 67 | + # 'proxy_hostname': '127.0.0.1', # proxy hostname. |
| 68 | + # 'proxy_port': 3128, # proxy port. |
| 69 | + # 'username': '<proxy user name>', |
| 70 | + # 'password': '<proxy password>' |
| 71 | + # } |
| 72 | + ) |
| 73 | + try: |
| 74 | + loop.run_until_complete(client.receive_batch(on_event_batch=on_event_batch, on_error=on_error)) |
| 75 | + except KeyboardInterrupt: |
| 76 | + print("Receiving has stopped.") |
| 77 | + finally: |
| 78 | + loop.run_until_complete(client.close()) |
| 79 | + loop.stop() |
| 80 | + |
| 81 | + |
| 82 | +if __name__ == '__main__': |
| 83 | + main() |
0 commit comments