9 changes: 9 additions & 0 deletions .vscode/launch.json
@@ -30,6 +30,15 @@
      "console": "integratedTerminal",
      "args": ["--config-path", "./example/pipeline_fruit_batch_ADLS.json", "--working-dir", "./tmp", "--show-result", "--cleanup-database"],
      "justMyCode": true
    },
    {
      "name": "Python: main.py ADF",
      "type": "python",
      "request": "launch",
      "program": "src/main.py",
      "console": "integratedTerminal",
      "args": ["--config-path", "./example/pipeline_fruit_batch_adf.json", "--working-dir", "./tmp", "--show-result", "--cleanup-database"],
      "justMyCode": true
    },
    {
      "name": "Python: main.py parallel",
74 changes: 74 additions & 0 deletions doc/SYNAPSE_INTEGRATION.md
@@ -0,0 +1,74 @@
# Azure Synapse Analytics Integration
## Overview

You can create and deploy the CDDP pipeline to Azure Synapse Analytics and run it with Spark pools.

> "Azure Synapse Analytics is a limitless analytics service that brings together data integration, enterprise data warehousing, and big data analytics. It gives you the freedom to query data on your terms, using either serverless or dedicated options—at scale. "

Some features of Azure Synapse Analytics include:
- Perform data ingestion, exploration, and analytics in one unified environment
- Deeply integrated Apache Spark and SQL engines
- Support for multiple languages, including T-SQL, KQL, Python, Scala, Spark SQL, and .NET
- Built-in ability to explore data lake
- ...

Find more information about Azure Synapse Analytics [here](https://azure.microsoft.com/en-us/products/synapse-analytics/#overview).


## Prerequisites

1. Create a Synapse workspace from the Azure portal.
2. Go to the resource page of the workspace and create a new Apache Spark pool.
3. Go to the resource page of the Spark pool and, under **packages**, install the cddp package by uploading a *requirements.txt* that includes the package name `cddp`.
![syn1.png](../images/syn1.png)
4. If you would like to test a specific version of cddp, or a locally built wheel:
- Open Synapse Studio from workspace
- Upload the wheel file to workspace
![syn2.png](../images/syn2.png)
- Go back to the pool page and add the package to the pool via **Select from workspace packages**
5. Go back to the resource page of the Spark pool and add the following Spark configurations.
![syn3.png](../images/syn3.png)
```
{
  "spark.cddp.synapse.storageAccountName": "[storage account name]",
  "spark.cddp.synapse.fileSystemName": "[file system name]",
  "spark.cddp.synapse.linkedService": "[linked service name]"
}
```
You can use the settings chosen when creating the Synapse workspace (and check them in Synapse Studio), or use a linked service for a storage account newly added to the workspace.
6. Link an Azure Key Vault to the workspace to manage secrets.
![syn7.png](../images/syn7.png)
To specify the input sources for datasets, you may need to provide connection strings, certificates, and secrets. Synapse manages secrets through Azure Key Vaults that are linked to the workspace.

For example, suppose you want to specify a connection string for the storage account where fruit price data is stored. In that case, you can save this connection string in Azure Key Vault and then specify the secret name "fruitsaaccesskey" in the storage-account-access-key field and the linked service name "kv_cddp_siliang_1" in the secret_scope field.
![syn6.png](../images/syn6.png)
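At runtime cddp reads the three Spark configurations from step 5 via `spark.sparkContext.getConf().get(...)`. Below is a minimal sketch of fetching and validating them up front; the helper name and the callable indirection are illustrative, not part of cddp:

```python
# The three Spark configurations cddp expects on a Synapse Spark pool.
REQUIRED_SYNAPSE_CONFS = (
    "spark.cddp.synapse.storageAccountName",
    "spark.cddp.synapse.fileSystemName",
    "spark.cddp.synapse.linkedService",
)

def read_synapse_settings(get_conf):
    """Fetch and validate the Synapse settings.

    get_conf is a callable such as spark.sparkContext.getConf().get,
    passed in so the sketch stays testable without a Spark session.
    """
    settings = {key: get_conf(key) for key in REQUIRED_SYNAPSE_CONFS}
    missing = [key for key, value in settings.items() if not value]
    if missing:
        raise ValueError(f"missing Spark configurations: {missing}")
    return settings
```

Failing fast like this surfaces a pool misconfiguration at job start instead of midway through a mount or copy.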


## Create a Spark Job Definition Manually

1. Upload the main definition file to the linked storage account above (you can use **src/main.py** as the main definition file).
```python
import cddp
import cddp.dbxapi as dbxapi

# disable informational messages from prophet
import logging
logging.getLogger('py4j').setLevel(logging.ERROR)

if __name__ == "__main__":
    cddp.entrypoint()
```
2. Upload the sample data and pipeline configuration file to the linked storage account (**/example/\*\***).
3. Open Synapse Studio, go to **develop**, and add a new Spark Job Definition.
4. Fill in the main definition file path and command line arguments with the `abfss://` path of the main.py you uploaded in **step 1**.
![syn4.png](../images/syn4.png)
```
main definition file path: "abfss://[file system name]@[storage account name].dfs.core.windows.net/main.py"
command line arguments: "--config-path abfss://[file system name]@[storage account name].dfs.core.windows.net/example/pipeline_fruit_batch_ADLS.json --stage staging --task price_ingestion --working-dir ./tmp --show-result --build-landing-zone --cleanup-database"
```
You can quickly find the `abfss://` path of your file using the **data** tab in Synapse Studio: go to **data** --> **linked** --> your storage account --> find your file --> click **More** on the top bar --> **Properties** --> copy the `abfss://` path.
5. Submit the job definition to start a run.
6. Publish the job.
7. You can also create a pipeline to run multiple jobs.
![syn5.png](../images/syn5.png)
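The `abfss://` URIs used in the main definition file path and command line arguments above all follow one fixed pattern, so they can be assembled programmatically. A small sketch (the helper name is illustrative, not part of cddp):

```python
def abfss_path(file_system: str, storage_account: str, blob_path: str) -> str:
    """Build the abfss:// URI for a file in an ADLS Gen2 file system."""
    return (f"abfss://{file_system}@{storage_account}"
            f".dfs.core.windows.net/{blob_path.lstrip('/')}")

# e.g. the main definition file path for a hypothetical file system "myfs"
# in a hypothetical storage account "mystorageacct":
main_def = abfss_path("myfs", "mystorageacct", "main.py")
```

Substitute your own file system and storage account names; the same helper covers the config file path passed to `--config-path`.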

116 changes: 116 additions & 0 deletions example/pipeline_fruit_batch_adf.json
@@ -0,0 +1,116 @@
{
  "name": "fruit_batch_data_app",
  "staging": [
    {
      "name": "sales_ingestion",
      "target": "stg_sales",
      "input": {
        "type": "azure_adf",
        "format": "csv",
        "path": "FileStore/cddp_apps/fruit_batch_data_app/landing/sales_ingestion/",
        "read-type": "batch",
        "schema": {
          "fields": [
            {
              "metadata": {},
              "name": "ID",
              "nullable": true,
              "type": "integer"
            },
            {
              "metadata": {},
              "name": "Amount",
              "nullable": true,
              "type": "integer"
            },
            {
              "metadata": {},
              "name": "TS",
              "nullable": true,
              "type": "timestamp"
            }
          ],
          "type": "struct"
        },
        "secret_scope": "lsgckv",
        "storage_account": "lsgcdls",
        "storage_account_access_key": "sp-storage-account-access-key",
        "application_id": "sp-application-id",
        "service_credential_key": "sp-service-credential",
        "subscription_id": "sp-subscription-id",
        "directory_id": "sp-directory-id",
        "synapse_linkservice_name": "lsgcsyws-WorkspaceDefaultStorage",
        "rg_name": "lsgc",
        "df_name": "lsgcadf",
        "p_name": "Data Landing Pipeline",
        "container_name": "test",
        "data_folder": "fruit-sales",
        "token": "mockdata/2022-01-10.csv?sp=r&st=2023-03-23T07:54:45Z&se=2024-04-17T15:54:45Z&spr=https&sv=2021-12-02&sr=b&sig=P%2Fr124QGaovT3qrhuA7XCLnwxtg40W1UHG5Oo9jTJ5U%3D"
      },
      "output": {
        "target": "stg_sales",
        "type": [
          "file",
          "view"
        ]
      },
      "schema": {
        "fields": [
          {
            "metadata": {},
            "name": "ID",
            "nullable": true,
            "type": "integer"
          },
          {
            "metadata": {},
            "name": "Amount",
            "nullable": true,
            "type": "integer"
          },
          {
            "metadata": {},
            "name": "TS",
            "nullable": true,
            "type": "timestamp"
          }
        ],
        "type": "struct"
      }
    }
  ],
  "standard": [
    {
      "name": "fruit_sales_transform",
      "type": "batch",
      "code": {
        "lang": "sql",
        "sql": "select price.fruit, price.id, sales.amount, price.price, sales.ts from stg_sales sales left outer join stg_price price on sales.id = price.id and sales.ts >= price.start_ts and sales.ts < price.end_ts"
      },
      "output": {
        "target": "std_fruit_sales",
        "type": [
          "file",
          "view"
        ]
      }
    }
  ],
  "serving": [
    {
      "name": "fruit_sales_total_curation",
      "type": "batch",
      "code": {
        "lang": "sql",
        "sql": "select id, fruit, sum(amount*price) as total from std_fruit_sales group by id, fruit order by total desc"
      },
      "output": {
        "target": "srv_fruit_sales_total",
        "type": [
          "table",
          "file"
        ]
      }
    }
  ]
}
Binary file added images/syn1.png
Binary file added images/syn2.png
Binary file added images/syn3.png
Binary file added images/syn4.png
Binary file added images/syn5.png
Binary file added images/syn6.png
Binary file added images/syn7.png
5 changes: 4 additions & 1 deletion requirements.txt
@@ -9,4 +9,7 @@ sqlalchemy
python-dotenv
jupyter
flask
databricks-cli
databricks-cli
azure-mgmt-resource
azure-mgmt-datafactory
azure-identity
2 changes: 1 addition & 1 deletion setup.py
@@ -5,7 +5,7 @@

setuptools.setup(
name="cddp",
version="0.0.9",
version="0.1.9",
author="Sean Ma",
author_email="maye-msft@outlook.com",
description="Config Driven Data Pipeline",
44 changes: 42 additions & 2 deletions src/cddp/__init__.py
@@ -27,6 +27,36 @@ def create_spark_session():
    spark = configure_spark_with_delta_pip(builder).getOrCreate()
    return spark

def setup_synapse(spark, config_path):
    from notebookutils import mssparkutils
    job_id = mssparkutils.env.getJobId()

    storage_account_name = spark.sparkContext.getConf().get("spark.cddp.synapse.storageAccountName")
    file_system_name = spark.sparkContext.getConf().get("spark.cddp.synapse.fileSystemName")
    linked_service = spark.sparkContext.getConf().get("spark.cddp.synapse.linkedService")

    import time
    current_time = int(round(time.time()))
    mnt_key = f"{current_time}_{job_id}"

    print(f"[synapse] creating mount folder for spark job /cddp/mnt/{mnt_key}")

    mssparkutils.fs.mkdirs(f"/cddp/mnt/{mnt_key}")
    mssparkutils.fs.mount(
        f"abfss://{file_system_name}@{storage_account_name}.dfs.core.windows.net/cddp/mnt/{mnt_key}",
        f"/cddp/mnt/{mnt_key}",
        {
            "linkedService": linked_service
        }
    )

    config_file_name = config_path.split("/")[-1]
    mssparkutils.fs.cp(config_path, f"/cddp/mnt/{mnt_key}/configs/{config_file_name}")
    config_path = f"/synfs/{job_id}/cddp/mnt/{mnt_key}/configs/{config_file_name}"
    print(f"[synapse] copied config file to {config_path}")

    return job_id, config_path


def init(spark, config, working_dir):
    """Delete the folders for the data storage"""
Expand All @@ -52,7 +82,11 @@ def init(spark, config, working_dir):
def init_database(spark, config):
    app_name = config['name']
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {app_name}")
    spark.sql(f"USE SCHEMA {app_name}")

    if utils.is_running_on_synapse(spark):
        spark.sql(f"USE {app_name}")
    else:
        spark.sql(f"USE SCHEMA {app_name}")


def clean_database(spark, config):
@@ -209,7 +243,10 @@ def get_dataset_as_json(spark, config, stage, task, limit=20):
    task_output = task["output"]["type"]
    target = task["output"]["target"]
    app_name = config["name"]
    spark.sql(f"USE SCHEMA {app_name}")
    if utils.is_running_on_synapse(spark):
        spark.sql(f"USE {app_name}")
    else:
        spark.sql(f"USE SCHEMA {app_name}")
    if "view" in task_output:
        df = spark.sql("select * from "+target+" limit "+str(limit))
@@ -272,6 +309,9 @@ def entrypoint():
    if 'spark' not in globals():
        spark = create_spark_session()

    if utils.is_running_on_synapse(spark):
        _, config_path = setup_synapse(spark, config_path)

    run_pipeline(spark, config_path, working_dir, stage_arg, task_arg, show_result, build_landing_zone, awaitTermination, cleanup_database)


9 changes: 8 additions & 1 deletion src/cddp/ingestion/__init__.py
@@ -1,11 +1,13 @@
import cddp.ingestion.autoloader
import cddp.ingestion.azure_eventhub
import cddp.ingestion.azure_adls_gen2
import cddp.ingestion.azure_adls_gen1
import cddp.ingestion.azure_adls_gen2
import cddp.ingestion.azure_adls_gen2_syn
import cddp.ingestion.azure_adls_adf_syn
import cddp.ingestion.filestore
import cddp.ingestion.deltalake

import cddp.utils as utils

def start_ingestion_task(task, spark):
    type = task['input']['type']
@@ -15,11 +17,16 @@ def start_ingestion_task(task, spark):
        return azure_eventhub.start_ingestion_task(task, spark)
    elif type == 'jdbc':
        return jdbc.start_ingestion_task(task, spark)
    elif type == 'azure_adf':
        return azure_adls_adf_syn.start_ingestion_task(task, spark)
    elif type == 'deltalake':
        return deltalake.start_ingestion_task(task, spark)
    elif type == 'filestore':
        return filestore.start_ingestion_task(task, spark)
    elif type == 'azure_adls_gen2':
        if utils.is_running_on_synapse(spark):
            return azure_adls_gen2_syn.start_ingestion_task(task, spark)

        return azure_adls_gen2.start_ingestion_task(task, spark)
    elif type == 'azure_adls_gen1':
        return azure_adls_gen1.start_ingestion_task(task, spark)