@@ -84,7 +84,6 @@ def __init__(self, config_path: Union[str, Path] = "node.json") -> None:
8484 self ._last_nodeinfo_sent_monotonic = 0.0
8585 self ._last_position_reply_monotonic = 0.0
8686 self ._last_nodeinfo_seen : Dict [int , int ] = {}
87- self ._receive_wrappers : Dict [Callable [..., Any ], Callable [..., None ]] = {}
8887 self ._response_handlers : Dict [int , tuple [Callable [[Dict [str , Any ]], Any ], bool ]] = {}
8988 self .nodesByNum : Dict [int , Dict [str , Any ]] = {}
9089 self .nodes : Dict [str , Dict [str , Any ]] = {}
@@ -151,8 +150,6 @@ def start(self) -> None:
151150 return
152151 pub .subscribe (self ._handle_raw_packet , "mesh.rx.packet" )
153152 pub .subscribe (self ._handle_unique_packet , self .PACKET_TOPIC )
154- pub .subscribe (self ._handle_receive_bridge , self .RECEIVE_TOPIC )
155- pub .subscribe (self ._handle_node_update_bridge , self .PACKET_TOPIC )
156153 self .stream = UDPPacketStream (
157154 self .config .udp .mcast_group ,
158155 int (self .config .udp .mcast_port ),
@@ -188,14 +185,6 @@ def stop(self) -> None:
188185 pub .unsubscribe (self ._handle_unique_packet , self .PACKET_TOPIC )
189186 except KeyError :
190187 pass
191- try :
192- pub .unsubscribe (self ._handle_receive_bridge , self .RECEIVE_TOPIC )
193- except KeyError :
194- pass
195- try :
196- pub .unsubscribe (self ._handle_node_update_bridge , self .PACKET_TOPIC )
197- except KeyError :
198- pass
199188 if self ._broadcast_thread and self ._broadcast_thread .is_alive ():
200189 self ._broadcast_thread .join (timeout = 2.0 )
201190 if was_connected :
@@ -211,23 +200,11 @@ def run_forever(self) -> None:
211200 self .stop ()
212201
213202 def receive (self , callback : Callable [..., Any ]) -> None :
214- if callback in self ._receive_wrappers :
215- return
216-
217- def wrapper (packet : mesh_pb2 .MeshPacket , addr : Any = None , ** kwargs : Any ) -> None :
218- del addr
219- del kwargs
220- callback (self ._packet_to_receive_dict (packet ), self )
221-
222- self ._receive_wrappers [callback ] = wrapper
223- pub .subscribe (wrapper , self .RECEIVE_TOPIC )
203+ pub .subscribe (callback , "meshtastic.receive" )
224204
225205 def unreceive (self , callback : Callable [..., Any ]) -> None :
226- wrapper = self ._receive_wrappers .pop (callback , None )
227- if wrapper is None :
228- return
229206 try :
230- pub .unsubscribe (wrapper , self . RECEIVE_TOPIC )
207+ pub .unsubscribe (callback , "meshtastic.receive" )
231208 except KeyError :
232209 pass
233210
@@ -614,19 +591,18 @@ def _send_packet(
614591 self ._persist_outbound_packet (packet , data )
615592 return packet
616593
617- def _handle_raw_packet (self , packet : mesh_pb2 .MeshPacket , addr : Any = None , ** kwargs : Any ) -> None :
618- del addr
594+ def _handle_raw_packet (self , packet : mesh_pb2 .MeshPacket , ** kwargs : Any ) -> None :
619595 del kwargs
620596 if not getattr (packet , "rx_time" , 0 ):
621597 packet .rx_time = int (time .time ())
622598
623599 if not packet .HasField ("decoded" ):
624600 self ._try_decode_pki (packet )
601+ self ._publish_meshtastic_receive (packet )
625602 self ._maybe_send_ack (packet )
626603 self ._maybe_send_response (packet )
627604
628- def _handle_unique_packet (self , packet : mesh_pb2 .MeshPacket , addr : Any = None , ** kwargs : Any ) -> None :
629- del addr
605+ def _handle_unique_packet (self , packet : mesh_pb2 .MeshPacket , ** kwargs : Any ) -> None :
630606 del kwargs
631607 if not getattr (packet , "rx_time" , 0 ):
632608 packet .rx_time = int (time .time ())
@@ -636,22 +612,7 @@ def _handle_unique_packet(self, packet: mesh_pb2.MeshPacket, addr: Any = None, *
636612 return
637613
638614 self ._persist_packet (packet )
639-
640- def _handle_receive_bridge (self , packet : mesh_pb2 .MeshPacket , addr : Any = None , ** kwargs : Any ) -> None :
641- del addr
642- del kwargs
643- try :
644- self ._publish_meshtastic_receive (packet )
645- except Exception as exc :
646- self ._publish_log_line (f"meshtastic.receive bridge error: { exc } " )
647-
648- def _handle_node_update_bridge (self , packet : mesh_pb2 .MeshPacket , addr : Any = None , ** kwargs : Any ) -> None :
649- del addr
650- del kwargs
651- try :
652- self ._update_node_cache_from_packet (packet )
653- except Exception as exc :
654- self ._publish_log_line (f"meshtastic.node.updated bridge error: { exc } " )
615+ self ._update_node_cache_from_packet (packet )
655616
656617 def _packet_to_receive_dict (self , packet : mesh_pb2 .MeshPacket ) -> Dict [str , Any ]:
657618 as_dict = MessageToDict (packet )
0 commit comments