Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 62 additions & 32 deletions python/client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Optional
import os
import json
Expand Down Expand Up @@ -87,33 +88,54 @@ def block(self, delay=0.005, base=2, max_attempts=16):
raise StopIteration
return meta

def listen(self):
"""
Listens to server side events on the status of a request.
def listen(self, timeout: Optional[float] = 300):
"""
Listens to server-side events on the status of a request.

Yields:
dict[str, Any]: Parsed JSON messages from the event stream.
"""
url = self.server.base + f"/status_stream/{quote(self.status_loc)}"
with EventSource(url, timeout=timeout) as event_source:
for event in event_source:
if event.data:
msg = json.loads(event.data)
yield msg

print("event stream closed")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to be this chatty, by default. It's handy for debugging, and maybe if we support a "verbose" switch. But otherwise we probably want operations at this level to be silent on the client side. Just my opinion.


def on_error():
raise Exception("error")
def listen_until_clear(self, timeout: Optional[float] = 300):
"""
Listens to server side events until a 'pathClear' status is received.

with EventSource(
url,
timeout=30,
on_error=on_error,
) as event_source:
try:
print("listening...")
for event in event_source:
if event.data:
msg = json.loads(event.data)
print("msg: ", msg)
if msg['status'] == "pathClear":
return
Args:
timeout (float): The maximum amount of time to listen.

except Exception as e:
print("error: ", e)
Returns:
metadata: Any

Raises:
TimeoutError: If the timeout is reached before the path is clear.
"""
start_time = monotonic()

try:
for msg in self.listen(timeout=timeout):
if timeout and (monotonic() - start_time) > timeout:
raise TimeoutError(f"listen_until_clear timed out after {timeout} seconds")

status = msg.get("status")
if status == "pathClear":
return ""
elif status == "pathForbiddenTemporary":
continue
else:
return msg
except requests.exceptions.ReadTimeout: # timeout raised by requests module
raise TimeoutError(f"Timeout of {timeout}s waiting for event from server.") from None
except Exception as e: # unkown exceptions
print(f"An unexpected error occurred: {e}")
raise


def __str__(self):
Expand Down Expand Up @@ -433,7 +455,7 @@ def __enter__(self):

def __exit__(self, exc_type, exc_val, exc_tb):
if "time" in self.finalization: print(f"{self.ns.format("*")} time {monotonic() - self.t0:.6f} s")
if "clear" in self.finalization: self.clear().listen()
if "clear" in self.finalization: self.clear().listen_until_clear()
if "spin_down" in self.finalization: self.spin_down()
if "stop" in self.finalization: self.stop()

Expand Down Expand Up @@ -595,7 +617,7 @@ def _main():

print("data", ins.download_().data)

ins.sexpr_import_("https://raw.githubusercontent.com/trueagi-io/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen()
ins.sexpr_import_("https://raw.githubusercontent.com/trueagi-io/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen_until_clear()

print("data", ins.download_().data)

Expand All @@ -607,23 +629,31 @@ def _main_mm2():
# smoke test
with ManagedMORK.connect("../target/debug/mork_server").and_log_stdout().and_log_stderr().and_terminate() as server:
server.upload_("(data (foo 1))\n(data (foo 2))\n(_exec 0 (, (data (foo $x))) (, (data (bar $x))))")
server.transform(("(_exec $priority $p $t)",), ("(exec (test $priority) $p $t)",)).listen()
server.exec(thread_id="test").listen()
print("data", server.download_().data)

for i, item in enumerate(server.history):
print(i, str(item))

def test_sse_status():
server.transform(("(_exec $priority $p $t)",), ("(exec (test $priority) $p $t)",)).listen_until_clear(5)
server.exec(thread_id="test").listen_until_clear(5)
# print("data", server.download_().data)
#
# for i, item in enumerate(server.history):
# print(i, str(item))

def _test_sse_status():
with ManagedMORK.connect("../target/debug/mork_server").and_log_stdout().and_log_stderr().and_terminate() as server:
server.sexpr_import_(f"https://raw.githubusercontent.com/Adam-Vandervorst/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen()
DATASETS = (
"royal92",
"lordOfTheRings",
"adameve",
"simpsons",
)
for dataset in DATASETS:
server.sexpr_import_(
f"https://raw.githubusercontent.com/Adam-Vandervorst/metta-examples/refs/heads/main/aunt-kg/{dataset}.metta"
).listen_until_clear()



if __name__ == '__main__':
# _main()
_main_mm2()
# test_sse_status()



Expand Down