Changes from all commits (25 commits):
- 5816fa2 create node workflow (Aug 6, 2023)
- b3ec051 workflow draft done w/o watch and client (Aug 6, 2023)
- bc5afc2 fix distributor queue (Aug 19, 2023)
- baeec8d set data and delete node draft version (Aug 20, 2023)
- facf31a gcp watch draft (Sep 2, 2023)
- dc4af43 add watch impl abstract methods (Sep 10, 2023)
- 62eda2e distributor create node watch impl (Sep 10, 2023)
- 1b5a4de gcp watch (Sep 11, 2023)
- 55e3585 watch notification and epoch counter update (Oct 4, 2023)
- 74ee26b delete archived test cases (Oct 5, 2023)
- 29f388a remove unwanted print and comments (Oct 5, 2023)
- 064f00d add distributor queue name as a env var (Oct 5, 2023)
- 132a7e1 serverless config for gcp deployment (Oct 15, 2023)
- eff0463 add init process to gcp (Oct 15, 2023)
- e49b2db add plugin serverless-google-cloudfunctions install to install.py (Oct 15, 2023)
- e756cc2 fadd project and database argument to class, the GCP deployment now p… (Oct 16, 2023)
- b385767 add datastore database name as a property of config (Oct 16, 2023)
- 021b616 improve the naming of the user storage bucket (Oct 17, 2023)
- e6b5f08 add bucket-name into config, let user make sure its name is globally … (Oct 18, 2023)
- 1ee20a9 add deregister user logic in system storage (Oct 18, 2023)
- 5f4e290 use session_id+req_id as the timestamp in writer-event (Oct 18, 2023)
- cfb9ab7 add error handling of broken pipe error and timestamp format (Oct 20, 2023)
- 5ddcdf4 cold start time optimization (Oct 24, 2023)
- d25fd2e upload source code zip files concurrently (Oct 24, 2023)
- 18c5bfd optimize cold start time (Oct 24, 2023)
37 changes: 35 additions & 2 deletions README.md
@@ -40,7 +40,8 @@ To install the local development environment with all necessary packages, please
script. The script takes one optional argument `--venv` with a path to the Python and Node.js virtual
environment, and the default path is `python-venv`. Use `source {venv-path}/bin/activate` to use it.
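For example (a minimal sketch, assuming the installer script referenced above is `install.py` and using the default path):

```
./install.py --venv python-venv
source python-venv/bin/activate
```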

The deployment with `serverless` framework is wrapped with a helper executable `fk.py`.
The deployment with `serverless` framework is wrapped with a helper executable `fk.py`. To enable verbose debugging of functions, set the flag `verbose` in the config.
### AWS
Use the JSON config example in `config/user_config.json` to change the deployment name and parameters.
Use this to deploy the service with all functions, storage, and queue services:

@@ -62,9 +63,41 @@ The existing deployment can be cleared by removing entire service before redeplo
```
./fk.py deploy service --provider aws --clean --config config/user_config.json
```
### GCP
Use the JSON config example in `config/user_config_gcp.json` to change the deployment name and parameters.

To enable verbose debugging of functions, set the flag `verbose` in the config.
Before running the script, please create a project and do the following. Steps 1-5 are performed in the GCP console; the rest are run in a local terminal:

1. Enable Datastore mode, Deployment Manager, Pub/Sub, Cloud Functions, and Cloud Run. To enable Cloud Functions and Cloud Run, you need to click the create button in the console.
2. Enabling the Cloud Run service automatically creates a service account of the form `XXXXX-compute@developer.gserviceaccount.com`.
3. Go to Service Accounts under IAM & Admin and download a key for `XXXXX-compute@developer.gserviceaccount.com` by clicking the three dots and selecting Manage keys. Put the key file into the config folder and execute the following command to store the GCP credentials in your local environment.
```
export GOOGLE_APPLICATION_CREDENTIALS="<ABSOLUTE_PATH_TO_KEY>"
```
4. Fill in the project details in `config/user_config_gcp.json`.
5. Go to GCP IAM and grant `XXXXX-compute@developer.gserviceaccount.com` the Owner and Storage Admin roles; grant your Google account the Service Account Token Creator role.
6. In a terminal, log in with the gcloud command-line tool.
```
gcloud auth login <GOOGLE_ACCOUNT_EMAIL>
```
7. Set the current project:
```
gcloud config set project <PROJECT_ID>
```
8. Generate a temporary token (valid for 1 hour) and copy it into `gcp_config_auth.yml` (a sketch of that file's layout is shown after this list):
```
gcloud auth print-access-token --impersonate-service-account=XXXXX-compute@developer.gserviceaccount.com
```
9. If the token expires, generate a new one, paste it into `gcp_config_auth.yml`, and delete the type provider `datastore-final`. A new `datastore-final` type provider will then be generated the next time you run deploy.
```
gcloud beta deployment-manager type-providers delete datastore-final
```
10. Uncomment the dependencies listed under `gcp`.
11. Use the following command to deploy the service with all functions, storage, and queue services. If you run into a permissions error, first verify that step 1 was completed correctly.

```
./fk.py deploy service config/user_config_gcp_final.json --provider gcp --config config/user_config_gcp.json
```
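The layout of `gcp_config_auth.yml` is not shown in this diff. As a rough sketch, a Deployment Manager type-provider options file that attaches the token from step 8 as a bearer header typically looks like the following (field names follow the Deployment Manager documentation; the token value is a placeholder):

```
# Sketch only - the file in the repository may differ.
options:
  inputMappings:
    - fieldName: Authorization
      location: HEADER
      value: >
        $.concat("Bearer <ACCESS_TOKEN_FROM_STEP_8>")
```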
## Using CLI

A CLI for FaaSKeeper is available in `bin/fkCli.py`. It allows to run interactive FaaSKeeper session,
38 changes: 0 additions & 38 deletions config/gcp.yml

This file was deleted.

2 changes: 1 addition & 1 deletion config/providers.json
@@ -14,7 +14,7 @@
"plugins": [ "serverless-azure-functions" ]
},
"gcp": {
"region": "eastus",
"region": "us-central1",
"runtime": "python3.8",
"plugins": [
"serverless-google-cloudfunctions"
24 changes: 24 additions & 0 deletions config/user_config_gcp.json
@@ -0,0 +1,24 @@
{
"verbose": "true",
"cloud-provider": "gcp",
"port": 5000,
"deployment-name": "dev",
"deployment-region": "us-central1",
"user-storage": "persistent",
"system-storage": "key-value",
"heartbeat-frequency": 12,
"worker-queue": "pubsub",
"distributor-queue": "pubsub",
"client-channel": "tcp",
"configuration": {
"benchmarking": "True",
"benchmarking-frequency": 1
},
"gcp": {
"project-id": "wide-axiom-402003",
"project-credentials": "/home/ubuntu/summer/workingDirectoryMyFork/faaskeeper/config/wide-axiom-402003-5d5c18960a5e.json",
"default-compute-service-account": "436222241187-compute@developer.gserviceaccount.com",
"database-name": "test2",
"bucket-name": "jncxb1213eqweqqweq11"
}
}
63 changes: 60 additions & 3 deletions fk.py
@@ -9,6 +9,8 @@
import click

from functions.aws.init import init as aws_init, clean as aws_clean, config as aws_config
from functions.gcp.init import init as gcp_init
from concurrent.futures import Future, ThreadPoolExecutor

def get_env(config_json: dict) -> dict:

@@ -84,6 +86,7 @@ def export(ctx, provider: str, config):
config_json = json.load(config)

service_name = config_json["deployment-name"]
env = get_env(config_json)
try:
logging.info(
f"Exporting env variables for service {service_name} at provider: {provider}"
@@ -95,6 +98,16 @@ def export(ctx, provider: str, config):
logging.error("Export env didn't succeed!")
logging.error(e)

def upload_cloud_func(func_name: str, function_names: list, bucket_name: str, service_name: str):
# Package a single Cloud Function: zip requirements.txt and the shared functions/gcp/ sources
# while excluding the other entry points and tests, rename the chosen entry point to main.py
# inside the archive (the Python Cloud Functions runtime expects main.py), upload the archive
# to the deployment bucket, and remove the local copy.
exclude_funcs = ""
for f in function_names:
if f != func_name:
exclude_funcs += f"functions/gcp/{f}.py "
execute(f"zip -r faaskeeper-subs-{func_name}.zip requirements.txt functions/gcp/ -x {exclude_funcs} functions/gcp/tests\* **pycache** **pytest_cache**")
execute(f"printf '@ functions/gcp/{func_name}.py\n@=main.py\n' | zipnote -w faaskeeper-subs-{func_name}.zip", shell=True)
execute(f"gcloud storage cp faaskeeper-subs-{func_name}.zip gs://sls-gcp-{service_name}-{bucket_name}")
execute(f"rm -f faaskeeper-subs-{func_name}.zip")

@deploy.command()
@click.argument("output_config")
@common_params
@@ -121,15 +134,60 @@ def service(output_config: str, provider: str, config, clean: bool):
logging.warning(e)

logging.info(f"Deploy service {service_name} to provider: {provider}")
execute(f"sls deploy --stage {service_name} -c {provider}.yml", env=env)
execute(f"sls export-env --stage {service_name} -c {provider}.yml", env=env)

if provider == "aws":
execute(f"sls deploy --stage {service_name} -c {provider}.yml", env=env)
execute(f"sls export-env --stage {service_name} -c {provider}.yml", env=env)
aws_init(f"faaskeeper-{service_name}", config_json["deployment-region"])
final_config = aws_config(config_json)
logging.info(f"Exporting FaaSKeeper config to {output_config}!")
json.dump(final_config, open(output_config, 'w'), indent=2)

elif provider == "gcp":
# envs specifically to gcp
env = {
**env,
"FK_GCP_PROJECT_ID": str(config_json["gcp"]["project-id"]),
"FK_GCP_CREDENTIALS": str(config_json["gcp"]["project-credentials"]),
"FK_COMPUTE_SERVICE_ACCOUNT": str(config_json["gcp"]["default-compute-service-account"]),
"FK_DB_NAME": str(config_json["gcp"]["database-name"]),
"FK_BUCKET_NAME": str(config_json["gcp"]["bucket-name"])
}
res = execute(f"gcloud beta deployment-manager type-providers list --format=json")
existing_providers = json.loads(res)
existing_providers_names = [entity['name'] for entity in existing_providers]
# create the custom type provider.
custom_type_provider_name = "datastore-final"
auth_config_relative = "gcp_config_auth.yml"
if custom_type_provider_name not in existing_providers_names:
execute(f"gcloud beta deployment-manager type-providers create {custom_type_provider_name} --api-options-file={auth_config_relative} --descriptor-url=https://firestore.googleapis.com/$discovery/rest?version=v1")
logging.info(f"Created type_provider [{custom_type_provider_name}].")
# create topics, datastore, user storage and a bucket for function details
logging.info(f"Deploy storages, communications in {provider}.yml to provider: {provider}")
try:
execute(f"sls deploy --stage {service_name} -c {provider}.yml", env=env)
except Exception:
logging.error("Check if it is the database OAuth token expiry issue")

bucket_name = str(config_json["gcp"]["bucket-name"])
futures: list[Future] = []
function_names = ["writer", "distributor", "watch"]
logging.info(f"Upload source code to the bucket sls-gcp-{service_name}-{bucket_name}")
with ThreadPoolExecutor(max_workers=len(function_names)) as executor:
for func in function_names:
futures.append(executor.submit(upload_cloud_func, func, function_names, bucket_name, service_name))

for f in futures:
f.result()
logging.info(f"Deploy functions in {provider}_subscriptions.yml to provider: {provider}")
execute(f"sls deploy --stage {service_name} -c {provider}_subscriptions.yml", env=env)
deployment_name = config_json["deployment-name"]
gcp_init(f"faaskeeper-{deployment_name}", str(config_json["deployment-region"]),
bucket_name, deployment_name, str(config_json["gcp"]["project-id"]), str(config_json["gcp"]["database-name"]))
# final_config = aws_config(config_json)
# logging.info(f"Exporting FaaSKeeper config to {output_config}!")
# json.dump(final_config, open(output_config, 'w'), indent=2)

@deploy.command()
@common_params
@click.option("--function", type=str, default="")
@@ -151,7 +209,6 @@ def functions(provider: str, config, function: str):
env=env,
)


@cli.group(invoke_without_command=True)
@click.pass_context
def remove(ctx):
1 change: 0 additions & 1 deletion functions/aws/control/channel.py
@@ -24,7 +24,6 @@ def __init__(self):

@staticmethod
def deserialize(dct: dict):

client = Client()
client.session_id = get_object(dct["session_id"])
client.timestamp = get_object(dct["timestamp"])
1 change: 1 addition & 0 deletions functions/aws/control/dynamo.py
@@ -11,6 +11,7 @@

class DynamoStorage(Storage):
def __init__(self, table_name: str, key_name: str):
# key_name corresponds to KeySchema in yaml
super().__init__(table_name)
self._dynamodb = boto3.client("dynamodb")
self._type_serializer = TypeSerializer()
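The new comment points at the table definition in the provider YAML; generically, `key_name` is expected to match the `AttributeName` declared in the table's `KeySchema`, for example (illustrative only, not taken from this repository's configuration):

```
KeySchema:
  - AttributeName: <key_name>
    KeyType: HASH
```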
2 changes: 1 addition & 1 deletion functions/aws/distributor.py
@@ -108,7 +108,7 @@ def handler(event: dict, context):
# write_event["data"]["B"]
# )

elif "body" in record:
elif "body" in record: # received via SQS
write_event = json.loads(record["body"])
event_type = DistributorEventType(int(write_event["type"]["N"]))
if "data" in record["messageAttributes"]:
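For context on the branch above: a record delivered through SQS carries the serialized event as a JSON string under `body` and optional payload under `messageAttributes`. A rough sketch of the shape this branch consumes (placeholder values, not taken from this PR):

```
record = {
    "body": '{"type": {"N": "0"}, ...}',  # JSON string with DynamoDB-typed attributes
    "messageAttributes": {
        "data": {"dataType": "Binary", "binaryValue": "..."}  # optional node payload
    },
}
```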
Empty file added functions/gcp/__init__.py
7 changes: 7 additions & 0 deletions functions/gcp/cloud_providers.py
@@ -0,0 +1,7 @@
# later we will move the enum to faaskeeper client repo config.py
from enum import IntEnum

class CLOUD_PROVIDER(IntEnum):
AWS = 0
GCP = 1
AZURE = 2
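`IntEnum` is presumably used so the provider can round-trip through integer-typed fields; a small illustration (not part of the PR):

```
from functions.gcp.cloud_providers import CLOUD_PROVIDER

assert int(CLOUD_PROVIDER.GCP) == 1             # stores as a plain integer
assert CLOUD_PROVIDER(1) is CLOUD_PROVIDER.GCP  # reconstructs from one
```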
133 changes: 133 additions & 0 deletions functions/gcp/config.py
@@ -0,0 +1,133 @@
import logging
from enum import Enum
from os import environ
from typing import Optional

import functions.gcp.control as control
import functions.gcp.model as model
from functions.gcp.control import distributor_queue

class Storage(Enum):
PERSISTENT = 0
KEY_VALUE = 1
REDIS = 2

class QueueType(Enum):
PUBSUB = 0

class ChannelType(Enum):
TCP = 0


class Config:

_instance: Optional["Config"] = None

def __init__(self, with_distributor_queue: bool = True):
# environment values are strings; bool("false") would be True, so compare explicitly
self._verbose = environ["VERBOSE"].lower() == "true"
self._deployment_name = f"faaskeeper-{environ['DEPLOYMENT_NAME']}"
self._deployment_region = environ["GCP_REGION"] # us-central1

logging_format = "%(asctime)s,%(msecs)d %(levelname)s %(name)s: %(message)s"
logging_date_format = "%H:%M:%S"
logging.basicConfig(
format=logging_format,
datefmt=logging_date_format,
level=logging.INFO if self._verbose else logging.WARNING,
# force=True,
)

# configure user storage handle
self._user_storage_type = {
"persistent": Storage.PERSISTENT,
"key-value": Storage.KEY_VALUE,
}.get(environ["USER_STORAGE"])
self._user_storage: model.UserStorage
if self._user_storage_type == Storage.PERSISTENT:
bucket_name = environ["CLOUD_STORAGE_BUCKET"]
self._user_storage = model.CloudStorageStorage(bucket_name=bucket_name)
else:
raise RuntimeError("Not implemented!")
# configure system storage handle
self._system_storage_type = {"key-value": Storage.KEY_VALUE}.get(
environ["SYSTEM_STORAGE"]
)
if self._system_storage_type == Storage.KEY_VALUE:
self._system_storage = model.DataStoreSystemStateStorage(environ["PROJECT_ID"], self._deployment_name, environ["DB_NAME"])
else:
raise RuntimeError("Not implemented!")

# configure distributor queue
self._distributor_queue: Optional[distributor_queue.DistributorQueue]
if with_distributor_queue:
self._distributor_queue_type = {
"pubsub": QueueType.PUBSUB,
}.get(environ["DISTRIBUTOR_QUEUE"])
if self._distributor_queue_type == QueueType.PUBSUB:
self._distributor_queue = distributor_queue.DistributorQueuePubSub(
environ["PROJECT_ID"], environ["DISTRIBUTOR_QUEUE_NAME"]
)
else:
raise RuntimeError("Not implemented!")
else:
self._distributor_queue = None

# configure client channel
self._client_channel_type = {
"tcp": ChannelType.TCP,
}.get(environ["CLIENT_CHANNEL"])

self._client_channel: control.ClientChannel
if self._client_channel_type == ChannelType.TCP:
self._client_channel = control.ClientChannelTCP()
else:
raise RuntimeError("Not implemented!")

self._benchmarking = False
if "BENCHMARKING" in environ:
self._benchmarking = environ["BENCHMARKING"].lower() == "true"
self._benchmarking_frequency = 1
if "BENCHMARKING_FREQUENCY" in environ:
self._benchmarking_frequency = int(environ["BENCHMARKING_FREQUENCY"])

@staticmethod
def instance(with_distributor_queue: bool = True) -> "Config":
if not Config._instance:
Config._instance = Config(with_distributor_queue)
return Config._instance

@property
def verbose(self) -> bool:
return self._verbose

@property
def deployment_name(self) -> str:
return self._deployment_name

@property
def deployment_region(self) -> str:
return self._deployment_region

@property
def user_storage(self) -> model.UserStorage:
return self._user_storage

@property
def system_storage(self) -> model.SystemStorage:
return self._system_storage

@property
def distributor_queue(self) -> Optional[distributor_queue.DistributorQueue]:
return self._distributor_queue

@property
def client_channel(self) -> control.ClientChannel:
return self._client_channel

@property
def benchmarking(self) -> bool:
return self._benchmarking

@property
def benchmarking_frequency(self) -> int:
return self._benchmarking_frequency
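For orientation, a handler under `functions/gcp/` would be expected to obtain this singleton once at cold start and reuse it across invocations. A minimal sketch under that assumption (the handler name and event shape are illustrative, not from this PR; the environment variables read above must be set by the deployment):

```
from functions.gcp.config import Config

# Created at import time so warm invocations reuse the same storage/queue handles.
config = Config.instance()

def handler(event, context):
    if config.verbose:
        print("received event:", event)
    # config.system_storage, config.user_storage and config.client_channel are ready to use
    return "OK"
```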
1 change: 1 addition & 0 deletions functions/gcp/control/__init__.py
@@ -0,0 +1 @@
from .channel import ClientChannel, ClientChannelTCP # noqa