From 641d4472a1daaf03fbb2eb217e94d39c36aabaa0 Mon Sep 17 00:00:00 2001 From: Francois Retief Date: Thu, 16 May 2024 08:45:48 +0200 Subject: [PATCH] Add ActivitiesClient to work with activities --- yamcs-client/src/yamcs/activities/__init__.py | 0 yamcs-client/src/yamcs/activities/client.py | 365 ++++++++++++++++++ yamcs-client/src/yamcs/activities/model.py | 169 ++++++++ yamcs-client/src/yamcs/client/__init__.py | 2 + yamcs-client/src/yamcs/client/activities.py | 48 ++- yamcs-client/src/yamcs/client/core.py | 10 + 6 files changed, 587 insertions(+), 7 deletions(-) create mode 100644 yamcs-client/src/yamcs/activities/__init__.py create mode 100644 yamcs-client/src/yamcs/activities/client.py create mode 100644 yamcs-client/src/yamcs/activities/model.py diff --git a/yamcs-client/src/yamcs/activities/__init__.py b/yamcs-client/src/yamcs/activities/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/yamcs-client/src/yamcs/activities/client.py b/yamcs-client/src/yamcs/activities/client.py new file mode 100644 index 0000000..b432ad7 --- /dev/null +++ b/yamcs-client/src/yamcs/activities/client.py @@ -0,0 +1,365 @@ +import functools +from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union +from datetime import datetime +from yamcs.core import pagination +from yamcs.core.context import Context +from yamcs.core.futures import WebSocketSubscriptionFuture +from yamcs.core.helpers import to_isostring +from yamcs.core.subscriptions import WebSocketSubscriptionManager +from yamcs.activities.model import ActivityInfo, ActivityLogInfo, ExecutorInfo +from yamcs.client.activities import ManualActivity, ScriptActivity, CommandActivity, CommandStackActivity +from yamcs.protobuf.activities import activities_pb2, activities_service_pb2 + +class GlobalActivityStatusSubscription(WebSocketSubscriptionFuture): + """ + Local object providing access to global ongoing activities status updates. + + A subscription object stores the number of ongoing activities. + """ + def __init__(self, manager): + super(GlobalActivityStatusSubscription, self).__init__(manager) + + self.ongoingCount: Optional[int] = None + """The number of ongoing activities.""" + + def _process(self, ongoingCount): + self.ongoingCount = ongoingCount + +def _wrap_callback_parse_global_activity_status(subscription, on_data, message): + """ + Wraps a user callback to parse GlobalActivityStatus + from a WebSocket data message + """ + pb = activities_service_pb2.GlobalActivityStatus() + message.Unpack(pb) + ongoingCount = pb.ongoingCount + subscription._process(ongoingCount) + if on_data: + on_data(ongoingCount) + +def _wrap_callback_parse_activity(on_data, message): + """ + Wraps a user callback to parse ActivityInfo + from a WebSocket data message + """ + pb = activities_pb2.ActivityInfo() + message.Unpack(pb) + on_data(ActivityInfo(pb)) + +def _wrap_callback_parse_activity_log(on_data, message): + """ + Wraps a user callback to parse ActivityLogInfo + from a WebSocket data message + """ + pb = activities_pb2.ActivityLogInfo() + message.Unpack(pb) + on_data(ActivityLogInfo(pb)) + +class ActivitiesClient: + def __init__(self, ctx: Context, instance: str): + super(ActivitiesClient, self).__init__() + self.ctx = ctx + self._instance = instance + + def list_activities(self, + type: Optional[str] = None, + status: Optional[str] = None, + text_filter: Optional[str] = None, + start: Optional[datetime] = None, + stop: Optional[datetime] = None, + page_size: int = 100, + descending: bool = False) -> Iterable[ActivityInfo]: + """ + Reads activities between the specified start and stop time. + + .. note:: + + This method will send out multiple requests when more than + ``page_size`` activities are queried. For large queries, consider + using :meth:`stream_activities` instead, it uses server-streaming + based on a single request. + + :param type: + The type of the returned activity. + :param status: + Filter on activity status. + One of ``RUNNING``, ``CANCELLED``, ``SUCCESSFUL``, ``FAILED``. + :param text_filter: + Filter the description of the returned activities + :param start: + Minimum start date of the returned activities (inclusive) + :param stop: + Maximum start date of the returned activities (exclusive) + :param page_size: + Page size of underlying requests. Higher values imply + less overhead, but risk hitting the maximum message size + limit. + :param descending: + If set to ``True`` events are fetched in reverse + order (most recent first). + """ + + params: Dict[str, Any] = { + "order": "desc" if descending else "desc", + } + + if type is not None: + params["type"] = type + if status is not None: + params["status"] = status + if page_size is not None: + params["limit"] = page_size + if start is not None: + params["start"] = to_isostring(start) + if stop is not None: + params["stop"] = to_isostring(stop) + if text_filter is not None: + params["q"] = text_filter + + return pagination.Iterator( + ctx=self.ctx, + path=f"/activities/{self._instance}/activities", + params=params, + response_class=activities_service_pb2.ListActivitiesResponse, + items_key="activities", + item_mapper=ActivityInfo, + ) + + def list_executors(self) -> List[ExecutorInfo]: + """ + List available executors + """ + url = f"/activities/{self._instance}/executors" + response = self.ctx.get_proto(url) + message = activities_service_pb2.ListExecutorsResponse() + message.ParseFromString(response.content) + return [ExecutorInfo(executor) for executor in message.executors] + + def list_scripts(self) -> List[str]: + """ + List scripts available for activities of type SCRIPT + """ + url = f"/activities/{self._instance}/scripts" + response = self.ctx.get_proto(url) + message = activities_service_pb2.ListScriptsResponse() + message.ParseFromString(response.content) + return [script for script in message.scripts] + + def get_activity_log(self, activity: str) -> List[ActivityLogInfo]: + """ + Get the activity log + """ + url = f"/activities/{self._instance}/activities/{activity}/log" + response = self.ctx.get_proto(url) + message = activities_service_pb2.GetActivityLogResponse() + message.ParseFromString(response.content) + return [ActivityLogInfo(log) for log in message.logs] + + def get_activity(self, activity: str) -> ActivityInfo: + """ + Get an activity + """ + url = f"/activities/{self._instance}/activities/{activity}" + response = self.ctx.get_proto(url) + message = activities_pb2.ActivityInfo() + message.ParseFromString(response.content) + return ActivityInfo(message) + + def cancel_activity(self, activity: str): + """ + Cancel an ongoing activity + """ + url = f"/activities/{self._instance}/activities/{activity}:cancel" + response = self.ctx.post_proto(url) + message = activities_pb2.ActivityInfo() + message.ParseFromString(response.content) + return ActivityInfo(message) + + def complete_manual_activity(self, activity : str, failureReason : Optional[str] = None): + """ + Mark an ongoing activity as completed. + + This method may only be used with manual activities. + """ + req = activities_service_pb2.CompleteManualActivityRequest() + req.instance = self._instance + req.activity = activity + if failureReason: + req.failureReason = failureReason + + url = f"/activities/{self._instance}/activities/{activity}:complete" + response = self.ctx.post_proto(url, data=req.SerializeToString()) + message = activities_pb2.ActivityInfo() + message.ParseFromString(response.content) + return ActivityInfo(message) + + def start_manual_activity(self, name: str, comment: Optional[str] = None): + """ + Start a manual activity + """ + activity = ManualActivity(name, comment=comment) + req = activity._to_proto() + + url = f"/activities/{self._instance}/activities" + response = self.ctx.post_proto(url, data=req.SerializeToString()) + message = activities_pb2.ActivityInfo() + message.ParseFromString(response.content) + return ActivityInfo(message) + + def start_script_activity(self, script: str, args: Optional[Union[str, List[str]]] = None, processor: Optional[str] = None, comment: Optional[str] = None): + """ + Start a script activity + """ + activity = ScriptActivity(script, args, processor=processor, comment=comment) + req = activity._to_proto() + + url = f"/activities/{self._instance}/activities" + response = self.ctx.post_proto(url, data=req.SerializeToString()) + message = activities_pb2.ActivityInfo() + message.ParseFromString(response.content) + return ActivityInfo(message) + + def start_command_activity(self, command: str, args: Optional[dict] = None, extra: Optional[dict] = None, processor: Optional[str] = None, comment: Optional[str] = None): + """ + Start a command activity + """ + activity = CommandActivity(command, extra=extra, processor=processor, comment=comment) + if args: + activity.args = args + if extra: + activity.extra = extra + req = activity._to_proto() + + url = f"/activities/{self._instance}/activities" + response = self.ctx.post_proto(url, data=req.SerializeToString()) + message = activities_pb2.ActivityInfo() + message.ParseFromString(response.content) + return ActivityInfo(message) + + def start_command_stack_activity(self, bucket: str, stack: str, processor: Optional[str] = None, comment: Optional[str] = None): + """ + Start a command activity + """ + activity = CommandStackActivity(bucket, stack, processor=processor, comment=comment) + req = activity._to_proto() + + url = f"/activities/{self._instance}/activities" + response = self.ctx.post_proto(url, data=req.SerializeToString()) + message = activities_pb2.ActivityInfo() + message.ParseFromString(response.content) + return ActivityInfo(message) + + # This function is for 'CUSTOM' activities; for SCRIPT, COMMAND, COMMAND_STACK, MANUAL see functions above + def start_activity(self, type: str, args: Optional[Mapping[str, Any]] = None, comment: Optional[str] = None): + """ + Start an activity + """ + req = activities_pb2.ActivityDefinitionInfo() + req.type = type + if args: + for key, value in args.items(): + req.args[key] = value + if comment: + req.comment = comment + + url = f"/activities/{self._instance}/activities" + response = self.ctx.post_proto(url, data=req.SerializeToString()) + message = activities_pb2.ActivityInfo() + message.ParseFromString(response.content) + return ActivityInfo(message) + + def create_global_status_subscription(self, + on_data: Optional[Callable[[int], None]] = None, + timeout: float = 60, + ) -> GlobalActivityStatusSubscription: + """ + Create a new subscription for receiving global status updates on the number of ongoing activites. + + This method returns a future, then returns immediately. Stop the + subscription by canceling the future. + + :param on_data: + Function that gets called with :class:`.int` updates. + :param timeout: + The amount of seconds to wait for the request to complete. + + :return: + A Future that can be used to manage the background websocket subscription. + """ + options = activities_service_pb2.SubscribeGlobalStatusRequest() + options.instance = self._instance + manager = WebSocketSubscriptionManager(self.ctx, topic="global-activity-status", options=options) + + # Represent subscription as a future + subscription = GlobalActivityStatusSubscription(manager) + + wrapped_callback = functools.partial(_wrap_callback_parse_global_activity_status, subscription, on_data) + + manager.open(wrapped_callback) + + # Wait until a reply or exception is received + subscription.reply(timeout=timeout) + + return subscription + + def create_activities_subscription(self, + on_data: Callable[[ActivityInfo], None], + timeout: float = 60, + ) -> WebSocketSubscriptionFuture: + """ + :param on_data: + Function that gets called with :class:`.ActivityInfo` updates. + :param timeout: + The amount of seconds to wait for the request to complete. + + :return: + A Future that can be used to manage the background websocket subscription. + """ + options = activities_service_pb2.SubscribeActivitiesRequest() + options.instance = self._instance + manager = WebSocketSubscriptionManager(self.ctx, topic="activities", options=options) + + # Represent subscription as a future + subscription = WebSocketSubscriptionFuture(manager) + + wrapped_callback = functools.partial(_wrap_callback_parse_activity, on_data) + + manager.open(wrapped_callback) + + # Wait until a reply or exception is received + subscription.reply(timeout=timeout) + + return subscription + + def create_activity_log_subscription(self, + activity: str, + on_data: Callable[[ActivityInfo], None], + timeout: float = 60, + ) -> WebSocketSubscriptionFuture: + """ + :param activity: + Activity identifier. + :param on_data: + Function that gets called with :class:`.ActivityInfo` updates. + :param timeout: + The amount of seconds to wait for the request to complete. + + :return: + A Future that can be used to manage the background websocket subscription. + """ + options = activities_service_pb2.SubscribeActivityLogRequest() + options.instance = self._instance + options.activity = activity + manager = WebSocketSubscriptionManager(self.ctx, topic="activity-log", options=options) + + # Represent subscription as a future + subscription = WebSocketSubscriptionFuture(manager) + + wrapped_callback = functools.partial(_wrap_callback_parse_activity_log, on_data) + + manager.open(wrapped_callback) + + # Wait until a reply or exception is received + subscription.reply(timeout=timeout) + + return subscription diff --git a/yamcs-client/src/yamcs/activities/model.py b/yamcs-client/src/yamcs/activities/model.py new file mode 100644 index 0000000..6ebeacf --- /dev/null +++ b/yamcs-client/src/yamcs/activities/model.py @@ -0,0 +1,169 @@ +from datetime import datetime +from typing import Optional +from yamcs.core.helpers import parse_server_time +from yamcs.protobuf.activities import activities_pb2 + +class ActivityInfo: + """ + Activity Info + """ + def __init__(self, proto : activities_pb2.ActivityInfo): + self._proto = proto + + @property + def id(self) -> str: + """ + Activity identifier + """ + return self._proto.id + + @property + def start(self) -> datetime: + """ + Start time of the activity + """ + return parse_server_time(self._proto.start) + + @property + def seq(self) -> int: + """ + Differentiator in case of multiple activities with the same start time + """ + return self._proto.seq + + @property + def status(self) -> str: + """ + Activity status + """ + return activities_pb2.ActivityStatus.Name(self._proto.status) + + @property + def startedBy(self) -> str: + """ + User who started the run + """ + return self._proto.startedBy + + @property + def type(self) -> str: + """ + Activity type + """ + return self._proto.type + + @property + def detail(self) -> str: + """ + Activity detail (short descriptive) + """ + return self._proto.detail + + @property + def stopped(self) -> Optional[datetime]: + """ + Stop time of the activity run + """ + if self._proto.HasField("stopped"): + return parse_server_time(self._proto.stopped) + return None + + @property + def stoppedBy(self) -> Optional[str]: + """ + User who stopped the run. + Only set if the activity was manually stopped + """ + if self._proto.HasField("stoppedBy"): + return self._proto.stoppedBy + return None + + @property + def failureReason(self) -> Optional[str]: + """ + If set, the activity is stopped, but failed + """ + if self._proto.HasField("failureReason"): + return self._proto.failureReason + return None + + def __str__(self): + return f"[{self.id}] {self.start} {self.type}" + +class ActivityLogInfo: + """ + Activity Log Info + """ + def __init__(self, proto : activities_pb2.ActivityLogInfo): + self._proto = proto + + @property + def time(self) -> datetime: + """ + Log time + """ + return parse_server_time(self._proto.time) + + @property + def source(self) -> str: + """ + Source of this log message. One of: + - SERVICE: the log is generated by the activity service + - ACTIVITY: the log is generated by the activity itself + """ + return self._proto.source + + @property + def level(self) -> str: + """ + Log level + """ + return activities_pb2.ActivityLogLevel.Name(self._proto.level) + + @property + def message(self) -> str: + """ + Log level + """ + return self._proto.message + + def __str__(self): + return f"{self.time:%Y-%m-%d %H:%M:%S} {self.source:8} {self.level:10} {self.message}" + +class ExecutorInfo: + """ + Executor Info + """ + def __init__(self, proto : activities_pb2.ExecutorInfo): + self._proto = proto + + @property + def type(self) -> str: + """ + Executor type + """ + return self._proto.type + + @property + def displayName(self) -> str: + """ + Executor display name + """ + return self._proto.displayName + + @property + def description(self) -> str: + """ + Executor description + """ + return self._proto.description + + @property + def icon(self) -> str: + """ + Name of an icon in the Material Icons font. + """ + return self._proto.icon + + def __str__(self): + return f"{self.type:13} {self.displayName:13} {self.description}" diff --git a/yamcs-client/src/yamcs/client/__init__.py b/yamcs-client/src/yamcs/client/__init__.py index 5d3bc52..9cd887a 100644 --- a/yamcs-client/src/yamcs/client/__init__.py +++ b/yamcs-client/src/yamcs/client/__init__.py @@ -1,4 +1,6 @@ from yamcs import clientversion # noqa +from yamcs.activities.client import * # noqa +from yamcs.activities.model import * # noqa from yamcs.archive.client import * # noqa from yamcs.archive.model import * # noqa from yamcs.client.activities import * # noqa diff --git a/yamcs-client/src/yamcs/client/activities.py b/yamcs-client/src/yamcs/client/activities.py index 371cd90..091f2eb 100644 --- a/yamcs-client/src/yamcs/client/activities.py +++ b/yamcs-client/src/yamcs/client/activities.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass, field +from dataclasses import KW_ONLY, dataclass, field from typing import Any, List, Mapping, Optional, Union from yamcs.core.helpers import to_argument_value @@ -16,6 +16,23 @@ class Activity: * :class:`.ScriptActivity` """ + _: KW_ONLY + comment: Optional[str] = None + + def _from_proto(self, proto: activities_pb2.ActivityDefinitionInfo): + if proto.HasField("comment"): + self.comment = proto.comment + + return self + + def _to_proto(self) -> activities_pb2.ActivityDefinitionInfo: + proto = activities_pb2.ActivityDefinitionInfo() + + if self.comment: + proto.comment = self.comment + + return proto + @staticmethod def _as_subclass(proto): # No need for MANUAL. @@ -37,6 +54,23 @@ class ManualActivity(Activity): An activity whose execution status is managed outside of Yamcs """ + name: str + """ + Name of the manual activity + """ + + @staticmethod + def _from_proto(proto: activities_pb2.ActivityDefinitionInfo): + activity = ManualActivity(name=proto.args["name"]) + + return super(ManualActivity, activity)._from_proto(proto) + + def _to_proto(self) -> activities_pb2.ActivityDefinitionInfo: + proto = super(ManualActivity, self)._to_proto() + proto.type = "MANUAL" + proto.args["name"] = self.name + return proto + @dataclass class ScriptActivity(Activity): @@ -75,10 +109,10 @@ def _from_proto(proto: activities_pb2.ActivityDefinitionInfo): if "processor" in proto.args: activity.processor = proto.args["processor"] - return activity + return super(ScriptActivity, activity)._from_proto(proto) def _to_proto(self) -> activities_pb2.ActivityDefinitionInfo: - proto = activities_pb2.ActivityDefinitionInfo() + proto = super(ScriptActivity, self)._to_proto() proto.type = "SCRIPT" proto.args["script"] = self.script @@ -127,10 +161,10 @@ def _from_proto(proto: activities_pb2.ActivityDefinitionInfo): if "processor" in proto.args: activity.processor = proto.args["processor"] - return activity + return super(CommandActivity, activity)._from_proto(proto) def _to_proto(self) -> activities_pb2.ActivityDefinitionInfo: - proto = activities_pb2.ActivityDefinitionInfo() + proto = super(CommandActivity, self)._to_proto() proto.type = "COMMAND" proto.args["command"] = self.command @@ -176,10 +210,10 @@ def _from_proto(proto: activities_pb2.ActivityDefinitionInfo): if "processor" in proto.args: activity.processor = proto.args["processor"] - return activity + return super(CommandStackActivity, activity)._from_proto(proto) def _to_proto(self) -> activities_pb2.ActivityDefinitionInfo: - proto = activities_pb2.ActivityDefinitionInfo() + proto = super(CommandStackActivity, self)._to_proto() proto.type = "COMMAND_STACK" proto.args["bucket"] = self.bucket proto.args["stack"] = self.stack diff --git a/yamcs-client/src/yamcs/client/core.py b/yamcs-client/src/yamcs/client/core.py index 5c0a177..e90cd3e 100644 --- a/yamcs-client/src/yamcs/client/core.py +++ b/yamcs-client/src/yamcs/client/core.py @@ -5,6 +5,7 @@ from urllib.parse import urlparse from google.protobuf import timestamp_pb2 +from yamcs.activities.client import ActivitiesClient from yamcs.archive.client import ArchiveClient from yamcs.core.auth import APIKeyCredentials, Credentials from yamcs.core.context import Context @@ -327,6 +328,15 @@ def get_timeline_client(self, instance: str) -> TimelineClient: """ return TimelineClient(self.ctx, instance) + def get_activities_client(self, instance: str) -> ActivitiesClient: + """ + Return and object for working with activities + + :param instance: + A Yamcs instance name. + """ + return ActivitiesClient(self.ctx, instance) + def create_instance( self, name: str,