From 6f0c733674c9d578fcb69ced0917914c53f833a1 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 2 Sep 2023 20:21:23 +0000 Subject: [PATCH 1/5] Get, Exists and GetChildren watch in aws. Pending watch notification removal in epoch counters --- functions/aws/control/distributor_events.py | 104 ++++++++++++++++++-- functions/aws/distributor.py | 52 +++++----- functions/aws/model/watches.py | 22 +++-- functions/aws/watch.py | 54 +++++----- 4 files changed, 161 insertions(+), 71 deletions(-) diff --git a/functions/aws/control/distributor_events.py b/functions/aws/control/distributor_events.py index 4e43d44..a64eafc 100644 --- a/functions/aws/control/distributor_events.py +++ b/functions/aws/control/distributor_events.py @@ -12,10 +12,11 @@ from faaskeeper.version import EpochCounter, SystemCounter, Version from faaskeeper.watch import WatchEventType from functions.aws.model import SystemStorage +from functions.aws.model.watches import Watches, WatchType from functions.aws.model.system_storage import Node as SystemNode from functions.aws.model.user_storage import Storage as UserStorage from functions.aws.stats import TimingStatistics - +from functions.aws.model.watches import Watches class DistributorEventType(IntEnum): CREATE_NODE = 0 @@ -83,6 +84,20 @@ def execute( def epoch_counters(self) -> List[str]: pass + @abstractmethod + def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_Event]: + ''' + [watchType, path, watchDetails, node_timestamp] + ''' + pass + + @abstractmethod + def update_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): + ''' + store the epoch counters to the EXISTING node (and parent node) in the user storage. + ''' + pass + @abstractmethod def set_system_counter(self, system_counter: SystemCounter): pass @@ -185,7 +200,7 @@ def execute( ) -> Optional[dict]: system_node = system_storage.read_node(self.node) - + print("distributor system node", system_node) status = self._node_status(system_node) if status == TriBool.INCORRECT: @@ -234,8 +249,8 @@ def execute( self.node.modified.epoch = EpochCounter.from_raw_data(epoch_counters) user_storage.write(self.node) - # FIXME: update parent epoch and pxid - # self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters) + # FIXME: update parent pxid + self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters) user_storage.update(self.parent, set([NodeDataType.CHILDREN])) system_storage.pop_pending_update(system_node.node) @@ -251,11 +266,40 @@ def epoch_counters(self) -> List[str]: return [] def set_system_counter(self, system_counter: SystemCounter): - - # FIXME: parent counter self.node.created = Version(system_counter, None) self.node.modified = Version(system_counter, None) + self.parent.modified = Version(system_counter, None) + + def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_Event]: + # Query watches from DynaamoDB to decide later, if we should actually invoke call function, then we abstract if statement + + # if they should even be scheduled. + all_watches = [] + + all_watches += region_watches.query_watches( + self.node.path, [WatchType.EXISTS] + ) + + all_watches += region_watches.query_watches( + self.parent.path, [WatchType.GET_CHILDREN] + ) + + for idx, watch_entity in enumerate(all_watches): + # assign node or parent node timestamp to the results + if watch_entity[1] == self.node.path: + all_watches[idx] = Watches.Watch_Event(WatchEventType.NODE_CREATED.value, watch_entity[0].value, watch_entity[1], self.node.modified.system.sum) + elif watch_entity[1] == self.parent.path: + all_watches[idx] = Watches.Watch_Event(WatchEventType.NODE_CHILDREN_CHANGED.value, watch_entity[0].value, watch_entity[1], self.parent.modified.system.sum) + return all_watches + + def store_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): + self.node.modified.epoch = EpochCounter.from_raw_data(epoch_counters) + user_storage.update(self.node) + # FIXME: update pxid. + self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters) + user_storage.update(self.parent, set([NodeDataType.CHILDREN])) + @property def node(self) -> Node: return self._node @@ -422,6 +466,20 @@ def set_system_counter(self, system_counter: SystemCounter): self.node.modified = Version(system_counter, None) + def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_Event]: + all_watches = [] + all_watches += region_watches.query_watches( + self.node.path, [WatchType.GET_DATA, WatchType.EXISTS] + ) + + for idx, watch_entity in enumerate(all_watches): + all_watches[idx] = Watches.Watch_Event(WatchEventType.NODE_DATA_CHANGED.value, watch_entity[0].value, watch_entity[1], self.node.modified.system.sum) + + return all_watches + + def store_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): + self.node.modified.epoch = EpochCounter.from_raw_data(epoch_counters) + user_storage.update(self.node, set([NodeDataType.MODIFIED, NodeDataType.DATA])) class DistributorDeleteNode(DistributorEvent): @@ -552,7 +610,7 @@ def execute( # FIXME: retain the node to keep counters # self.node.modified.epoch = EpochCounter.from_raw_data(epoch_counters) user_storage.delete(self.node) - # self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters) + self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters) user_storage.update(self.parent, set([NodeDataType.CHILDREN])) system_storage.pop_pending_update(system_node.node) @@ -567,8 +625,34 @@ def epoch_counters(self) -> List[str]: return [] def set_system_counter(self, system_counter: SystemCounter): - # FIXME: parent counter - pass + # pass + self.node.modified = Version(system_counter, None) # this is for notification use, have nothing to do w/ the node commit + self.parent.modified = Version(system_counter, None) + + def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_Event]: + # Query watches from DynaamoDB to decide later, if we should actually invoke call function, then we abstract if statement + + # if they should even be scheduled. + all_watches = [] + all_watches += region_watches.query_watches( + self.node.path, [WatchType.EXISTS, WatchType.GET_DATA, WatchType.GET_CHILDREN] + ) + + all_watches += region_watches.query_watches( + self.parent.path, [WatchType.GET_CHILDREN] + ) + + for idx, watch_entity in enumerate(all_watches): + if watch_entity[1] == self.node.path: + all_watches[idx] = Watches.Watch_Event(WatchEventType.NODE_DELETED.value, watch_entity[0].value, watch_entity[1], self.node.modified.system.sum) + elif watch_entity[1] == self.parent.path: + all_watches[idx] = Watches.Watch_Event(WatchEventType.NODE_CHILDREN_CHANGED.value, watch_entity[0].value, watch_entity[1], self.parent.modified.system.sum) + + return all_watches + + def store_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): + self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters) + user_storage.update(self.parent, set([NodeDataType.CHILDREN])) @property def node(self) -> Node: @@ -597,7 +681,7 @@ def builder( raise NotImplementedError() distr_event = ops[event_type] - op = distr_event.deserialize(event) + op:DistributorEvent = distr_event.deserialize(event) op.set_system_counter(counter) return op diff --git a/functions/aws/distributor.py b/functions/aws/distributor.py index 3ffdbd5..16b1e08 100644 --- a/functions/aws/distributor.py +++ b/functions/aws/distributor.py @@ -1,6 +1,7 @@ import json import logging import time +import hashlib from concurrent.futures import Future, ThreadPoolExecutor from typing import Dict, List, Set @@ -8,10 +9,9 @@ from faaskeeper.stats import StorageStatistics from faaskeeper.version import SystemCounter -from faaskeeper.watch import WatchType from functions.aws.config import Config from functions.aws.control.channel import Client -from functions.aws.control.distributor_events import DistributorEventType, builder +from functions.aws.control.distributor_events import DistributorEvent, DistributorEventType, builder from functions.aws.model.watches import Watches from functions.aws.stats import TimingStatistics @@ -43,7 +43,7 @@ # verbose_output = False # FIXME: proper data structure region_clients = {} -region_watches = {} +region_watches: Dict[str, Watches] = {} epoch_counters: Dict[str, Set[str]] = {} for r in regions: region_clients[r] = boto3.client("lambda", region_name=r) @@ -56,28 +56,33 @@ def get_object(obj: dict): return next(iter(obj.values())) -def launch_watcher(region: str, json_in: dict): +def launch_watcher(operation: DistributorEvent, region: str, json_in: dict): """ (1) Submit watcher (2) Wait for completion (3) Remove ephemeral counter. """ - # FIXME process result - region_clients[region].invoke( + is_delivered = region_clients[region].invoke( FunctionName=f"{config.deployment_name}-watch", InvocationType="RequestResponse", Payload=json.dumps(json_in).encode(), ) - -# def query_watch_id(region: str, node_path: str): -# return region_watches[region].get_watch_counters(node_path) + if is_delivered: + hashed_path = hashlib.md5(json_in["path"].encode()).hexdigest() + timestamp = json_in["timestamp"] + watch_type = json_in["type"] + + # pop the pending watch: update epoch counters for the node and parent in user storage. + epoch_counters[r].remove(f"{hashed_path}_{watch_type}_{timestamp}") + operation.update_epoch_counters(config.user_storage, epoch_counters[r]) # 结合client那边,或许应该要放到其他地方 + return True + return False timing_stats = TimingStatistics.instance() def handler(event: dict, context): - events = event["Records"] logging.info(f"Begin processing {len(events)} events") @@ -111,7 +116,7 @@ def handler(event: dict, context): elif "body" in record: write_event = json.loads(record["body"]) event_type = DistributorEventType(int(write_event["type"]["N"])) - if "data" in record["messageAttributes"]: + if "data" in record["messageAttributes"]: # TODO: is this a case from other operation write_event["data"] = { "B": record["messageAttributes"]["data"]["binaryValue"] } @@ -150,19 +155,18 @@ def handler(event: dict, context): if config.benchmarking: begin_watch = time.time() # start watch delivery - for r in regions: - # if event_type == DistributorEventType.SET_DATA: - # watches_submitters.append( - # executor.submit(launch_watcher, r, watches) - # ) - # FIXME: other watchers - # FIXME: reenable submission - # Query watches from DynaamoDB to decide later - # if they should even be scheduled. - region_watches[r].query_watches( - operation.node.path, [WatchType.GET_DATA] - ) - + for r in regions: # deliver watch concurrently + for watch in operation.generate_watches_event(region_watches[r]): + watch_dict = { + "event": watch.watch_event_type, + "type": watch.watch_type, + "path": watch.node_path, + "timestamp": watch.mFxidSys, + } + + watches_submitters.append( + executor.submit(launch_watcher, operation, r, watch_dict) # watch: {DistributorEvent, watchType, timestamp, path} + ) if config.benchmarking: end_watch = time.time() timing_stats.add_result("watch_query", end_watch - begin_watch) diff --git a/functions/aws/model/watches.py b/functions/aws/model/watches.py index 2550807..952455a 100644 --- a/functions/aws/model/watches.py +++ b/functions/aws/model/watches.py @@ -5,8 +5,12 @@ from faaskeeper.watch import WatchType from functions.aws.control.dynamo import DynamoStorage as DynamoDriver +from collections import namedtuple class Watches: + + Watch_Event = namedtuple("Watch_Event", ["watch_event_type","watch_type", "node_path", "mFxidSys"]) + def __init__(self, storage_name: str, region: str): self._storage = DynamoDriver(f"{storage_name}-watch", "path") self._region = region @@ -23,16 +27,18 @@ def query_watches(self, node_path: str, counters: List[WatchType]): ret = self._storage.read(node_path) data = [] - if "Attributes" in ret: + if "Item" in ret: for c in counters: - data.append( - ( - c, - self._type_deserializer.deserialize( - ret["Attributes"][self._counters.get(c)] - ), + if self._counters.get(c) in ret["Item"]: + data.append( + ( + c, + node_path, + self._type_deserializer.deserialize( + ret["Item"][self._counters.get(c)] + ), + ) ) - ) return data except self._storage.errorSupplier.ResourceNotFoundException: return [] diff --git a/functions/aws/watch.py b/functions/aws/watch.py index a4d8a4d..5bac1f4 100644 --- a/functions/aws/watch.py +++ b/functions/aws/watch.py @@ -19,41 +19,37 @@ def handler(event: dict, context: dict): try: watch_event = WatchEventType(event["event"]) + watch_type = WatchType(event["type"]) timestamp = event["timestamp"] path = event["path"] watches_to_retain = [] - if watch_event == WatchEventType.NODE_DATA_CHANGED: - watches = region_watches.get_watches(path, [WatchType.GET_DATA]) - if len(watches): - for client in watches[0][1]: - version = int(client[0]) - if version >= timestamp: - if verbose: - print(f"Retaining watch with timestamp {version}") - watches_to_retain.append(client) - else: - client_ip = client[1] - client_port = int(client[2]) - if verbose: - print(f"Notify client at {client_ip}:{client_port}") - notify( - client_ip, - client_port, - { - "watch-event": WatchEventType.NODE_DATA_CHANGED.value, - "timestamp": timestamp, - "path": path, - }, - ) - # FIXME: actual notification - else: - raise NotImplementedError() - - # FIXME: remove watches from DynamoDB - + watches = region_watches.get_watches(path, [watch_type]) + if len(watches): + for client in watches[0][1]: + version = int(client[0]) + if version >= timestamp: + if verbose: + print(f"Retaining watch with timestamp {version}") + watches_to_retain.append(client) + else: + client_ip = client[1] + client_port = int(client[2]) + if verbose: + print(f"Notify client at {client_ip}:{client_port}") + notify( + client_ip, + client_port, + { + "watch-event": watch_event.value, + "timestamp": timestamp, + "path": path, + }, + ) + return True except Exception: print("Failure!") import traceback traceback.print_exc() + return False From 8cd2e6b552a51b67285005c1bfe8442570750677 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sun, 10 Sep 2023 20:08:39 +0000 Subject: [PATCH 2/5] fix method name in distributor events --- functions/aws/control/distributor_events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/functions/aws/control/distributor_events.py b/functions/aws/control/distributor_events.py index a64eafc..765e537 100644 --- a/functions/aws/control/distributor_events.py +++ b/functions/aws/control/distributor_events.py @@ -293,7 +293,7 @@ def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_ all_watches[idx] = Watches.Watch_Event(WatchEventType.NODE_CHILDREN_CHANGED.value, watch_entity[0].value, watch_entity[1], self.parent.modified.system.sum) return all_watches - def store_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): + def update_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): self.node.modified.epoch = EpochCounter.from_raw_data(epoch_counters) user_storage.update(self.node) # FIXME: update pxid. @@ -477,7 +477,7 @@ def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_ return all_watches - def store_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): + def update_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): self.node.modified.epoch = EpochCounter.from_raw_data(epoch_counters) user_storage.update(self.node, set([NodeDataType.MODIFIED, NodeDataType.DATA])) @@ -650,7 +650,7 @@ def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_ return all_watches - def store_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): + def update_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]): self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters) user_storage.update(self.parent, set([NodeDataType.CHILDREN])) From 6e515bdc77d122443639f63101ff09d77aaa234f Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sun, 10 Sep 2023 20:15:57 +0000 Subject: [PATCH 3/5] fix watchType import --- functions/aws/control/distributor_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/functions/aws/control/distributor_events.py b/functions/aws/control/distributor_events.py index 765e537..0796eef 100644 --- a/functions/aws/control/distributor_events.py +++ b/functions/aws/control/distributor_events.py @@ -10,9 +10,9 @@ from faaskeeper.node import Node, NodeDataType from faaskeeper.version import EpochCounter, SystemCounter, Version -from faaskeeper.watch import WatchEventType +from faaskeeper.watch import WatchEventType, WatchType from functions.aws.model import SystemStorage -from functions.aws.model.watches import Watches, WatchType +from functions.aws.model.watches import Watches from functions.aws.model.system_storage import Node as SystemNode from functions.aws.model.user_storage import Storage as UserStorage from functions.aws.stats import TimingStatistics From 216392ac2ddf7488eda9c35e48aabc136e61adc3 Mon Sep 17 00:00:00 2001 From: HanayoZz Date: Sun, 10 Sep 2023 16:23:22 -0400 Subject: [PATCH 4/5] remove unnessary comments in distributor --- functions/aws/distributor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/functions/aws/distributor.py b/functions/aws/distributor.py index 16b1e08..eb5728b 100644 --- a/functions/aws/distributor.py +++ b/functions/aws/distributor.py @@ -75,7 +75,7 @@ def launch_watcher(operation: DistributorEvent, region: str, json_in: dict): # pop the pending watch: update epoch counters for the node and parent in user storage. epoch_counters[r].remove(f"{hashed_path}_{watch_type}_{timestamp}") - operation.update_epoch_counters(config.user_storage, epoch_counters[r]) # 结合client那边,或许应该要放到其他地方 + operation.update_epoch_counters(config.user_storage, epoch_counters[r]) return True return False From b8e4f0e76954cd3a77efab7fbf7e2fc09de0466d Mon Sep 17 00:00:00 2001 From: HanayoZz Date: Sun, 10 Sep 2023 16:27:31 -0400 Subject: [PATCH 5/5] remove unnessary comments in distributor --- functions/aws/distributor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/functions/aws/distributor.py b/functions/aws/distributor.py index eb5728b..c94adb9 100644 --- a/functions/aws/distributor.py +++ b/functions/aws/distributor.py @@ -116,7 +116,7 @@ def handler(event: dict, context): elif "body" in record: write_event = json.loads(record["body"]) event_type = DistributorEventType(int(write_event["type"]["N"])) - if "data" in record["messageAttributes"]: # TODO: is this a case from other operation + if "data" in record["messageAttributes"]: write_event["data"] = { "B": record["messageAttributes"]["data"]["binaryValue"] }