44
55# For guidance, see https://docs.microsoft.com/azure/iot-edge/tutorial-python-module
66
7- import os
8- import random
9- import time
107import sys
11- import iothub_client
12- from iothub_client import IoTHubClient , IoTHubClientError , IoTHubTransportProvider
13- from iothub_client import IoTHubMessage , IoTHubMessageDispositionResult , IoTHubError
14-
15- # messageTimeout - the maximum time in milliseconds until a message times out.
16- # The timeout period starts at IoTHubClient.send_event_async.
17- # By default, messages do not expire.
18- MESSAGE_TIMEOUT = 10000
8+ import time
9+ import threading
10+ from azure .iot .device import IoTHubModuleClient , Message
1911
2012# global counters
21- RECEIVE_CALLBACKS = 0
22- SEND_CALLBACKS = 0
23-
24- # Choose HTTP, AMQP or MQTT as transport protocol. Currently only MQTT is supported.
25- PROTOCOL = IoTHubTransportProvider .MQTT
26-
27- # String containing Hostname, Device Id & Device Key & Module Id in the format:
28- # "HostName=<host_name>;DeviceId=<device_id>;SharedAccessKey=<device_key>;ModuleId=<module_id>;GatewayHostName=<gateway>"
29- CONNECTION_STRING = "[Device Connection String]"
30-
31- # Callback received when the message that we're forwarding is processed.
32- def send_confirmation_callback (message , result , user_context ):
33- global SEND_CALLBACKS
34- print ( "Confirmation[%d] received for message with result = %s" % (user_context , result ) )
35- map_properties = message .properties ()
36- key_value_pair = map_properties .get_internals ()
37- print ( " Properties: %s" % key_value_pair )
38- SEND_CALLBACKS += 1
39- print ( " Total calls confirmed: %d" % SEND_CALLBACKS )
40-
41-
42- # receive_message_callback is invoked when an incoming message arrives on the specified
43- # input queue (in the case of this sample, "input1"). Because this is a filter module,
44- # we will forward this message onto the "output1" queue.
45- def receive_message_callback (message , hubManager ):
46- global RECEIVE_CALLBACKS
47- message_buffer = message .get_bytearray ()
48- size = len (message_buffer )
49- print ( " Data: <<<%s>>> & Size=%d" % (message_buffer [:size ].decode ('utf-8' ), size ) )
50- map_properties = message .properties ()
51- key_value_pair = map_properties .get_internals ()
52- print ( " Properties: %s" % key_value_pair )
53- RECEIVE_CALLBACKS += 1
54- print ( " Total calls received: %d" % RECEIVE_CALLBACKS )
55- hubManager .forward_event_to_output ("output1" , message , 0 )
56- return IoTHubMessageDispositionResult .ACCEPTED
57-
58-
59- class HubManager (object ):
60-
61- def __init__ (
62- self ,
63- connection_string ):
64- self .client_protocol = PROTOCOL
65- self .client = IoTHubClient (connection_string , PROTOCOL )
66-
67- # set the time until a message times out
68- self .client .set_option ("messageTimeout" , MESSAGE_TIMEOUT )
69- # some embedded platforms need certificate information
70- self .set_certificates ()
71-
72- # sets the callback when a message arrives on "input1" queue. Messages sent to
73- # other inputs or to the default will be silently discarded.
74- self .client .set_message_callback ("input1" , receive_message_callback , self )
75-
76- def set_certificates (self ):
77- isWindows = sys .platform .lower () in ['windows' , 'win32' ]
78- if not isWindows :
79- CERT_FILE = os .environ ['EdgeModuleCACertificateFile' ]
80- print ("Adding TrustedCerts from: {0}" .format (CERT_FILE ))
81-
82- # this brings in x509 privateKey and certificate
83- file = open (CERT_FILE )
84- try :
85- self .client .set_option ("TrustedCerts" , file .read ())
86- print ( "set_option TrustedCerts successful" )
87- except IoTHubClientError as iothub_client_error :
88- print ( "set_option TrustedCerts failed (%s)" % iothub_client_error )
89-
90- file .close ()
91-
92- # Forwards the message received onto the next stage in the process.
93- def forward_event_to_output (self , outputQueueName , event , send_context ):
94- self .client .send_event_async (
95- outputQueueName , event , send_confirmation_callback , send_context )
96-
97- def main (connection_string ):
13+ RECEIVED_MESSAGES = 0
14+
15+ def receive_message_listener (client ):
16+ # This listener function only triggers for messages sent to "input1".
17+ # Messages sent to other inputs or to the default will be silently discarded.
18+ global RECEIVED_MESSAGES
19+ while True :
20+ message = client .receive_message_on_input ("input1" ) # blocking call
21+ RECEIVED_MESSAGES += 1
22+ print ("Message received on input1" )
23+ print ( " Data: <<{}>>" .format (message .data ) )
24+ print ( " Properties: {}" .format (message .custom_properties ))
25+ print ( " Total calls received: {}" .format (RECEIVED_MESSAGES ))
26+ print ("Forwarding message to output1" )
27+ client .send_message_to_output (message , "output1" )
28+ print ("Message successfully forwarded" )
29+
30+ def main ():
9831 try :
99- print ( "\n Python %s \n " % sys .version )
32+ print ( "\n Python {} \n " . format ( sys .version ) )
10033 print ( "IoT Hub Client for Python" )
10134
102- hub_manager = HubManager (connection_string )
35+ client = IoTHubModuleClient .create_from_edge_environment ()
36+
37+ # Begin listening for messages
38+ message_listener_thread = threading .Thread (target = receive_message_listener , args = (client ,))
39+ message_listener_thread .daemon = True
40+ message_listener_thread .start ()
10341
104- print ( "Starting the IoT Hub Python sample using protocol %s ..." % hub_manager . client_protocol )
42+ print ( "Starting the IoT Hub Python sample..." )
10543 print ( "The sample is now waiting for messages and will indefinitely. Press Ctrl-C to exit. " )
10644
10745 while True :
10846 time .sleep (1000 )
10947
110- except IoTHubError as iothub_error :
111- print ( "Unexpected error %s from IoTHub" % iothub_error )
112- return
11348 except KeyboardInterrupt :
11449 print ( "IoTHubClient sample stopped" )
50+ except :
51+ print ( "Unexpected error from IoTHub" )
52+ return
11553
11654if __name__ == '__main__' :
11755 try :
118- CONNECTION_STRING = os . environ [ 'EdgeHubConnectionString' ]
56+ main ()
11957
12058 except Exception as error :
12159 print ( error )
12260 sys .exit (1 )
123-
124- main (CONNECTION_STRING )
0 commit comments