Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 95 additions & 11 deletions functions/aws/control/distributor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@

from faaskeeper.node import Node, NodeDataType
from faaskeeper.version import EpochCounter, SystemCounter, Version
from faaskeeper.watch import WatchEventType
from faaskeeper.watch import WatchEventType, WatchType
from functions.aws.model import SystemStorage
from functions.aws.model.watches import Watches
from functions.aws.model.system_storage import Node as SystemNode
from functions.aws.model.user_storage import Storage as UserStorage
from functions.aws.stats import TimingStatistics

from functions.aws.model.watches import Watches

class DistributorEventType(IntEnum):
CREATE_NODE = 0
Expand Down Expand Up @@ -83,6 +84,20 @@ def execute(
def epoch_counters(self) -> List[str]:
pass

@abstractmethod
def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_Event]:
    """Build the watch events triggered by this distributor event.

    :param region_watches: watch storage that is queried for registered watches.
    :return: a list of ``Watches.Watch_Event`` records, each carrying
        (watch_event_type, watch_type, node_path, mFxidSys).
    """

@abstractmethod
def update_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]):
    """Persist ``epoch_counters`` for the nodes affected by this event.

    The counters are stored on the EXISTING node (and its parent node)
    in the user storage.

    :param user_storage: user storage holding the node data.
    :param epoch_counters: raw epoch counter values to store.
    """

@abstractmethod
def set_system_counter(self, system_counter: SystemCounter):
    """Record ``system_counter`` on this event; each concrete event decides which node versions it sets."""
Expand Down Expand Up @@ -185,7 +200,7 @@ def execute(
) -> Optional[dict]:

system_node = system_storage.read_node(self.node)

print("distributor system node", system_node)
status = self._node_status(system_node)

if status == TriBool.INCORRECT:
Expand Down Expand Up @@ -234,8 +249,8 @@ def execute(

self.node.modified.epoch = EpochCounter.from_raw_data(epoch_counters)
user_storage.write(self.node)
# FIXME: update parent epoch and pxid
# self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters)
# FIXME: update parent pxid
self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters)
user_storage.update(self.parent, set([NodeDataType.CHILDREN]))

system_storage.pop_pending_update(system_node.node)
Expand All @@ -251,11 +266,40 @@ def epoch_counters(self) -> List[str]:
return []

def set_system_counter(self, system_counter: SystemCounter):
    """Stamp the created node and its parent with ``system_counter``.

    Sets the node's ``created`` and ``modified`` versions and the parent's
    ``modified`` version; the epoch part of every version starts as ``None``.

    :param system_counter: system counter assigned to this event.
    """
    created_node, parent_node = self.node, self.parent

    # One Version object per field: the epoch of node.modified and
    # parent.modified is assigned separately later on.
    created_node.created = Version(system_counter, None)
    created_node.modified = Version(system_counter, None)
    parent_node.modified = Version(system_counter, None)

def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_Event]:
    """Collect the watch events fired by creating this node.

    Queries EXISTS watches on the created node and GET_CHILDREN watches on
    its parent, then converts every hit into a ``Watches.Watch_Event``.

    :param region_watches: watch storage to query.
    :return: NODE_CREATED events for the node path (stamped with the node's
        modified system counter sum) followed by NODE_CHILDREN_CHANGED events
        for the parent path (stamped with the parent's).
    """
    # Query the registered watches from DynamoDB; whether they should
    # even be scheduled is decided later.
    registered = list(region_watches.query_watches(self.node.path, [WatchType.EXISTS]))
    registered.extend(
        region_watches.query_watches(self.parent.path, [WatchType.GET_CHILDREN])
    )

    events = []
    for entry in registered:
        watch_type, watched_path = entry[0], entry[1]
        if watched_path == self.node.path:
            # watch on the created node: report with the node timestamp
            event = Watches.Watch_Event(
                WatchEventType.NODE_CREATED.value,
                watch_type.value,
                watched_path,
                self.node.modified.system.sum,
            )
        elif watched_path == self.parent.path:
            # watch on the parent: report with the parent timestamp
            event = Watches.Watch_Event(
                WatchEventType.NODE_CHILDREN_CHANGED.value,
                watch_type.value,
                watched_path,
                self.parent.modified.system.sum,
            )
        else:
            # entries for any other path are passed through untouched
            event = entry
        events.append(event)
    return events

def update_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]):
    """Store ``epoch_counters`` on the created node and on its parent.

    :param user_storage: user storage that receives both updates.
    :param epoch_counters: raw epoch counter values to store.
    """
    created_node = self.node
    created_node.modified.epoch = EpochCounter.from_raw_data(epoch_counters)
    user_storage.update(created_node)

    # FIXME: update pxid.
    parent_node = self.parent
    parent_node.modified.epoch = EpochCounter.from_raw_data(epoch_counters)
    user_storage.update(parent_node, {NodeDataType.CHILDREN})

@property
def node(self) -> Node:
    """Return the node stored on this event (``self._node``)."""
    return self._node
Expand Down Expand Up @@ -422,6 +466,20 @@ def set_system_counter(self, system_counter: SystemCounter):

self.node.modified = Version(system_counter, None)

def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_Event]:
    """Collect the watch events fired by changing this node's data.

    :param region_watches: watch storage to query.
    :return: one NODE_DATA_CHANGED ``Watches.Watch_Event`` per GET_DATA or
        EXISTS watch found on the node path, stamped with the node's
        modified system counter sum.
    """
    hits = region_watches.query_watches(
        self.node.path, [WatchType.GET_DATA, WatchType.EXISTS]
    )
    return [
        Watches.Watch_Event(
            WatchEventType.NODE_DATA_CHANGED.value,
            hit[0].value,
            hit[1],
            self.node.modified.system.sum,
        )
        for hit in hits
    ]

def update_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]):
    """Store ``epoch_counters`` on the node and write its MODIFIED and DATA fields.

    :param user_storage: user storage that receives the update.
    :param epoch_counters: raw epoch counter values to store.
    """
    target = self.node
    target.modified.epoch = EpochCounter.from_raw_data(epoch_counters)
    user_storage.update(target, {NodeDataType.MODIFIED, NodeDataType.DATA})

class DistributorDeleteNode(DistributorEvent):

Expand Down Expand Up @@ -552,7 +610,7 @@ def execute(
# FIXME: retain the node to keep counters
# self.node.modified.epoch = EpochCounter.from_raw_data(epoch_counters)
user_storage.delete(self.node)
# self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters)
self.parent.modified.epoch = EpochCounter.from_raw_data(epoch_counters)
user_storage.update(self.parent, set([NodeDataType.CHILDREN]))

system_storage.pop_pending_update(system_node.node)
Expand All @@ -567,8 +625,34 @@ def epoch_counters(self) -> List[str]:
return []

def set_system_counter(self, system_counter: SystemCounter):
    """Stamp the deleted node and its parent with ``system_counter``.

    :param system_counter: system counter assigned to this event.
    """
    removed_node, parent_node = self.node, self.parent

    # The node version is only used for watch notifications; it has
    # nothing to do with committing the node.
    removed_node.modified = Version(system_counter, None)
    parent_node.modified = Version(system_counter, None)

def generate_watches_event(self, region_watches: Watches) -> List[Watches.Watch_Event]:
    """Collect the watch events fired by deleting this node.

    Queries EXISTS, GET_DATA and GET_CHILDREN watches on the deleted node and
    GET_CHILDREN watches on its parent, then converts every hit into a
    ``Watches.Watch_Event``.

    :param region_watches: watch storage to query.
    :return: NODE_DELETED events for the node path (stamped with the node's
        modified system counter sum) followed by NODE_CHILDREN_CHANGED events
        for the parent path (stamped with the parent's).
    """
    # Query the registered watches from DynamoDB; whether they should
    # even be scheduled is decided later.
    registered = list(
        region_watches.query_watches(
            self.node.path,
            [WatchType.EXISTS, WatchType.GET_DATA, WatchType.GET_CHILDREN],
        )
    )
    registered.extend(
        region_watches.query_watches(self.parent.path, [WatchType.GET_CHILDREN])
    )

    events = []
    for entry in registered:
        watch_type, watched_path = entry[0], entry[1]
        if watched_path == self.node.path:
            event = Watches.Watch_Event(
                WatchEventType.NODE_DELETED.value,
                watch_type.value,
                watched_path,
                self.node.modified.system.sum,
            )
        elif watched_path == self.parent.path:
            event = Watches.Watch_Event(
                WatchEventType.NODE_CHILDREN_CHANGED.value,
                watch_type.value,
                watched_path,
                self.parent.modified.system.sum,
            )
        else:
            # entries for any other path are passed through untouched
            event = entry
        events.append(event)

    return events

def update_epoch_counters(self, user_storage: UserStorage, epoch_counters: Set[str]):
    """Store ``epoch_counters`` on the parent of the deleted node.

    Only the parent is written; this method does not touch the node itself.

    :param user_storage: user storage that receives the update.
    :param epoch_counters: raw epoch counter values to store.
    """
    parent_node = self.parent
    parent_node.modified.epoch = EpochCounter.from_raw_data(epoch_counters)
    user_storage.update(parent_node, {NodeDataType.CHILDREN})

@property
def node(self) -> Node:
Expand Down Expand Up @@ -597,7 +681,7 @@ def builder(
raise NotImplementedError()

distr_event = ops[event_type]
op = distr_event.deserialize(event)
op:DistributorEvent = distr_event.deserialize(event)
op.set_system_counter(counter)

return op
50 changes: 27 additions & 23 deletions functions/aws/distributor.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import json
import logging
import time
import hashlib
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, List, Set

import boto3

from faaskeeper.stats import StorageStatistics
from faaskeeper.version import SystemCounter
from faaskeeper.watch import WatchType
from functions.aws.config import Config
from functions.aws.control.channel import Client
from functions.aws.control.distributor_events import DistributorEventType, builder
from functions.aws.control.distributor_events import DistributorEvent, DistributorEventType, builder
from functions.aws.model.watches import Watches
from functions.aws.stats import TimingStatistics

Expand Down Expand Up @@ -43,7 +43,7 @@
# verbose_output = False
# FIXME: proper data structure
region_clients = {}
region_watches = {}
region_watches: Dict[str, Watches] = {}
epoch_counters: Dict[str, Set[str]] = {}
for r in regions:
region_clients[r] = boto3.client("lambda", region_name=r)
Expand All @@ -56,28 +56,33 @@ def get_object(obj: dict):
return next(iter(obj.values()))


def launch_watcher(operation: DistributorEvent, region: str, json_in: dict):
    """Deliver one watch notification through the watch function of ``region``.

    (1) Submit watcher
    (2) Wait for completion
    (3) Remove ephemeral counter.

    :param operation: distributor event that triggered the watch; used to
        write the remaining epoch counters back to user storage.
    :param region: region whose watch function is invoked and whose epoch
        counters are updated.
    :param json_in: watch payload; must contain "path", "type" and "timestamp".
    :return: True if the watch function reported a successful delivery and
        the pending counter was removed, False otherwise.
    """
    response = region_clients[region].invoke(
        FunctionName=f"{config.deployment_name}-watch",
        InvocationType="RequestResponse",
        Payload=json.dumps(json_in).encode(),
    )

    # invoke() always returns a (truthy) response dict, so the outcome has
    # to be read from it: a crashed function is flagged with "FunctionError",
    # otherwise the payload carries the handler's boolean return value.
    if "FunctionError" in response:
        return False
    raw_result = response["Payload"].read()
    if not raw_result or json.loads(raw_result) is not True:
        return False

    hashed_path = hashlib.md5(json_in["path"].encode()).hexdigest()
    timestamp = json_in["timestamp"]
    watch_type = json_in["type"]

    # Pop the pending watch, then update the epoch counters for the node and
    # parent in user storage. Index by the `region` argument: this function
    # runs on worker threads and must not rely on a module-level loop variable.
    epoch_counters[region].remove(f"{hashed_path}_{watch_type}_{timestamp}")
    operation.update_epoch_counters(config.user_storage, epoch_counters[region])
    return True

timing_stats = TimingStatistics.instance()


def handler(event: dict, context):

events = event["Records"]
logging.info(f"Begin processing {len(events)} events")

Expand Down Expand Up @@ -150,19 +155,18 @@ def handler(event: dict, context):
if config.benchmarking:
begin_watch = time.time()
# start watch delivery
for r in regions:
# if event_type == DistributorEventType.SET_DATA:
# watches_submitters.append(
# executor.submit(launch_watcher, r, watches)
# )
# FIXME: other watchers
# FIXME: reenable submission
# Query watches from DynamoDB to decide later
# if they should even be scheduled.
region_watches[r].query_watches(
operation.node.path, [WatchType.GET_DATA]
)

for r in regions: # deliver watch concurrently
for watch in operation.generate_watches_event(region_watches[r]):
watch_dict = {
"event": watch.watch_event_type,
"type": watch.watch_type,
"path": watch.node_path,
"timestamp": watch.mFxidSys,
}

watches_submitters.append(
executor.submit(launch_watcher, operation, r, watch_dict) # watch: {DistributorEvent, watchType, timestamp, path}
)
if config.benchmarking:
end_watch = time.time()
timing_stats.add_result("watch_query", end_watch - begin_watch)
Expand Down
22 changes: 14 additions & 8 deletions functions/aws/model/watches.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
from faaskeeper.watch import WatchType
from functions.aws.control.dynamo import DynamoStorage as DynamoDriver

from collections import namedtuple

class Watches:

Watch_Event = namedtuple("Watch_Event", ["watch_event_type","watch_type", "node_path", "mFxidSys"])

def __init__(self, storage_name: str, region: str):
self._storage = DynamoDriver(f"{storage_name}-watch", "path")
self._region = region
Expand All @@ -23,16 +27,18 @@ def query_watches(self, node_path: str, counters: List[WatchType]):
ret = self._storage.read(node_path)

data = []
if "Attributes" in ret:
if "Item" in ret:
for c in counters:
data.append(
(
c,
self._type_deserializer.deserialize(
ret["Attributes"][self._counters.get(c)]
),
if self._counters.get(c) in ret["Item"]:
data.append(
(
c,
node_path,
self._type_deserializer.deserialize(
ret["Item"][self._counters.get(c)]
),
)
)
)
return data
except self._storage.errorSupplier.ResourceNotFoundException:
return []
Expand Down
54 changes: 25 additions & 29 deletions functions/aws/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,37 @@ def handler(event: dict, context: dict):

try:
watch_event = WatchEventType(event["event"])
watch_type = WatchType(event["type"])
timestamp = event["timestamp"]
path = event["path"]

watches_to_retain = []
if watch_event == WatchEventType.NODE_DATA_CHANGED:
watches = region_watches.get_watches(path, [WatchType.GET_DATA])
if len(watches):
for client in watches[0][1]:
version = int(client[0])
if version >= timestamp:
if verbose:
print(f"Retaining watch with timestamp {version}")
watches_to_retain.append(client)
else:
client_ip = client[1]
client_port = int(client[2])
if verbose:
print(f"Notify client at {client_ip}:{client_port}")
notify(
client_ip,
client_port,
{
"watch-event": WatchEventType.NODE_DATA_CHANGED.value,
"timestamp": timestamp,
"path": path,
},
)
# FIXME: actual notification
else:
raise NotImplementedError()

# FIXME: remove watches from DynamoDB

watches = region_watches.get_watches(path, [watch_type])
if len(watches):
for client in watches[0][1]:
version = int(client[0])
if version >= timestamp:
if verbose:
print(f"Retaining watch with timestamp {version}")
watches_to_retain.append(client)
else:
client_ip = client[1]
client_port = int(client[2])
if verbose:
print(f"Notify client at {client_ip}:{client_port}")
notify(
client_ip,
client_port,
{
"watch-event": watch_event.value,
"timestamp": timestamp,
"path": path,
},
)
return True
except Exception:
print("Failure!")
import traceback

traceback.print_exc()
return False