diff --git a/functions/aws/control/distributor_events.py b/functions/aws/control/distributor_events.py index 4e43d44..0796eef 100644 --- a/functions/aws/control/distributor_events.py +++ b/functions/aws/control/distributor_events.py @@ -10,12 +10,13 @@ from faaskeeper.node import Node, NodeDataType from faaskeeper.version import EpochCounter, SystemCounter, Version -from faaskeeper.watch import WatchEventType +from faaskeeper.watch import WatchEventType, WatchType from functions.aws.model import SystemStorage +from functions.aws.model.watches import Watches from functions.aws.model.system_storage import Node as SystemNode from functions.aws.model.user_storage import Storage as UserStorage from functions.aws.stats import TimingStatistics - +from functions.aws.model.watches import Watches class DistributorEventType(IntEnum): CREATE_NODE = 0 @@ -83,6 +84,20 @@ def execute( def epoch_counters(self) -> List[str]: pass + @abstractmethod + def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_Event]: + ''' + [watchType, path, watchDetails, node_timestamp] + ''' + pass + + @abstractmethod + def update_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): + ''' + store the epoch counters to the EXISTING node (and parent node) in the user storage. + ''' + pass + @abstractmethod def set_system_counter(self, system_counter: SystemCounter): pass @@ -185,7 +200,7 @@ def execute( ) -> Optional[dict]: system_node = system_storage.read_node(self.node) - + print("distributor system node", system_node) status = self._node_status(system_node) if status == TriBool.INCORRECT: @@ -234,8 +249,8 @@ def execute( self.node.modified.epoch = EpochCounter.from_raw_data(epoch_counters) user_storage.write(self.node) - # FIXME: update parent epoch and pxid - # self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters) + # FIXME: update parent pxid + self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters) user_storage.update(self.parent, set([NodeDataType.CHILDREN])) system_storage.pop_pending_update(system_node.node) @@ -251,11 +266,40 @@ def epoch_counters(self) -> List[str]: return [] def set_system_counter(self, system_counter: SystemCounter): - - # FIXME: parent counter self.node.created = Version(system_counter, None) self.node.modified = Version(system_counter, None) + self.parent.modified = Version(system_counter, None) + + def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_Event]: + # Query watches from DynaamoDB to decide later, if we should actually invoke call function, then we abstract if statement + + # if they should even be scheduled. + all_watches = [] + + all_watches += region_watches.query_watches( + self.node.path, [WatchType.EXISTS] + ) + + all_watches += region_watches.query_watches( + self.parent.path, [WatchType.GET_CHILDREN] + ) + + for idx, watch_entity in enumerate(all_watches): + # assign node or parent node timestamp to the results + if watch_entity[1] == self.node.path: + all_watches[idx] = Watches.Watch_Event(WatchEventType.NODE_CREATED.value, watch_entity[0].value, watch_entity[1], self.node.modified.system.sum) + elif watch_entity[1] == self.parent.path: + all_watches[idx] = Watches.Watch_Event(WatchEventType.NODE_CHILDREN_CHANGED.value, watch_entity[0].value, watch_entity[1], self.parent.modified.system.sum) + return all_watches + + def update_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): + self.node.modified.epoch = EpochCounter.from_raw_data(epoch_counters) + user_storage.update(self.node) + # FIXME: update pxid. + self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters) + user_storage.update(self.parent, set([NodeDataType.CHILDREN])) + @property def node(self) -> Node: return self._node @@ -422,6 +466,20 @@ def set_system_counter(self, system_counter: SystemCounter): self.node.modified = Version(system_counter, None) + def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_Event]: + all_watches = [] + all_watches += region_watches.query_watches( + self.node.path, [WatchType.GET_DATA, WatchType.EXISTS] + ) + + for idx, watch_entity in enumerate(all_watches): + all_watches[idx] = Watches.Watch_Event(WatchEventType.NODE_DATA_CHANGED.value, watch_entity[0].value, watch_entity[1], self.node.modified.system.sum) + + return all_watches + + def update_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): + self.node.modified.epoch = EpochCounter.from_raw_data(epoch_counters) + user_storage.update(self.node, set([NodeDataType.MODIFIED, NodeDataType.DATA])) class DistributorDeleteNode(DistributorEvent): @@ -552,7 +610,7 @@ def execute( # FIXME: retain the node to keep counters # self.node.modified.epoch = EpochCounter.from_raw_data(epoch_counters) user_storage.delete(self.node) - # self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters) + self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters) user_storage.update(self.parent, set([NodeDataType.CHILDREN])) system_storage.pop_pending_update(system_node.node) @@ -567,8 +625,34 @@ def epoch_counters(self) -> List[str]: return [] def set_system_counter(self, system_counter: SystemCounter): - # FIXME: parent counter - pass + # pass + self.node.modified = Version(system_counter, None) # this is for notification use, have nothing to do w/ the node commit + self.parent.modified = Version(system_counter, None) + + def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_Event]: + # Query watches from DynaamoDB to decide later, if we should actually invoke call function, then we abstract if statement + + # if they should even be scheduled. + all_watches = [] + all_watches += region_watches.query_watches( + self.node.path, [WatchType.EXISTS, WatchType.GET_DATA, WatchType.GET_CHILDREN] + ) + + all_watches += region_watches.query_watches( + self.parent.path, [WatchType.GET_CHILDREN] + ) + + for idx, watch_entity in enumerate(all_watches): + if watch_entity[1] == self.node.path: + all_watches[idx] = Watches.Watch_Event(WatchEventType.NODE_DELETED.value, watch_entity[0].value, watch_entity[1], self.node.modified.system.sum) + elif watch_entity[1] == self.parent.path: + all_watches[idx] = Watches.Watch_Event(WatchEventType.NODE_CHILDREN_CHANGED.value, watch_entity[0].value, watch_entity[1], self.parent.modified.system.sum) + + return all_watches + + def update_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): + self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters) + user_storage.update(self.parent, set([NodeDataType.CHILDREN])) @property def node(self) -> Node: @@ -597,7 +681,7 @@ def builder( raise NotImplementedError() distr_event = ops[event_type] - op = distr_event.deserialize(event) + op:DistributorEvent = distr_event.deserialize(event) op.set_system_counter(counter) return op diff --git a/functions/aws/distributor.py b/functions/aws/distributor.py index 3ffdbd5..c94adb9 100644 --- a/functions/aws/distributor.py +++ b/functions/aws/distributor.py @@ -1,6 +1,7 @@ import json import logging import time +import hashlib from concurrent.futures import Future, ThreadPoolExecutor from typing import Dict, List, Set @@ -8,10 +9,9 @@ from faaskeeper.stats import StorageStatistics from faaskeeper.version import SystemCounter -from faaskeeper.watch import WatchType from functions.aws.config import Config from functions.aws.control.channel import Client -from functions.aws.control.distributor_events import DistributorEventType, builder +from functions.aws.control.distributor_events import DistributorEvent, DistributorEventType, builder from functions.aws.model.watches import Watches from functions.aws.stats import TimingStatistics @@ -43,7 +43,7 @@ # verbose_output = False # FIXME: proper data structure region_clients = {} -region_watches = {} +region_watches: Dict[str, Watches] = {} epoch_counters: Dict[str, Set[str]] = {} for r in regions: region_clients[r] = boto3.client("lambda", region_name=r) @@ -56,28 +56,33 @@ def get_object(obj: dict): return next(iter(obj.values())) -def launch_watcher(region: str, json_in: dict): +def launch_watcher(operation: DistributorEvent, region: str, json_in: dict): """ (1) Submit watcher (2) Wait for completion (3) Remove ephemeral counter. """ - # FIXME process result - region_clients[region].invoke( + is_delivered = region_clients[region].invoke( FunctionName=f"{config.deployment_name}-watch", InvocationType="RequestResponse", Payload=json.dumps(json_in).encode(), ) - -# def query_watch_id(region: str, node_path: str): -# return region_watches[region].get_watch_counters(node_path) + if is_delivered: + hashed_path = hashlib.md5(json_in["path"].encode()).hexdigest() + timestamp = json_in["timestamp"] + watch_type = json_in["type"] + + # pop the pending watch: update epoch counters for the node and parent in user storage. + epoch_counters[r].remove(f"{hashed_path}_{watch_type}_{timestamp}") + operation.update_epoch_counters(config.user_storage, epoch_counters[r]) + return True + return False timing_stats = TimingStatistics.instance() def handler(event: dict, context): - events = event["Records"] logging.info(f"Begin processing {len(events)} events") @@ -150,19 +155,18 @@ def handler(event: dict, context): if config.benchmarking: begin_watch = time.time() # start watch delivery - for r in regions: - # if event_type == DistributorEventType.SET_DATA: - # watches_submitters.append( - # executor.submit(launch_watcher, r, watches) - # ) - # FIXME: other watchers - # FIXME: reenable submission - # Query watches from DynaamoDB to decide later - # if they should even be scheduled. - region_watches[r].query_watches( - operation.node.path, [WatchType.GET_DATA] - ) - + for r in regions: # deliver watch concurrently + for watch in operation.generate_watches_event(region_watches[r]): + watch_dict = { + "event": watch.watch_event_type, + "type": watch.watch_type, + "path": watch.node_path, + "timestamp": watch.mFxidSys, + } + + watches_submitters.append( + executor.submit(launch_watcher, operation, r, watch_dict) # watch: {DistributorEvent, watchType, timestamp, path} + ) if config.benchmarking: end_watch = time.time() timing_stats.add_result("watch_query", end_watch - begin_watch) diff --git a/functions/aws/model/watches.py b/functions/aws/model/watches.py index 2550807..952455a 100644 --- a/functions/aws/model/watches.py +++ b/functions/aws/model/watches.py @@ -5,8 +5,12 @@ from faaskeeper.watch import WatchType from functions.aws.control.dynamo import DynamoStorage as DynamoDriver +from collections import namedtuple class Watches: + + Watch_Event = namedtuple("Watch_Event", ["watch_event_type","watch_type", "node_path", "mFxidSys"]) + def __init__(self, storage_name: str, region: str): self._storage = DynamoDriver(f"{storage_name}-watch", "path") self._region = region @@ -23,16 +27,18 @@ def query_watches(self, node_path: str, counters: List[WatchType]): ret = self._storage.read(node_path) data = [] - if "Attributes" in ret: + if "Item" in ret: for c in counters: - data.append( - ( - c, - self._type_deserializer.deserialize( - ret["Attributes"][self._counters.get(c)] - ), + if self._counters.get(c) in ret["Item"]: + data.append( + ( + c, + node_path, + self._type_deserializer.deserialize( + ret["Item"][self._counters.get(c)] + ), + ) ) - ) return data except self._storage.errorSupplier.ResourceNotFoundException: return [] diff --git a/functions/aws/watch.py b/functions/aws/watch.py index a4d8a4d..5bac1f4 100644 --- a/functions/aws/watch.py +++ b/functions/aws/watch.py @@ -19,41 +19,37 @@ def handler(event: dict, context: dict): try: watch_event = WatchEventType(event["event"]) + watch_type = WatchType(event["type"]) timestamp = event["timestamp"] path = event["path"] watches_to_retain = [] - if watch_event == WatchEventType.NODE_DATA_CHANGED: - watches = region_watches.get_watches(path, [WatchType.GET_DATA]) - if len(watches): - for client in watches[0][1]: - version = int(client[0]) - if version >= timestamp: - if verbose: - print(f"Retaining watch with timestamp {version}") - watches_to_retain.append(client) - else: - client_ip = client[1] - client_port = int(client[2]) - if verbose: - print(f"Notify client at {client_ip}:{client_port}") - notify( - client_ip, - client_port, - { - "watch-event": WatchEventType.NODE_DATA_CHANGED.value, - "timestamp": timestamp, - "path": path, - }, - ) - # FIXME: actual notification - else: - raise NotImplementedError() - - # FIXME: remove watches from DynamoDB - + watches = region_watches.get_watches(path, [watch_type]) + if len(watches): + for client in watches[0][1]: + version = int(client[0]) + if version >= timestamp: + if verbose: + print(f"Retaining watch with timestamp {version}") + watches_to_retain.append(client) + else: + client_ip = client[1] + client_port = int(client[2]) + if verbose: + print(f"Notify client at {client_ip}:{client_port}") + notify( + client_ip, + client_port, + { + "watch-event": watch_event.value, + "timestamp": timestamp, + "path": path, + }, + ) + return True except Exception: print("Failure!") import traceback traceback.print_exc() + return False