From f69fbc901ff2e9b1289a748202aca98e065d4ae8 Mon Sep 17 00:00:00 2001 From: Claudio Pastorini Date: Tue, 14 Aug 2018 09:59:13 +0200 Subject: [PATCH 1/2] [WIP]Adds step_function decorator With this new decorator, it is possible to create a Step Function providing its definition with a name using the state_machine_name optional parameter or the function name. With this new capability, it is also possible to add also more than one Step Function with different names --- domovoi/app.py | 37 +++++++++++++---- scripts/domovoi | 104 ++++++++++++++++++++++++------------------------ 2 files changed, 80 insertions(+), 61 deletions(-) diff --git a/domovoi/app.py b/domovoi/app.py index 64b28e6..b61b308 100644 --- a/domovoi/app.py +++ b/domovoi/app.py @@ -38,7 +38,7 @@ class Domovoi(Chalice): sns_subscribers = {} sqs_subscribers = {} s3_subscribers = {} - sfn_tasks = {} + sfns = {} cwl_sub_filters = {} dynamodb_event_sources = {} @@ -105,13 +105,34 @@ def register_rule(func): return func return register_rule - def step_function_task(self, state_name, state_machine_definition): + def step_function(self, state_machine_name=None): + def register_sfn(func): + handler_name = state_machine_name + if handler_name is None: + handler_name = func.__name__ + + self.sfns[handler_name] = dict(state_machine_definition=func, + state_machine_name=handler_name, + states=dict()) + return func + return register_sfn + + def step_function_task(self, state_name, state_machine_definition=None, state_machine_name=None): def register_sfn_task(func): - if state_name in self.sfn_tasks: - raise KeyError(state_name) - self.sfn_tasks[state_name] = dict(state_name=state_name, - state_machine_definition=state_machine_definition, - func=func) + if state_machine_name is None and state_machine_definition is None: + raise Exception("Neither state_machine_name nor state_machine_definition provided") + + if state_machine_name is None: + name = "default" + else: + name = state_machine_name + + if self.sfns.get(name) is None and state_machine_definition is None: + raise Exception("Neither a valid state_machine_name nor state_machine_definition provided") + + self.sfns.setdefault(name, dict(state_machine_definition=state_machine_definition, + state_machine_name=name, + states=dict()))["states"][state_name] = func return func return register_sfn_task @@ -122,7 +143,7 @@ def register_state_machine(self, state_machine_definition): @classmethod def get_all_states(cls, state_machine): - states = dict(state_machine["States"]) + states = state_machine["States"] for state_name, state_data in state_machine["States"].items(): for sub_sm in state_data.get("Branches", []): states.update(cls.get_all_states(sub_sm)) diff --git a/scripts/domovoi b/scripts/domovoi index 59d7a6f..0a5935d 100755 --- a/scripts/domovoi +++ b/scripts/domovoi @@ -311,59 +311,57 @@ if not args.dry_run: for page in awslambda.get_paginator('list_aliases').paginate(FunctionName=function_name): existing_aliases.extend(page["Aliases"]) -state_machine = None -for sfn_task_name, sfn_task in domovoi_app.sfn_tasks.items(): - print("Registering step function state machine for", sfn_task_name) - if state_machine is None: - state_machine = sfn_task["state_machine_definition"] - else: - msg = "Multiple state machine definitions are not supported" - assert state_machine == sfn_task["state_machine_definition"], msg - lambda_alias = "domovoi-stepfunctions-task-" + sfn_task_name - alias_args = dict(FunctionName=function_name, - Name=lambda_alias, - FunctionVersion="$LATEST", - Description="Domovoi Lambda routing label for a Step Functions state machine task") - all_states = domovoi.Domovoi.get_all_states(state_machine) - state = all_states[sfn_task["state_name"]] - if not args.dry_run: - for alias in existing_aliases: - if alias["Name"] == lambda_alias and alias["FunctionVersion"] == "$LATEST": - break - else: - try: - awslambda.create_alias(**alias_args) - except awslambda.exceptions.ResourceConflictException: - awslambda.update_alias(**alias_args) - state["Resource"] = lambda_arn + ":" + lambda_alias - -if state_machine and not args.dry_run: - iam_role_arn = config.iam_role_arn or iam.Role(function_name).arn - sm_args = dict(name=function_name, - definition=json.dumps(state_machine), - roleArn=iam_role_arn) - try: - sm = sfn.create_state_machine(**sm_args) - print("Created new state machine", sm["stateMachineArn"]) - except botocore.exceptions.ClientError as e: - for page in sfn.get_paginator("list_state_machines").paginate(): - for sm in page["stateMachines"]: - if sm["name"] == function_name: - break - if sm["name"] != function_name: - raise e - sm = sfn.describe_state_machine(stateMachineArn=sm["stateMachineArn"]) - sm_args.clear() - if json.loads(sm["definition"]) != state_machine: - sm_args["definition"] = json.dumps(state_machine) - if sm["roleArn"] != iam_role_arn: - sm_args["roleArn"] = iam_role_arn - if sm_args: - print("Updating state machine", sm["stateMachineArn"]) - sfn.update_state_machine(stateMachineArn=sm["stateMachineArn"], **sm_args) - else: - print("No changes required to existing state machine", sm["stateMachineArn"]) - print("State machine:", sm["stateMachineArn"]) +for sfn_name, sfn_data in domovoi_app.sfns.items(): + state_machine = sfn_data["state_machine_definition"]() + for sfn_task_name, sfn_task in sfn_data["states"].items(): + print("Registering step function state machine for", sfn_task_name) + lambda_alias = "domovoi-stepfunctions-task-" + sfn_task_name + alias_args = dict(FunctionName=function_name, + Name=lambda_alias, + FunctionVersion="$LATEST", + Description="Domovoi Lambda routing label for a Step Functions state machine task") + all_states = domovoi.Domovoi.get_all_states(state_machine) + state = all_states[sfn_name] + if state is not None: + if not args.dry_run: + for alias in existing_aliases: + if alias["Name"] == lambda_alias and alias["FunctionVersion"] == "$LATEST": + break + else: + try: + awslambda.create_alias(**alias_args) + except awslambda.exceptions.ResourceConflictException: + awslambda.update_alias(**alias_args) + state["Resource"] = lambda_arn + ":" + lambda_alias + + if state_machine and not args.dry_run: + iam_role_arn = config.iam_role_arn or iam.Role(function_name).arn + name = "{}-{}".format(function_name, sfn_name) if sfn_name != "default" else function_name + sm_args = dict(name=name, + definition=json.dumps(state_machine), + roleArn=iam_role_arn) + try: + sm = sfn.create_state_machine(**sm_args) + print("Created new state machine", sm["stateMachineArn"]) + except botocore.exceptions.ClientError as e: + for page in sfn.get_paginator("list_state_machines").paginate(): + for sm in page["stateMachines"]: + if sm["name"] == function_name: + break + if sm["name"] != function_name: + raise e + sm = sfn.describe_state_machine(stateMachineArn=sm["stateMachineArn"]) + sm_args.clear() + if json.loads(sm["definition"]) != state_machine: + sm_args["definition"] = json.dumps(state_machine) + if sm["roleArn"] != iam_role_arn: + sm_args["roleArn"] = iam_role_arn + if sm_args: + print("Updating state machine", sm["stateMachineArn"]) + sfn.update_state_machine(stateMachineArn=sm["stateMachineArn"], **sm_args) + else: + print("No changes required to existing state machine", sm["stateMachineArn"]) + print("State machine:", sm["stateMachineArn"]) if args.dry_run: print("Dry run successful") From 3753e8f8fcaf6c5a28c157788b5b4019c5dc7003 Mon Sep 17 00:00:00 2001 From: Claudio Pastorini Date: Tue, 21 Aug 2018 19:59:52 +0200 Subject: [PATCH 2/2] Now task specified inside the project are deployed again It fixes the problem that causes the not deployment of the tasks specified inside the project. It was necessary to change the generator of Lambda alias in order to save also the state machine name. Now the name of Lambda's alias will be "domovoi-stepfunctions-{sfn_name}-task-{sfn_task_name}". --- domovoi/app.py | 13 +++++++------ scripts/domovoi | 6 +++--- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/domovoi/app.py b/domovoi/app.py index b61b308..ecdd184 100644 --- a/domovoi/app.py +++ b/domovoi/app.py @@ -111,7 +111,7 @@ def register_sfn(func): if handler_name is None: handler_name = func.__name__ - self.sfns[handler_name] = dict(state_machine_definition=func, + self.sfns[handler_name] = dict(state_machine_definition=func(), state_machine_name=handler_name, states=dict()) return func @@ -143,7 +143,7 @@ def register_state_machine(self, state_machine_definition): @classmethod def get_all_states(cls, state_machine): - states = state_machine["States"] + states = dict(state_machine["States"]) for state_name, state_data in state_machine["States"].items(): for sub_sm in state_data.get("Branches", []): states.update(cls.get_all_states(sub_sm)) @@ -202,12 +202,13 @@ def __call__(self, event, context): elif "awslogs" in event: event = json.loads(gzip.decompress(base64.b64decode(event["awslogs"]["data"]))) handler = self.cwl_sub_filters[event["logGroup"]]["func"] - elif "domovoi-stepfunctions-task" in invoked_function_arn.resource: + elif "domovoi-stepfunctions-" in invoked_function_arn.resource: _, lambda_name, lambda_alias = invoked_function_arn.resource.split(":") - assert lambda_alias.startswith("domovoi-stepfunctions-task-") - task_name = lambda_alias[len("domovoi-stepfunctions-task-"):] + assert lambda_alias.startswith("domovoi-stepfunctions-") + sfn_name = lambda_alias[len("domovoi-stepfunctions-"):].split("-")[0] + task_name = lambda_alias[len("domovoi-stepfunctions-"):].split("-")[2] context.stepfunctions_task_name = task_name - handler = self.sfn_tasks[task_name]["func"] + handler = self.sfns[sfn_name]["states"][task_name] if handler is None: raise DomovoiException("No handler found for event {}".format(event)) diff --git a/scripts/domovoi b/scripts/domovoi index 0a5935d..c703598 100755 --- a/scripts/domovoi +++ b/scripts/domovoi @@ -312,16 +312,16 @@ if not args.dry_run: existing_aliases.extend(page["Aliases"]) for sfn_name, sfn_data in domovoi_app.sfns.items(): - state_machine = sfn_data["state_machine_definition"]() + state_machine = sfn_data["state_machine_definition"] for sfn_task_name, sfn_task in sfn_data["states"].items(): print("Registering step function state machine for", sfn_task_name) - lambda_alias = "domovoi-stepfunctions-task-" + sfn_task_name + lambda_alias = "domovoi-stepfunctions-" + sfn_name + "-task-" + sfn_task_name alias_args = dict(FunctionName=function_name, Name=lambda_alias, FunctionVersion="$LATEST", Description="Domovoi Lambda routing label for a Step Functions state machine task") all_states = domovoi.Domovoi.get_all_states(state_machine) - state = all_states[sfn_name] + state = all_states[sfn_task_name] if state is not None: if not args.dry_run: for alias in existing_aliases: