diff --git a/python/client.py b/python/client.py index 97bc8af1..66cf349c 100644 --- a/python/client.py +++ b/python/client.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import Optional import os import json @@ -87,33 +88,54 @@ def block(self, delay=0.005, base=2, max_attempts=16): raise StopIteration return meta - def listen(self): - """ - Listens to server side events on the status of a request. + def listen(self, timeout: Optional[float] = 300): """ + Listens to server-side events on the status of a request. + Yields: + dict[str, Any]: Parsed JSON messages from the event stream. + """ url = self.server.base + f"/status_stream/{quote(self.status_loc)}" + with EventSource(url, timeout=timeout) as event_source: + for event in event_source: + if event.data: + msg = json.loads(event.data) + yield msg + + print("event stream closed") - def on_error(): - raise Exception("error") + def listen_until_clear(self, timeout: Optional[float] = 300): + """ + Listens to server side events until a 'pathClear' status is received. - with EventSource( - url, - timeout=30, - on_error=on_error, - ) as event_source: - try: - print("listening...") - for event in event_source: - if event.data: - msg = json.loads(event.data) - print("msg: ", msg) - if msg['status'] == "pathClear": - return + Args: + timeout (float): The maximum amount of time to listen. - except Exception as e: - print("error: ", e) + Returns: + metadata: Any + Raises: + TimeoutError: If the timeout is reached before the path is clear. + """ + start_time = monotonic() + + try: + for msg in self.listen(timeout=timeout): + if timeout and (monotonic() - start_time) > timeout: + raise TimeoutError(f"listen_until_clear timed out after {timeout} seconds") + + status = msg.get("status") + if status == "pathClear": + return "" + elif status == "pathForbiddenTemporary": + continue + else: + return msg + except requests.exceptions.ReadTimeout: # timeout raised by requests module + raise TimeoutError(f"Timeout of {timeout}s waiting for event from server.") from None + except Exception as e: # unkown exceptions + print(f"An unexpected error occurred: {e}") + raise def __str__(self): @@ -433,7 +455,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): if "time" in self.finalization: print(f"{self.ns.format("*")} time {monotonic() - self.t0:.6f} s") - if "clear" in self.finalization: self.clear().listen() + if "clear" in self.finalization: self.clear().listen_until_clear() if "spin_down" in self.finalization: self.spin_down() if "stop" in self.finalization: self.stop() @@ -595,7 +617,7 @@ def _main(): print("data", ins.download_().data) - ins.sexpr_import_("https://raw.githubusercontent.com/trueagi-io/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen() + ins.sexpr_import_("https://raw.githubusercontent.com/trueagi-io/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen_until_clear() print("data", ins.download_().data) @@ -607,23 +629,31 @@ def _main_mm2(): # smoke test with ManagedMORK.connect("../target/debug/mork_server").and_log_stdout().and_log_stderr().and_terminate() as server: server.upload_("(data (foo 1))\n(data (foo 2))\n(_exec 0 (, (data (foo $x))) (, (data (bar $x))))") - server.transform(("(_exec $priority $p $t)",), ("(exec (test $priority) $p $t)",)).listen() - server.exec(thread_id="test").listen() - print("data", server.download_().data) - - for i, item in enumerate(server.history): - print(i, str(item)) - -def test_sse_status(): + server.transform(("(_exec $priority $p $t)",), ("(exec (test $priority) $p $t)",)).listen_until_clear(5) + server.exec(thread_id="test").listen_until_clear(5) + # print("data", server.download_().data) + # + # for i, item in enumerate(server.history): + # print(i, str(item)) + +def _test_sse_status(): with ManagedMORK.connect("../target/debug/mork_server").and_log_stdout().and_log_stderr().and_terminate() as server: - server.sexpr_import_(f"https://raw.githubusercontent.com/Adam-Vandervorst/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen() + DATASETS = ( + "royal92", + "lordOfTheRings", + "adameve", + "simpsons", + ) + for dataset in DATASETS: + server.sexpr_import_( + f"https://raw.githubusercontent.com/Adam-Vandervorst/metta-examples/refs/heads/main/aunt-kg/{dataset}.metta" + ).listen_until_clear() if __name__ == '__main__': # _main() _main_mm2() - # test_sse_status()