From 3d69daf8707f21fb3de549b9207670459de63e2d Mon Sep 17 00:00:00 2001 From: surafel_fikru Date: Thu, 10 Jul 2025 10:14:45 +0300 Subject: [PATCH 1/4] implement listen_until_clear - changed listen method to a generic generator the yields the status sent by the server stream. - implemented listen_until_clear that simulates what block() does but for server streams insteam of polling. --- python/client.py | 52 ++++++++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/python/client.py b/python/client.py index 97bc8af1..166c7704 100644 --- a/python/client.py +++ b/python/client.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import Optional import os import json @@ -90,30 +91,37 @@ def block(self, delay=0.005, base=2, max_attempts=16): def listen(self): """ Listens to server side events on the status of a request. - """ + Yields: + dict[str, str] + """ url = self.server.base + f"/status_stream/{quote(self.status_loc)}" - - def on_error(): - raise Exception("error") - - with EventSource( - url, - timeout=30, - on_error=on_error, - ) as event_source: - try: - print("listening...") + try: + with EventSource(url, timeout=30) as event_source: for event in event_source: if event.data: msg = json.loads(event.data) - print("msg: ", msg) - if msg['status'] == "pathClear": - return + yield msg + except Exception as e: + print(f"An unexpected error occurred: {e}") - except Exception as e: - print("error: ", e) + def listen_until_clear(self): + """ + Listens to server side events until a 'pathClear' status is received. + Returns: + metadata: Any + """ + print("Listening status ...") + for msg in self.listen(): + status = msg.get("status") + if status == "pathClear": + print("path clear") + return "" + elif status == "pathForbiddenTemporary": + continue + else: + return msg def __str__(self): @@ -433,7 +441,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): if "time" in self.finalization: print(f"{self.ns.format("*")} time {monotonic() - self.t0:.6f} s") - if "clear" in self.finalization: self.clear().listen() + if "clear" in self.finalization: self.clear().listen_until_clear() if "spin_down" in self.finalization: self.spin_down() if "stop" in self.finalization: self.stop() @@ -595,7 +603,7 @@ def _main(): print("data", ins.download_().data) - ins.sexpr_import_("https://raw.githubusercontent.com/trueagi-io/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen() + ins.sexpr_import_("https://raw.githubusercontent.com/trueagi-io/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen_until_clear() print("data", ins.download_().data) @@ -607,8 +615,8 @@ def _main_mm2(): # smoke test with ManagedMORK.connect("../target/debug/mork_server").and_log_stdout().and_log_stderr().and_terminate() as server: server.upload_("(data (foo 1))\n(data (foo 2))\n(_exec 0 (, (data (foo $x))) (, (data (bar $x))))") - server.transform(("(_exec $priority $p $t)",), ("(exec (test $priority) $p $t)",)).listen() - server.exec(thread_id="test").listen() + server.transform(("(_exec $priority $p $t)",), ("(exec (test $priority) $p $t)",)).listen_until_clear() + server.exec(thread_id="test").listen_until_clear() print("data", server.download_().data) for i, item in enumerate(server.history): @@ -616,7 +624,7 @@ def _main_mm2(): def test_sse_status(): with ManagedMORK.connect("../target/debug/mork_server").and_log_stdout().and_log_stderr().and_terminate() as server: - server.sexpr_import_(f"https://raw.githubusercontent.com/Adam-Vandervorst/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen() + server.sexpr_import_(f"https://raw.githubusercontent.com/Adam-Vandervorst/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen_until_clear() From 9d86a8a0602709c4b7e4bfc27152c3b080dbb78f Mon Sep 17 00:00:00 2001 From: surafel_fikru Date: Thu, 10 Jul 2025 11:30:36 +0300 Subject: [PATCH 2/4] add timeouts for listen and listen_until_clear --- python/client.py | 87 +++++++++++++++++++++++++++++++----------------- 1 file changed, 57 insertions(+), 30 deletions(-) diff --git a/python/client.py b/python/client.py index 166c7704..6fd9d2eb 100644 --- a/python/client.py +++ b/python/client.py @@ -88,40 +88,58 @@ def block(self, delay=0.005, base=2, max_attempts=16): raise StopIteration return meta - def listen(self): + def listen(self, timeout: float = 300): """ - Listens to server side events on the status of a request. + Listens to server-side events on the status of a request. Yields: - dict[str, str] + dict[str, Any]: Parsed JSON messages from the event stream. """ url = self.server.base + f"/status_stream/{quote(self.status_loc)}" - try: - with EventSource(url, timeout=30) as event_source: - for event in event_source: - if event.data: - msg = json.loads(event.data) - yield msg - except Exception as e: - print(f"An unexpected error occurred: {e}") + with EventSource(url, timeout=timeout) as event_source: + for event in event_source: + if event.data: + msg = json.loads(event.data) + yield msg + + print("event stream closed") - def listen_until_clear(self): + def listen_until_clear(self, timeout: float = 300): """ Listens to server side events until a 'pathClear' status is received. + Args: + timeout (int): The maximum amount of time to listen. + Returns: metadata: Any + + Raises: + TimeoutError: If the timeout is reached before the path is clear. """ print("Listening status ...") - for msg in self.listen(): - status = msg.get("status") - if status == "pathClear": - print("path clear") - return "" - elif status == "pathForbiddenTemporary": - continue - else: - return msg + start_time = monotonic() + + try: + for msg in self.listen(timeout=timeout): + if timeout is not None: + if (monotonic() - start_time) > timeout: + raise TimeoutError(f"listen_until_clear timed out after {timeout} seconds") + + status = msg.get("status") + if status == "pathClear": + print("path cleared") + return "" + elif status == "pathForbiddenTemporary": + print("waiting for path to clear") + continue + else: + return msg + except requests.exceptions.ReadTimeout: # timeout raised by requests module + raise TimeoutError(f"Timeout of {timeout}s waiting for event from server.") from None + except Exception as e: # unkown exceptions + print(f"An unexpected error occurred: {e}") + raise def __str__(self): @@ -615,23 +633,32 @@ def _main_mm2(): # smoke test with ManagedMORK.connect("../target/debug/mork_server").and_log_stdout().and_log_stderr().and_terminate() as server: server.upload_("(data (foo 1))\n(data (foo 2))\n(_exec 0 (, (data (foo $x))) (, (data (bar $x))))") - server.transform(("(_exec $priority $p $t)",), ("(exec (test $priority) $p $t)",)).listen_until_clear() - server.exec(thread_id="test").listen_until_clear() - print("data", server.download_().data) - - for i, item in enumerate(server.history): - print(i, str(item)) + server.transform(("(_exec $priority $p $t)",), ("(exec (test $priority) $p $t)",)).listen_until_clear(5) + server.exec(thread_id="test").listen_until_clear(5) + # print("data", server.download_().data) + # + # for i, item in enumerate(server.history): + # print(i, str(item)) def test_sse_status(): with ManagedMORK.connect("../target/debug/mork_server").and_log_stdout().and_log_stderr().and_terminate() as server: - server.sexpr_import_(f"https://raw.githubusercontent.com/Adam-Vandervorst/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen_until_clear() + DATASETS = ( + "royal92", + "lordOfTheRings", + "adameve", + "simpsons", + ) + for dataset in DATASETS: + server.sexpr_import_( + f"https://raw.githubusercontent.com/Adam-Vandervorst/metta-examples/refs/heads/main/aunt-kg/{dataset}.metta" + ).listen_until_clear() if __name__ == '__main__': # _main() - _main_mm2() - # test_sse_status() + # _main_mm2() + test_sse_status() From cf640e14723082fcd8880df2df2e29dd247df327 Mon Sep 17 00:00:00 2001 From: surafel_fikru Date: Thu, 10 Jul 2025 11:36:46 +0300 Subject: [PATCH 3/4] make listen functions accept infinit waits --- python/client.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/python/client.py b/python/client.py index 6fd9d2eb..b979cf00 100644 --- a/python/client.py +++ b/python/client.py @@ -88,7 +88,7 @@ def block(self, delay=0.005, base=2, max_attempts=16): raise StopIteration return meta - def listen(self, timeout: float = 300): + def listen(self, timeout: Optional[float] = 300): """ Listens to server-side events on the status of a request. @@ -104,12 +104,12 @@ def listen(self, timeout: float = 300): print("event stream closed") - def listen_until_clear(self, timeout: float = 300): + def listen_until_clear(self, timeout: Optional[float] = 300): """ Listens to server side events until a 'pathClear' status is received. Args: - timeout (int): The maximum amount of time to listen. + timeout (float): The maximum amount of time to listen. Returns: metadata: Any @@ -122,9 +122,8 @@ def listen_until_clear(self, timeout: float = 300): try: for msg in self.listen(timeout=timeout): - if timeout is not None: - if (monotonic() - start_time) > timeout: - raise TimeoutError(f"listen_until_clear timed out after {timeout} seconds") + if timeout and (monotonic() - start_time) > timeout: + raise TimeoutError(f"listen_until_clear timed out after {timeout} seconds") status = msg.get("status") if status == "pathClear": From de45573920f3ed5622b0708c12d966ecb0fe3ca2 Mon Sep 17 00:00:00 2001 From: surafel_fikru Date: Tue, 15 Jul 2025 23:37:17 +0300 Subject: [PATCH 4/4] clean up unecessary logs and add back mm2 tests --- python/client.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/python/client.py b/python/client.py index b979cf00..66cf349c 100644 --- a/python/client.py +++ b/python/client.py @@ -117,7 +117,6 @@ def listen_until_clear(self, timeout: Optional[float] = 300): Raises: TimeoutError: If the timeout is reached before the path is clear. """ - print("Listening status ...") start_time = monotonic() try: @@ -127,10 +126,8 @@ def listen_until_clear(self, timeout: Optional[float] = 300): status = msg.get("status") if status == "pathClear": - print("path cleared") return "" elif status == "pathForbiddenTemporary": - print("waiting for path to clear") continue else: return msg @@ -639,7 +636,7 @@ def _main_mm2(): # for i, item in enumerate(server.history): # print(i, str(item)) -def test_sse_status(): +def _test_sse_status(): with ManagedMORK.connect("../target/debug/mork_server").and_log_stdout().and_log_stderr().and_terminate() as server: DATASETS = ( "royal92", @@ -656,8 +653,7 @@ def test_sse_status(): if __name__ == '__main__': # _main() - # _main_mm2() - test_sse_status() + _main_mm2()