diff --git a/.vscode/launch.json b/.vscode/launch.json index dd543f9..65a794e 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -30,6 +30,15 @@ "console": "integratedTerminal", "args": ["--config-path", "./example/pipeline_fruit_batch_ADLS.json", "--working-dir", "./tmp", "--show-result", "--cleanup-database"], "justMyCode": true + }, + { + "name": "Python: main.py ADF", + "type": "python", + "request": "launch", + "program": "src/main.py", + "console": "integratedTerminal", + "args": ["--config-path", "./example/pipeline_fruit_batch_adf.json", "--working-dir", "./tmp", "--show-result", "--cleanup-database"], + "justMyCode": true }, { "name": "Python: main.py parallel", diff --git a/doc/SYNAPSE_INTEGRATION.md b/doc/SYNAPSE_INTEGRATION.md new file mode 100644 index 0000000..4970e10 --- /dev/null +++ b/doc/SYNAPSE_INTEGRATION.md @@ -0,0 +1,74 @@ +# Azure Synapse Analytics Integration +## Overview + +It is possible to create and deploy the CDDP pipeline to Azure Synapse Analytics and run the pipeline with Spark Pools. + +> "Azure Synapse Analytics is a limitless analytics service that brings together data integration, enterprise data warehousing, and big data analytics. It gives you the freedom to query data on your terms, using either serverless or dedicated options—at scale. " + +Some Features of Azure Synapse Analytics including: +- Perform data ingestion, exploration, analytics in one unified environment +- Deeply integrated Apache Spark and SQL engines +- Support multiple languages, including T-SQL, KQL, Python, Scala, Spark SQL, and .Net +- Built-in ability to explore data lake +- ... + +Find more information about Azure Synapse Analytics from [here](https://azure.microsoft.com/en-us/products/synapse-analytics/#overview). + + +## Prerequisite + +1. Create a Synapse workspace from Azure portal. +2. Go to the resouce page of workspace, create a new Apache Spark Pool. +3. 
Go to the resource page of the Spark pool, under **packages**, install cddp package by uploading a *requirements.txt* which includes package name `cddp`. +![syn1.png](../images/syn1.png) +4. If you would like to test a specific version of cddp or with a local built wheel, + - Open Synapse Studio from workspace + - Upload the wheel file to workspace + ![syn2.png](../images/syn2.png) + - Go back to pool page and add the package to pool by **Select from workspace packages** +5. Go back to the resource page of the Spark pool, and add the following Spark configurations. +![syn3.png](../images/syn3.png) +``` +{ + "spark.cddp.synapse.storageAccountName": "[storage account name]", + "spark.cddp.synapse.fileSystemName": "[file system name]", + "spark.cddp.synapse.linkedService": "[linked service name]" +} +``` +You can use the settings when creating the Synapse workspace (and check them in the Synapse Studio), or you can also use a newly added linked service of a storage account to the workspace. +6. Link an Azure Key Vault to the workspace to manage secrets. +![syn7.png](../images/syn7.png) +To specify the input sources for datasets, you may need to provide connection strings, certificates, and secrets. Synapse manages secrets through Azure Key Vaults that are linked to the workspace. + +For example, suppose you want to specify a connection string to a storage account where fruit prices data is stored. In that case, you can use Azure Key Vault to save this connection string and then specify the secret name "fruitsaaccesskey" in the storage-account-access-key field and the linked service name "kv_cddp_siliang_1" in the secret_scope field. +![syn6.png](../images/syn6.png) + + +## Create a Spark Job Definition Manually + +1. Upload the main definition file to the linked storage account above (you can use **src/main.py** as the main definition file). 
+``` +import cddp +import cddp.dbxapi as dbxapi + +# disable informational messages from prophet +import logging +logging.getLogger('py4j').setLevel(logging.ERROR) + +if __name__ == "__main__": + cddp.entrypoint() +``` +2. Upload sample data and pipeline configuration file to the linked storage account (**/example/\*\***). +3. Open Synapse Studio, go to **develop**, and add a new Spark Job Definition. +4. Fill in the main definition file path and command line arguments with the `abfss://` path of the main.py you uploaded in **step 1**. +![syn4.png](../images/syn4.png) +``` +main definition file path: "abfss://[file system name]@[storage account name].dfs.core.windows.net/main.py" +command line arguments: "--config-path abfss://[file system name]@[storage account name].dfs.core.windows.net/example/pipeline_fruit_batch_ADLS.json --stage staging --task price_ingestion --working-dir ./tmp --show-result --build-landing-zone --cleanup-database" +``` +Quickly find the `abfss://` path of your file using the **data** tab in the Synapse Studio. Go to **data** --> **linked** --> **your storage account** --> find your file --> click **More** of top bar --> **Properties** --> copy the `abfss://` path. +5. Submit the job definition to start a run. +6. Publish the job. +7. You can also create a pipeline to run multiple jobs. 
+![syn5.png](../images/syn5.png) + diff --git a/example/pipeline_fruit_batch_adf.json b/example/pipeline_fruit_batch_adf.json new file mode 100644 index 0000000..62d8c08 --- /dev/null +++ b/example/pipeline_fruit_batch_adf.json @@ -0,0 +1,116 @@ +{ + "name": "fruit_batch_data_app", + "staging": [ + { + "name": "sales_ingestion", + "target": "stg_sales", + "input": { + "type": "azure_adf", + "format": "csv", + "path": "FileStore/cddp_apps/fruit_batch_data_app/landing/sales_ingestion/", + "read-type": "batch", + "schema": { + "fields": [ + { + "metadata": {}, + "name": "ID", + "nullable": true, + "type": "integer" + }, + { + "metadata": {}, + "name": "Amount", + "nullable": true, + "type": "integer" + }, + { + "metadata": {}, + "name": "TS", + "nullable": true, + "type": "timestamp" + } + ], + "type": "struct" + }, + "secret_scope": "lsgckv", + "storage_account": "lsgcdls", + "storage_account_access_key": "sp-storage-account-access-key", + "application_id": "sp-application-id", + "service_credential_key": "sp-service-credential", + "subscription_id":"sp-subscription-id", + "directory_id": "sp-directory-id", + "synapse_linkservice_name": "lsgcsyws-WorkspaceDefaultStorage", + "rg_name": "lsgc", + "df_name": "lsgcadf", + "p_name": "Data Landing Pipeline", + "container_name": "test", + "data_folder": "fruit-sales", + "token": "mockdata/2022-01-10.csv?sp=r&st=2023-03-23T07:54:45Z&se=2024-04-17T15:54:45Z&spr=https&sv=2021-12-02&sr=b&sig=P%2Fr124QGaovT3qrhuA7XCLnwxtg40W1UHG5Oo9jTJ5U%3D" + }, + "output": { + "target": "stg_sales", + "type": [ + "file", + "view" + ] + }, + "schema": { + "fields": [ + { + "metadata": {}, + "name": "ID", + "nullable": true, + "type": "integer" + }, + { + "metadata": {}, + "name": "Amount", + "nullable": true, + "type": "integer" + }, + { + "metadata": {}, + "name": "TS", + "nullable": true, + "type": "timestamp" + } + ], + "type": "struct" + } + } + ], + "standard": [ + { + "name": "fruit_sales_transform", + "type": "batch", + "code": { + 
"lang": "sql", + "sql": "select price.fruit, price.id, sales.amount, price.price, sales.ts from stg_sales sales left outer join stg_price price on sales.id = price.id and sales.ts >= price.start_ts and sales.ts < price.end_ts" + }, + "output": { + "target": "std_fruit_sales", + "type": [ + "file", + "view" + ] + } + } + ], + "serving": [ + { + "name": "fruit_sales_total_curation", + "type": "batch", + "code": { + "lang": "sql", + "sql": "select id, fruit, sum(amount*price) as total from std_fruit_sales group by id, fruit order by total desc" + }, + "output": { + "target": "srv_fruit_sales_total", + "type": [ + "table", + "file" + ] + } + } + ] + } \ No newline at end of file diff --git a/images/syn1.png b/images/syn1.png new file mode 100644 index 0000000..c3c7950 Binary files /dev/null and b/images/syn1.png differ diff --git a/images/syn2.png b/images/syn2.png new file mode 100644 index 0000000..e63422a Binary files /dev/null and b/images/syn2.png differ diff --git a/images/syn3.png b/images/syn3.png new file mode 100644 index 0000000..1abca35 Binary files /dev/null and b/images/syn3.png differ diff --git a/images/syn4.png b/images/syn4.png new file mode 100644 index 0000000..580e76f Binary files /dev/null and b/images/syn4.png differ diff --git a/images/syn5.png b/images/syn5.png new file mode 100644 index 0000000..31b2637 Binary files /dev/null and b/images/syn5.png differ diff --git a/images/syn6.png b/images/syn6.png new file mode 100644 index 0000000..4ae2eea Binary files /dev/null and b/images/syn6.png differ diff --git a/images/syn7.png b/images/syn7.png new file mode 100644 index 0000000..ea8ec5b Binary files /dev/null and b/images/syn7.png differ diff --git a/requirements.txt b/requirements.txt index 199a493..67e533a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,7 @@ sqlalchemy python-dotenv jupyter flask -databricks-cli \ No newline at end of file +databricks-cli +azure-mgmt-resource +azure-mgmt-datafactory +azure-identity \ No newline 
at end of file diff --git a/setup.py b/setup.py index e453907..39927f5 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="cddp", - version="0.0.9", + version="0.1.9", author="Sean Ma", author_email="maye-msft@outlook.com", description="Config Driven Data Pipeline", diff --git a/src/cddp/__init__.py b/src/cddp/__init__.py index 3823432..4525560 100644 --- a/src/cddp/__init__.py +++ b/src/cddp/__init__.py @@ -27,6 +27,36 @@ def create_spark_session(): spark = configure_spark_with_delta_pip(builder).getOrCreate() return spark +def setup_synapse(spark, config_path): + from notebookutils import mssparkutils + job_id = mssparkutils.env.getJobId() + + storage_account_name = spark.sparkContext.getConf().get("spark.cddp.synapse.storageAccountName") + file_system_name = spark.sparkContext.getConf().get("spark.cddp.synapse.fileSystemName") + linked_service = spark.sparkContext.getConf().get("spark.cddp.synapse.linkedService") + + import time + current_time = int(round(time.time())) + mnt_key = f"{current_time}_{job_id}" + + print(f"[synapse] creating mount folder for spark job /cddp/mnt/{mnt_key}") + + mssparkutils.fs.mkdirs(f"/cddp/mnt/{mnt_key}") + mssparkutils.fs.mount( + f"abfss://{file_system_name}@{storage_account_name}.dfs.core.windows.net/cddp/mnt/{mnt_key}", + f"/cddp/mnt/{mnt_key}", + { + "linkedService": linked_service + } + ) + + config_file_name = config_path.split("/")[-1] + mssparkutils.fs.cp(config_path, f"/cddp/mnt/{mnt_key}/configs/{config_file_name}") + config_path = f"/synfs/{job_id}/cddp/mnt/{mnt_key}/configs/{config_file_name}" + print(f"[synapse] copied config file to {config_path}") + + return job_id, config_path + def init(spark, config, working_dir): """Delete the folders for the data storage""" @@ -52,7 +82,11 @@ def init(spark, config, working_dir): def init_database(spark, config): app_name = config['name'] spark.sql(f"CREATE SCHEMA IF NOT EXISTS {app_name}") - spark.sql(f"USE SCHEMA {app_name}") + + if 
utils.is_running_on_synapse(spark): + spark.sql(f"USE {app_name}") + else: + spark.sql(f"USE SCHEMA {app_name}") def clean_database(spark, config): @@ -209,7 +243,10 @@ def get_dataset_as_json(spark, config, stage, task, limit=20): task_output = task["output"]["type"] target = task["output"]["target"] app_name = config["name"] - spark.sql(f"USE SCHEMA {app_name}") + if utils.is_running_on_synapse(spark): + spark.sql(f"USE {app_name}") + else: + spark.sql(f"USE SCHEMA {app_name}") if "view" in task_output: df = spark.sql("select * from "+target+" limit "+str(limit)) @@ -272,6 +309,9 @@ def entrypoint(): if 'spark' not in globals(): spark = create_spark_session() + if utils.is_running_on_synapse(spark): + _, config_path = setup_synapse(spark, config_path) + run_pipeline(spark, config_path, working_dir, stage_arg, task_arg, show_result, build_landing_zone, awaitTermination, cleanup_database) diff --git a/src/cddp/ingestion/__init__.py b/src/cddp/ingestion/__init__.py index 4ad8d38..0b75c6e 100644 --- a/src/cddp/ingestion/__init__.py +++ b/src/cddp/ingestion/__init__.py @@ -1,11 +1,13 @@ import cddp.ingestion.autoloader import cddp.ingestion.azure_eventhub -import cddp.ingestion.azure_adls_gen2 import cddp.ingestion.azure_adls_gen1 import cddp.ingestion.azure_adls_gen2 +import cddp.ingestion.azure_adls_gen2_syn +import cddp.ingestion.azure_adls_adf_syn import cddp.ingestion.filestore import cddp.ingestion.deltalake +import cddp.utils as utils def start_ingestion_task(task, spark): type = task['input']['type'] @@ -15,11 +17,16 @@ def start_ingestion_task(task, spark): return azure_eventhub.start_ingestion_task(task, spark) elif type == 'jdbc': return jdbc.start_ingestion_task(task, spark) + elif type == 'azure_adf': + return azure_adls_adf_syn.start_ingestion_task(task, spark) elif type == 'deltalake': return deltalake.start_ingestion_task(task, spark) elif type == 'filestore': return filestore.start_ingestion_task(task, spark) elif type == 'azure_adls_gen2': + if 
utils.is_running_on_synapse(spark): + return azure_adls_gen2_syn.start_ingestion_task(task, spark) + return azure_adls_gen2.start_ingestion_task(task, spark) elif type == 'azure_adls_gen1': return azure_adls_gen1.start_ingestion_task(task, spark) diff --git a/src/cddp/ingestion/azure_adls_adf_syn.py b/src/cddp/ingestion/azure_adls_adf_syn.py new file mode 100644 index 0000000..aae9b3e --- /dev/null +++ b/src/cddp/ingestion/azure_adls_adf_syn.py @@ -0,0 +1,106 @@ +from pyspark.sql.types import * + +from datetime import datetime, timedelta +import time +import datetime + +from azure.identity import ClientSecretCredential +from azure.mgmt.resource import ResourceManagementClient +from azure.mgmt.datafactory import DataFactoryManagementClient +from azure.mgmt.datafactory.models import * + +def start_ingestion_task(task, spark): + from notebookutils import mssparkutils + + now = datetime.datetime.now() + time_str = now.strftime("%Y-%m-%d %H:%M:%S") + + ContainerName = task['input']["container_name"] + FilePath = task['input']["data_folder"] + FileName = time_str + "."+ task['input']["format"] + + client_id = mssparkutils.credentials.getSecret(task['input']["secret_scope"], task['input']["application_id"]) + tenant_id = mssparkutils.credentials.getSecret(task['input']["secret_scope"], task['input']["directory_id"]) + client_secret = mssparkutils.credentials.getSecret(task['input']["secret_scope"],task['input']["service_credential_key"]) + # Azure subscription ID + subscription_id = mssparkutils.credentials.getSecret(task['input']["secret_scope"],task['input']["subscription_id"]) + + # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group + rg_name = task['input']["rg_name"] + # The data factory name. It must be globally unique. 
+ df_name = task['input']["df_name"] + # The ADF pipeline name + p_name = task['input']["p_name"] + + token = task['input']["token"] + params = { + "ContainerName": ContainerName, + "FilePath": FilePath, + "FileName": FileName, + "token": token, + } + + # Specify your Active Directory client ID, client secret, and tenant ID + credentials = ClientSecretCredential( + client_id=client_id, client_secret=client_secret, tenant_id=tenant_id + ) + adf_client = DataFactoryManagementClient(credentials, subscription_id) + # Create a pipeline run + run_response = adf_client.pipelines.create_run( + rg_name, df_name, p_name, parameters=params + ) + + # Monitor the pipeline run + pipeline_run_status = "" + until_status = ["Succeeded", "TimedOut", "Failed", "Cancelled"] + while pipeline_run_status not in until_status: + pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id) + pipeline_run_status = pipeline_run.status + print("\n\tPipeline run status: {}".format(pipeline_run.status)) + time.sleep(5) + + schema = StructType.fromJson(task["schema"]) + storage_account = task['input']["storage_account"] + + if "synapse_linkservice_name" in task['input']: + spark.conf.set("spark.storage.synapse.linkedServiceName", task['input']["synapse_linkservice_name"]) + spark.conf.set("fs.azure.account.oauth.provider.type", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider") + path = f"abfss://{ContainerName}@{storage_account}.blob.core.windows.net/{FilePath}/{FileName}" + df = spark.read.format(task['input']["format"]) \ + .option("header", "true") \ + .schema(schema) \ + .load(path) + return df, False + + elif "sas-token" in task['input']: + sas_token = mssparkutils.credentials.getSecret(task['input']["secret_scope"], task['input']["storage_account"]) + spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.blob.core.windows.net", "SAS") + spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.blob.core.windows.net", 
"org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider") + spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.blob.core.windows.net", sas_token) + + elif "storage_account_access_key" in task['input']: + storage_account_access_key = mssparkutils.credentials.getSecret(task['input']["secret_scope"], task['input']["storage_account_access_key"]) + spark.conf.set(f"fs.azure.account.key.{storage_account}.blob.core.windows.net", storage_account_access_key) + + elif "service_credential_key" in task['input']: + application_id = mssparkutils.credentials.getSecret(task['input']["secret_scope"], task['input']["application_id"]) + directory_id = mssparkutils.credentials.getSecret(task['input']["secret_scope"], task['input']["directory_id"]) + service_credential = mssparkutils.credentials.getSecret(task['input']["secret_scope"],task['input']["service_credential_key"]) + + spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.blob.core.windows.net", "OAuth") + spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.blob.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") + spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.blob.core.windows.net", application_id) + spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.blob.core.windows.net", service_credential) + spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.blob.core.windows.net", f"https://login.microsoftonline.com/{directory_id}/oauth2/token") + + path = f"wasbs://{ContainerName}@{storage_account}.blob.core.windows.net/{FilePath}/{FileName}" + + # read data of parquet, JSON, CSV, Text + df = spark.read.format(task['input']["format"]) \ + .option("header", "true") \ + .schema(schema) \ + .load(path) + + return df, False + + diff --git a/src/cddp/ingestion/azure_adls_gen2_syn.py b/src/cddp/ingestion/azure_adls_gen2_syn.py new file mode 100644 index 0000000..92e66fa --- /dev/null +++ 
b/src/cddp/ingestion/azure_adls_gen2_syn.py @@ -0,0 +1,41 @@ +from pyspark.sql.types import * + +def start_ingestion_task(task, spark): + from notebookutils import mssparkutils + + schema = StructType.fromJson(task["schema"]) + storage_account = task['input']["storage_account"] + if "sas-token" in task: + sas_token = mssparkutils.credentials.getSecret(task['input']["secret_scope"], task['input']["storage_account"]) + spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS") + spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider") + spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", sas_token) + + elif "service-credential-key" in task: + + application_id = mssparkutils.credentials.getSecret(task['input']["secret_scope"], task['input']["application_id"]) + directory_id = mssparkutils.credentials.getSecret(task['input']["secret_scope"], task['input']["directory_id"]) + service_credential = mssparkutils.credentials.getSecret(task['input']["secret_scope"],task['input']["service_credential_key"]) + + spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth") + spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") + spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", application_id) + spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", service_credential) + spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net", f"https://login.microsoftonline.com/{directory_id}/oauth2/token") + + elif "storage_account-access-key" in task: + storage_account_access_key = mssparkutils.credentials.getSecret(task['input']["secret_scope"], "storage-account-access-key") + 
spark.conf.set(f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", storage_account_access_key) + + container_name = task['input']["container_name"] + path_to_data = task['input']["data_folder"] + path = f"abfss://{container_name}@{storage_account}.dfs.core.windows.net/{path_to_data}" + + # read data of parquet, JSON, CSV, Text + df = spark.read.format(task["input"]["format"]) \ + .option("header", "true") \ + .schema(schema) \ + .load(path) + + return df, False + \ No newline at end of file diff --git a/src/cddp/utils.py b/src/cddp/utils.py index 37e4638..4052aae 100644 --- a/src/cddp/utils.py +++ b/src/cddp/utils.py @@ -18,6 +18,8 @@ def json_to_csv(jsondata, output_path): def is_running_on_databricks(): return os.getenv("SPARK_HOME") == "/databricks/spark" +def is_running_on_synapse(spark): + return spark.sparkContext.getConf().get("spark.cluster.type") == "synapse" def get_path_for_current_env(input_type, path): # Remove the '/' in the path to ensure the sucessful CI pipeline run