From 584c0d5e77f7dd7211507fd4d808cb5df2356175 Mon Sep 17 00:00:00 2001 From: Fatima Zahra Chriha Date: Tue, 22 Feb 2022 16:07:39 -0500 Subject: [PATCH 1/6] network socket initial commit --- labgraph/devices/protocols/socket/__init__.py | 9 +++++++++ labgraph/devices/protocols/socket/socket_message.py | 9 +++++++++ 2 files changed, 18 insertions(+) create mode 100644 labgraph/devices/protocols/socket/__init__.py create mode 100644 labgraph/devices/protocols/socket/socket_message.py diff --git a/labgraph/devices/protocols/socket/__init__.py b/labgraph/devices/protocols/socket/__init__.py new file mode 100644 index 000000000..3a01ba008 --- /dev/null +++ b/labgraph/devices/protocols/socket/__init__.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python3 +# Copyright 2004-present Facebook. All Rights Reserved. + + +__all__ = [ + "SOCKETMessage", +] + +from .socket_message import SOCKETMessage diff --git a/labgraph/devices/protocols/socket/socket_message.py b/labgraph/devices/protocols/socket/socket_message.py new file mode 100644 index 000000000..b3c9e32d8 --- /dev/null +++ b/labgraph/devices/protocols/socket/socket_message.py @@ -0,0 +1,9 @@ +from labgraph.messages import Message + + +class SOCKETMessage(Message): + """ + A message representing data that was/will be communicate to SOCKET + """ + + data: bytes From cae19dfd03d1e0120a213322b972fb37dd7af549 Mon Sep 17 00:00:00 2001 From: Fatima Zahra Chriha Date: Tue, 22 Feb 2022 16:59:22 -0500 Subject: [PATCH 2/6] adding server class --- .../protocols/socket/socket_poller_node.py | 32 +++++++++++++++++++ .../socket/tests/test_socket_poller.py | 7 ++++ .../socket/tests/test_socket_sender.py | 14 ++++++++ 3 files changed, 53 insertions(+) create mode 100644 labgraph/devices/protocols/socket/socket_poller_node.py create mode 100644 labgraph/devices/protocols/socket/tests/test_socket_poller.py create mode 100644 labgraph/devices/protocols/socket/tests/test_socket_sender.py diff --git a/labgraph/devices/protocols/socket/socket_poller_node.py b/labgraph/devices/protocols/socket/socket_poller_node.py new file mode 100644 index 000000000..728f923f4 --- /dev/null +++ b/labgraph/devices/protocols/socket/socket_poller_node.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 +# Copyright 2004-present Facebook. All Rights Reserved. + +import socket + +from socket_message import SOCKETMessage +from labgraph.graphs import Node + + +class SOCKETPollerNode(): + """ + Represents a node in the graph which recieves data from SOCKET. + Data polled from SOCKET is subsequently pushed to rest of the graph + as as SOCKETMessage + """ + + def setup(self) -> None: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.bind((socket.gethostname(), 1234)) + self.socket.listen(5) + + def cleanup(self, clientsocket) -> None: + clientsocket.close() + + def socket_monitor(self) -> None: + while True: + clientsocket, address = self.socket.accept() + print(f"Connection from {address} has been established!") + # client socket is our local version of the client's socket, + # so we send information to the client + clientsocket.send(bytes("Welcome to the server!", "utf-8")) + self.cleanup(clientsocket) diff --git a/labgraph/devices/protocols/socket/tests/test_socket_poller.py b/labgraph/devices/protocols/socket/tests/test_socket_poller.py new file mode 100644 index 000000000..af3c6aa77 --- /dev/null +++ b/labgraph/devices/protocols/socket/tests/test_socket_poller.py @@ -0,0 +1,7 @@ +from socket_poller_node import SOCKETPollerNode + +# Intial test to verify if the poller_node works +mySocketPoller = SOCKETPollerNode() +mySocketPoller.setup() + +mySocketPoller.socket_monitor() diff --git a/labgraph/devices/protocols/socket/tests/test_socket_sender.py b/labgraph/devices/protocols/socket/tests/test_socket_sender.py new file mode 100644 index 000000000..918df35d3 --- /dev/null +++ b/labgraph/devices/protocols/socket/tests/test_socket_sender.py @@ -0,0 +1,14 @@ +import socket + +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +# connect, client connect +s.connect((socket.gethostname(), 1234)) + +# 1024 is our buffer, stream of data how big of a chunk of data we want to receive +full_msg = '' +while True: + msg = s.recv(8) + if(len(msg) <= 0): + break + full_msg += msg.decode("utf-8") +print(full_msg) From 3643ef9ebbfe13ad405a696c78816ce5ca1c0e2e Mon Sep 17 00:00:00 2001 From: Fatima Zahra Chriha Date: Wed, 23 Feb 2022 14:18:51 -0500 Subject: [PATCH 3/6] add poller_node and tests --- .../protocols/socket/socket_sender_node.py | 24 +++++++++++++++++++ .../socket/tests/test_socket_sender.py | 17 ++++--------- 2 files changed, 29 insertions(+), 12 deletions(-) create mode 100644 labgraph/devices/protocols/socket/socket_sender_node.py diff --git a/labgraph/devices/protocols/socket/socket_sender_node.py b/labgraph/devices/protocols/socket/socket_sender_node.py new file mode 100644 index 000000000..07ff7b51d --- /dev/null +++ b/labgraph/devices/protocols/socket/socket_sender_node.py @@ -0,0 +1,24 @@ +import socket + + +class SOCKETSenderNode(): + """ + Represents a node in a Labgraph graph that subscribes to messages in a + Labgraph topic and forwards them by writing to a SOCKET object. + """ + + def setup(self) -> None: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((socket.gethostname(), 1234)) + + def cleanup(self) -> None: + self.socket.close() + + def socket_monitor(self) -> None: + data = '' + while True: + msg = self.socket.recv(8) + if(len(msg) <= 0): + break + data += msg.decode("utf-8") + print(data) diff --git a/labgraph/devices/protocols/socket/tests/test_socket_sender.py b/labgraph/devices/protocols/socket/tests/test_socket_sender.py index 918df35d3..f70bd4fe2 100644 --- a/labgraph/devices/protocols/socket/tests/test_socket_sender.py +++ b/labgraph/devices/protocols/socket/tests/test_socket_sender.py @@ -1,14 +1,7 @@ -import socket +from socket_sender_node import SOCKETSenderNode -s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -# connect, client connect -s.connect((socket.gethostname(), 1234)) +# Initial test to verify if the poller_node works +mySocketSender = SOCKETSenderNode() +mySocketSender.setup() -# 1024 is our buffer, stream of data how big of a chunk of data we want to receive -full_msg = '' -while True: - msg = s.recv(8) - if(len(msg) <= 0): - break - full_msg += msg.decode("utf-8") -print(full_msg) +mySocketSender.socket_monitor() From d36a1f8322f1c52fde5ac34430994d08eafb3b66 Mon Sep 17 00:00:00 2001 From: Fatima Zahra Chriha Date: Wed, 2 Mar 2022 15:26:05 -0500 Subject: [PATCH 4/6] more functionality --- .../protocols/socket/socket_poller_node.py | 41 ++++++------- .../protocols/socket/socket_sender_node.py | 57 ++++++++++++++----- 2 files changed, 66 insertions(+), 32 deletions(-) diff --git a/labgraph/devices/protocols/socket/socket_poller_node.py b/labgraph/devices/protocols/socket/socket_poller_node.py index 728f923f4..a22ccb591 100644 --- a/labgraph/devices/protocols/socket/socket_poller_node.py +++ b/labgraph/devices/protocols/socket/socket_poller_node.py @@ -1,32 +1,35 @@ -#!/usr/bin/env python3 -# Copyright 2004-present Facebook. All Rights Reserved. - import socket +from labgraph.graphs import Config, Node +from labgraph.graphs.method import background + +# client + -from socket_message import SOCKETMessage -from labgraph.graphs import Node +class SOCKETPollerConfig(Config): + read_addr: str + socket_topic: str -class SOCKETPollerNode(): +class SOCKETPollerNode(Node): """ - Represents a node in the graph which recieves data from SOCKET. - Data polled from SOCKET is subsequently pushed to rest of the graph - as as SOCKETMessage + Represents a node in a Labgraph graph that subscribes to messages in a + Labgraph topic and forwards them by writing to a SOCKET object. """ + config: SOCKETPollerConfig def setup(self) -> None: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.bind((socket.gethostname(), 1234)) - self.socket.listen(5) + self.socket.connect((socket.gethostname(), self.config.read_addr)) - def cleanup(self, clientsocket) -> None: - clientsocket.close() + def cleanup(self) -> None: + self.socket.close() + @background def socket_monitor(self) -> None: + data = '' while True: - clientsocket, address = self.socket.accept() - print(f"Connection from {address} has been established!") - # client socket is our local version of the client's socket, - # so we send information to the client - clientsocket.send(bytes("Welcome to the server!", "utf-8")) - self.cleanup(clientsocket) + msg = self.socket.recv(8) + if(len(msg) <= 0): + break + data += msg.decode("utf-8") + print(data) diff --git a/labgraph/devices/protocols/socket/socket_sender_node.py b/labgraph/devices/protocols/socket/socket_sender_node.py index 07ff7b51d..a6b9371bc 100644 --- a/labgraph/devices/protocols/socket/socket_sender_node.py +++ b/labgraph/devices/protocols/socket/socket_sender_node.py @@ -1,24 +1,55 @@ +#!/usr/bin/env python3 +# Copyright 2004-present Facebook. All Rights Reserved. + import socket +from socket_message import SOCKETMessage +from labgraph.graphs import Config, Node, Topic, background +from labgraph.util.logger import get_logger + + +# server + +# lookup what is a logger +STARTUP_WAIT_TIME = 5 + +logger = get_logger(__name__) -class SOCKETSenderNode(): +class SOCKETSenderConfig(Config): + write_addr: str + # The message: in our case it was (Welcome to the Server) + socket_topic: str + + +class SOCKETSenderNode(Node): """ - Represents a node in a Labgraph graph that subscribes to messages in a - Labgraph topic and forwards them by writing to a SOCKET object. + Represents a node in the graph which recieves data from SOCKET. + Data polled from SOCKET is subsequently pushed to rest of the graph + as as SOCKETMessage + + Args: + read_addr: The address from which ZMQ data should be polled. + socket_topic: The SOCKET topic being sent. """ + topic = Topic(SOCKETMessage) + config: SOCKETSenderConfig + def setup(self) -> None: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.connect((socket.gethostname(), 1234)) + logger.debug(f"{self}:binding to {self.config.write_addr}") + self.socket.bind((socket.gethostname(), self.config.write_addr)) + self.socket.listen(STARTUP_WAIT_TIME) - def cleanup(self) -> None: - self.socket.close() + def cleanup(self, clientsocket) -> None: + clientsocket.close() - def socket_monitor(self) -> None: - data = '' + @background + async def socket_monitor(self) -> None: while True: - msg = self.socket.recv(8) - if(len(msg) <= 0): - break - data += msg.decode("utf-8") - print(data) + clientsocket, address = self.socket.accept() + print(f"Connection from {address} has been established!") + # client socket is our local version of the client's socket, + # so we send information to the client + clientsocket.send(bytes(self.config.socket_topic, "utf-8")) + self.cleanup(clientsocket) From c4b5514562b40dfe0c02782a803eed2ad82ca504 Mon Sep 17 00:00:00 2001 From: Fatima Zahra Chriha Date: Fri, 4 Mar 2022 18:23:44 -0500 Subject: [PATCH 5/6] using pickle to read object --- .../protocols/socket/socket_poller_node.py | 18 ++++++++++++++---- .../protocols/socket/socket_sender_node.py | 14 +++++++++++--- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/labgraph/devices/protocols/socket/socket_poller_node.py b/labgraph/devices/protocols/socket/socket_poller_node.py index a22ccb591..2f53e6ee4 100644 --- a/labgraph/devices/protocols/socket/socket_poller_node.py +++ b/labgraph/devices/protocols/socket/socket_poller_node.py @@ -1,9 +1,14 @@ +from email.base64mime import header_length import socket +import pickle from labgraph.graphs import Config, Node from labgraph.graphs.method import background +from labgraph.util.logger import get_logger # client +logger = get_logger(__name__) + class SOCKETPollerConfig(Config): read_addr: str @@ -21,6 +26,8 @@ def setup(self) -> None: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((socket.gethostname(), self.config.read_addr)) + self.socket_open = False + def cleanup(self) -> None: self.socket.close() @@ -28,8 +35,11 @@ def cleanup(self) -> None: def socket_monitor(self) -> None: data = '' while True: - msg = self.socket.recv(8) - if(len(msg) <= 0): + # What's the size of the socket + msg = self.socket.recv(header_length) + if not len(msg): break - data += msg.decode("utf-8") - print(data) + data += msg + data = pickle.loads(data) + event = data["event"] + logger.debug(f"{self}:{event.name}") diff --git a/labgraph/devices/protocols/socket/socket_sender_node.py b/labgraph/devices/protocols/socket/socket_sender_node.py index a6b9371bc..246e168bb 100644 --- a/labgraph/devices/protocols/socket/socket_sender_node.py +++ b/labgraph/devices/protocols/socket/socket_sender_node.py @@ -1,7 +1,9 @@ #!/usr/bin/env python3 # Copyright 2004-present Facebook. All Rights Reserved. - +import pickle +import asyncio import socket +from labgraph.graphs.method import subscriber from socket_message import SOCKETMessage from labgraph.graphs import Config, Node, Topic, background from labgraph.util.logger import get_logger @@ -10,7 +12,7 @@ # server # lookup what is a logger -STARTUP_WAIT_TIME = 5 +STARTUP_WAIT_TIME = 0.1 logger = get_logger(__name__) @@ -40,6 +42,7 @@ def setup(self) -> None: logger.debug(f"{self}:binding to {self.config.write_addr}") self.socket.bind((socket.gethostname(), self.config.write_addr)) self.socket.listen(STARTUP_WAIT_TIME) + self.has_subscrivers = False def cleanup(self, clientsocket) -> None: clientsocket.close() @@ -48,8 +51,13 @@ def cleanup(self, clientsocket) -> None: async def socket_monitor(self) -> None: while True: clientsocket, address = self.socket.accept() - print(f"Connection from {address} has been established!") + logger.debug(f"Connection from {address} has been established!") # client socket is our local version of the client's socket, # so we send information to the client clientsocket.send(bytes(self.config.socket_topic, "utf-8")) self.cleanup(clientsocket) + + @subscriber(topic) + async def socket_subscriber(self) -> None: + while not self.has_subscrivers: + await asyncio.sleep(STARTUP_WAIT_TIME) From f4c1d714f8086f48ab3565ab3ec281d1414c8cae Mon Sep 17 00:00:00 2001 From: Fatima Zahra Chriha Date: Wed, 30 Mar 2022 13:39:00 -0400 Subject: [PATCH 6/6] Added client, sever, README files --- labgraph/devices/protocols/socket/.DS_Store | Bin 0 -> 6148 bytes labgraph/devices/protocols/socket/README.md | 13 +++++++++++++ labgraph/devices/protocols/socket/client.py | 14 ++++++++++++++ labgraph/devices/protocols/socket/server.py | 16 ++++++++++++++++ .../devices/protocols/socket/tests/README.md | 1 + .../devices/protocols/socket/tests/__init__.py | 0 6 files changed, 44 insertions(+) create mode 100644 labgraph/devices/protocols/socket/.DS_Store create mode 100644 labgraph/devices/protocols/socket/README.md create mode 100644 labgraph/devices/protocols/socket/client.py create mode 100644 labgraph/devices/protocols/socket/server.py create mode 100644 labgraph/devices/protocols/socket/tests/README.md create mode 100644 labgraph/devices/protocols/socket/tests/__init__.py diff --git a/labgraph/devices/protocols/socket/.DS_Store b/labgraph/devices/protocols/socket/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..ac3a5bc206d4cd63aa7c96f49514a06259d1a7a3 GIT binary patch literal 6148 zcmeHKL2uJA6n<_ymQo=ViAiukio~@VErm%Fm(Yy^SAyUGC?rd@Ma$x-Nmor(rQYGc zaOQ`=f8hk*vmI$oJ59SF1b)%;=h*Lij=v~&OhjTdjR!gHdqC$0)LwV{OxwsH=9sKm%rbOSE)RTQW+tFmxmYd-CxR3t5`W* z&=H036B=`#8tZF~)kb8BamE;}RO>`-9Px0CQ6PFk=c?qDrOx3sQTF>kM5Ec-y0z_WJ00hvesH%t2+hjZo zy7vxbR)$G7o+v^Rj}h|WU6RFeHj>jUE)>_-9ZuV62i*tr`IEhV&mBG+EPC#I(BB{S z+^566#iH%(JbL{6b#R)VWs+YXOkfL9cE?}^Ur@O;`E!4gr7~loD@GM*@kR<5`JB#; zST@)yU={cu6yWv2g)@2vmm1a5fkGVtfGspDL!EyXnByAs3@$aI2PQNXsHwsnF@&b0 z-8FHZ!KFq`Ct(gB!i+4;2}P*Uk>6F}Bs`6_vW}jY#g`Ls3<7RcC0GA72k&|L!ZkPpl5KY5iKzLBcNok Kg;n5>D)1WsK*Lr5 literal 0 HcmV?d00001 diff --git a/labgraph/devices/protocols/socket/README.md b/labgraph/devices/protocols/socket/README.md new file mode 100644 index 000000000..39f9c70b7 --- /dev/null +++ b/labgraph/devices/protocols/socket/README.md @@ -0,0 +1,13 @@ +# How to run the tests + +from root dir labraph run + +`python3 -m labgraph.devices.protocols.socket.tests.test_socket_sender.py` + +# Sender Node + +server file + +# Poller Node + +client file diff --git a/labgraph/devices/protocols/socket/client.py b/labgraph/devices/protocols/socket/client.py new file mode 100644 index 000000000..918df35d3 --- /dev/null +++ b/labgraph/devices/protocols/socket/client.py @@ -0,0 +1,14 @@ +import socket + +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +# connect, client connect +s.connect((socket.gethostname(), 1234)) + +# 1024 is our buffer, stream of data how big of a chunk of data we want to receive +full_msg = '' +while True: + msg = s.recv(8) + if(len(msg) <= 0): + break + full_msg += msg.decode("utf-8") +print(full_msg) diff --git a/labgraph/devices/protocols/socket/server.py b/labgraph/devices/protocols/socket/server.py new file mode 100644 index 000000000..837e71b62 --- /dev/null +++ b/labgraph/devices/protocols/socket/server.py @@ -0,0 +1,16 @@ +from http import client +import socket + +# AF: Adress family, IPv4?? +# s: socket object +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +s.bind((socket.gethostname(), 1234)) # Tie +s.listen(5) + +while True: + clientsocket, address = s.accept() + print(f"Connection from {address} has been established!") + # client socket is our local version of the client's socket, + # so we send information to the client + clientsocket.send(bytes("Welcome to the server!", "utf-8")) + clientsocket.close() diff --git a/labgraph/devices/protocols/socket/tests/README.md b/labgraph/devices/protocols/socket/tests/README.md new file mode 100644 index 000000000..38f0ae94c --- /dev/null +++ b/labgraph/devices/protocols/socket/tests/README.md @@ -0,0 +1 @@ +from root dir labraph run python3 -m labgraph.devices.protocols.socket.tests.test_socket_sender.py diff --git a/labgraph/devices/protocols/socket/tests/__init__.py b/labgraph/devices/protocols/socket/tests/__init__.py new file mode 100644 index 000000000..e69de29bb