From 53d42df5c4f3ea9191570d7dc1e323c0f7eac88b Mon Sep 17 00:00:00 2001 From: Bo Wang <36917625+Nick287@users.noreply.github.com> Date: Thu, 8 Dec 2022 06:22:58 +0000 Subject: [PATCH 1/6] fix jdbc connection and add jdbc config file --- .vscode/launch.json | 9 ++ example/pipeline_fruit_batch_JDBC.json | 211 +++++++++++++++++++++++++ src/app.py | 2 +- src/cddp/__init__.py | 17 +- src/cddp/dbxapi.py | 2 + src/cddp/ingestion/__init__.py | 2 +- src/cddp/ingestion/jdbc.py | 24 +-- 7 files changed, 247 insertions(+), 20 deletions(-) create mode 100644 example/pipeline_fruit_batch_JDBC.json diff --git a/.vscode/launch.json b/.vscode/launch.json index bd7e037..e912b02 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -12,6 +12,15 @@ "console": "integratedTerminal", "args": ["--config-path", "./example/pipeline_fruit.json", "--working-dir", "./tmp", "--show-result", "True", "--build-landing-zone", "True"], "justMyCode": true + }, + { + "name": "Python: main.py JDBC", + "type": "python", + "request": "launch", + "program": "src/main.py", + "console": "integratedTerminal", + "args": ["--config-path", "./example/pipeline_fruit_batch_JDBC.json", "--working-dir", "./tmp", "--show-result", "--cleanup-database"], + "justMyCode": true }, { "name": "Python: main.py parallel", diff --git a/example/pipeline_fruit_batch_JDBC.json b/example/pipeline_fruit_batch_JDBC.json new file mode 100644 index 0000000..82db134 --- /dev/null +++ b/example/pipeline_fruit_batch_JDBC.json @@ -0,0 +1,211 @@ +{ + "name": "fruit_batch_data_app", + "staging": [ + { + "name": "sales_ingestion", + "target": "stg_sales", + "input": { + "type": "jdbc", + "format": "csv", + "path": "FileStore/cddp_apps/fruit_batch_data_app/landing/sales_ingestion/", + "read-type": "batch", + "schema": { + "fields": [ + { + "metadata": {}, + "name": "ID", + "nullable": true, + "type": "integer" + }, + { + "metadata": {}, + "name": "Amount", + "nullable": true, + "type": "integer" + }, + { + "metadata": {}, + "name": "TS", + "nullable": true, + "type": "timestamp" + } + ], + "type": "struct" + }, + "secret_scope": "ADLSTEST", + "table_name": "fruit_sales", + "jdbc_url": "jdbc:sqlserver://jdbctestbo.database.windows.net:1433;database=JDBCTEST", + "jdbc_password": "jdbc-password", + "jdbc_username": "jdbc-username" + }, + "output": { + "target": "stg_sales", + "type": [ + "file", + "view" + ] + }, + "schema": { + "fields": [ + { + "metadata": {}, + "name": "ID", + "nullable": true, + "type": "integer" + }, + { + "metadata": {}, + "name": "Amount", + "nullable": true, + "type": "integer" + }, + { + "metadata": {}, + "name": "TS", + "nullable": true, + "type": "timestamp" + } + ], + "type": "struct" + } + }, + { + "name": "price_ingestion", + "target": "stg_price", + "input": { + "type": "jdbc", + "format": "csv", + "path": "FileStore/cddp_apps/fruit_batch_data_app/landing/price_ingestion/", + "read-type": "batch", + "schema": { + "fields": [ + { + "metadata": {}, + "name": "ID", + "nullable": true, + "type": "integer" + }, + { + "metadata": {}, + "name": "Fruit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "Color", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "Price", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "Start_TS", + "nullable": true, + "type": "timestamp" + }, + { + "metadata": {}, + "name": "End_TS", + "nullable": true, + "type": "timestamp" + } + ], + "type": "struct" + }, + "secret_scope": "ADLSTEST", + "table_name": "fruit_price", + "jdbc_url": "jdbc:sqlserver://jdbctestbo.database.windows.net:1433;database=JDBCTEST", + "jdbc_password": "jdbc-password", + "jdbc_username": "jdbc-username" + }, + "output": { + "target": "stg_price", + "type": [ + "file", + "view" + ] + }, + "schema": { + "fields": [ + { + "metadata": {}, + "name": "ID", + "nullable": true, + "type": "integer" + }, + { + "metadata": {}, + "name": "Fruit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "Color", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "Price", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "Start_TS", + "nullable": true, + "type": "timestamp" + }, + { + "metadata": {}, + "name": "End_TS", + "nullable": true, + "type": "timestamp" + } + ], + "type": "struct" + } + } + ], + "standard": [ + { + "name": "fruit_sales_transform", + "type": "batch", + "code": { + "lang": "sql", + "sql": "select price.fruit, price.id, sales.amount, price.price, sales.ts from stg_sales sales left outer join stg_price price on sales.id = price.id and sales.ts >= price.start_ts and sales.ts < price.end_ts" + }, + "output": { + "target": "std_fruit_sales", + "type": [ + "file", + "view" + ] + } + } + ], + "serving": [ + { + "name": "fruit_sales_total_curation", + "type": "batch", + "code": { + "lang": "sql", + "sql": "select id, fruit, sum(amount*price) as total from std_fruit_sales group by id, fruit order by total desc" + }, + "output": { + "target": "srv_fruit_sales_total", + "type": [ + "table", + "file" + ] + } + } + ] +} \ No newline at end of file diff --git a/src/app.py b/src/app.py index 63a51c8..4a4b8cc 100644 --- a/src/app.py +++ b/src/app.py @@ -1,4 +1,4 @@ -from msilib import init_database +# from msilib import init_database import json import tempfile from flask import Flask, request, jsonify diff --git a/src/cddp/__init__.py b/src/cddp/__init__.py index e713b6b..ddc6e7d 100644 --- a/src/cddp/__init__.py +++ b/src/cddp/__init__.py @@ -117,6 +117,7 @@ def start_staging_job(spark, config, task, timeout=None): print(f"Starting staging job for {task['name']}\n{json.dumps(task)}") staging_path = config["staging_path"] df, is_streaming = cddp_ingestion.start_ingestion_task(task, spark) + df.show() output_dataset(spark, task, df, is_streaming, staging_path, "append", timeout) @@ -129,10 +130,12 @@ def start_standard_job(spark, config, task, need_load_views=True, test_mode=Fals load_staging_views(spark, config) df = run_task_code(spark, task) - if test_mode: - output_dataset(spark, task, df, False, standard_path, "append", timeout) - else: - output_dataset(spark, task, df, task["type"]=="streaming", standard_path, "append", timeout) + + is_streaming = False + if task["type"]=="streaming": + is_streaming = True + + output_dataset(spark, task, df, is_streaming, standard_path, "append", timeout) return df @@ -246,13 +249,13 @@ def entrypoint(): parser.add_argument( '--stage', help='run a task in the specified stage', required=False) parser.add_argument('--task', help='run a specified task', required=False) - parser.add_argument('--show-result', type=bool, default=False, + parser.add_argument('--show-result', action='store_true', help='flag to show task data result', required=False) - parser.add_argument('--build-landing-zone', type=bool, default=False, + parser.add_argument('--build-landing-zone', action='store_true', help='build landing zone and import sample data, it will create folder "FileStore" in root folder', required=False) parser.add_argument('--await-termination', type=int, help='how many seconds to wait before streaming job terminating, no specified means not terminating.', required=False) - parser.add_argument('--cleanup-database', type=bool, default=False, + parser.add_argument('--cleanup-database', action='store_true', help='Clean up existing database', required=False) args = parser.parse_args() diff --git a/src/cddp/dbxapi.py b/src/cddp/dbxapi.py index b20c0bb..94725fa 100644 --- a/src/cddp/dbxapi.py +++ b/src/cddp/dbxapi.py @@ -90,6 +90,8 @@ def build_tasks(config, working_dir, config_path, dbx_cluster): type = task["input"]["type"] if type == "filestore": mode = task["input"]["read-type"] + elif type == "jdbc": + mode = task["input"]["read-type"] name = task["name"] output = task["output"]["type"] if 'table' in output or 'file' in output: diff --git a/src/cddp/ingestion/__init__.py b/src/cddp/ingestion/__init__.py index 4ad8d38..52dda1f 100644 --- a/src/cddp/ingestion/__init__.py +++ b/src/cddp/ingestion/__init__.py @@ -1,6 +1,6 @@ import cddp.ingestion.autoloader import cddp.ingestion.azure_eventhub -import cddp.ingestion.azure_adls_gen2 +import cddp.ingestion.jdbc import cddp.ingestion.azure_adls_gen1 import cddp.ingestion.azure_adls_gen2 import cddp.ingestion.filestore diff --git a/src/cddp/ingestion/jdbc.py b/src/cddp/ingestion/jdbc.py index a5216a7..871cd66 100644 --- a/src/cddp/ingestion/jdbc.py +++ b/src/cddp/ingestion/jdbc.py @@ -1,19 +1,21 @@ from pyspark.sql.types import * +import IPython def start_ingestion_task(task, spark): - import dbutils + # import dbutils + dbutils = IPython.get_ipython().user_ns["dbutils"] schema = StructType.fromJson(task["schema"]) - username = dbutils.secrets.get(scope = task["secret_scope"], key = task["jdbc_username"]) - password = dbutils.secrets.get(scope = task["secret_scope"], key = task["jdbc_password"]) + username = dbutils.secrets.get(scope = task["input"]["secret_scope"], key = task["input"]["jdbc_username"]) + password = dbutils.secrets.get(scope = task["input"]["secret_scope"], key = task["input"]["jdbc_password"]) - spark.read \ - .format("jdbc") \ - .option("url", task["jdbc_url"]) \ - .option("dbtable", task["table_name"]) \ - .option("user", username) \ - .option("password", password) \ - .schema(schema) \ - .load() + df = spark.read \ + .format("jdbc") \ + .option("url", task["input"]["jdbc_url"]) \ + .option("dbtable", task["input"]["table_name"]) \ + .option("user", username) \ + .option("password", password) \ + .schema(schema) \ + .load() return df, False \ No newline at end of file From 1df46bdc739f9ae5bf40b54fa94eb10b243298f8 Mon Sep 17 00:00:00 2001 From: Bo Wang <36917625+Nick287@users.noreply.github.com> Date: Thu, 8 Dec 2022 07:02:23 +0000 Subject: [PATCH 2/6] discard change for init file --- src/cddp/__init__.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/cddp/__init__.py b/src/cddp/__init__.py index ddc6e7d..e713b6b 100644 --- a/src/cddp/__init__.py +++ b/src/cddp/__init__.py @@ -117,7 +117,6 @@ def start_staging_job(spark, config, task, timeout=None): print(f"Starting staging job for {task['name']}\n{json.dumps(task)}") staging_path = config["staging_path"] df, is_streaming = cddp_ingestion.start_ingestion_task(task, spark) - df.show() output_dataset(spark, task, df, is_streaming, staging_path, "append", timeout) @@ -130,12 +129,10 @@ def start_standard_job(spark, config, task, need_load_views=True, test_mode=Fals load_staging_views(spark, config) df = run_task_code(spark, task) - - is_streaming = False - if task["type"]=="streaming": - is_streaming = True - - output_dataset(spark, task, df, is_streaming, standard_path, "append", timeout) + if test_mode: + output_dataset(spark, task, df, False, standard_path, "append", timeout) + else: + output_dataset(spark, task, df, task["type"]=="streaming", standard_path, "append", timeout) return df @@ -249,13 +246,13 @@ def entrypoint(): parser.add_argument( '--stage', help='run a task in the specified stage', required=False) parser.add_argument('--task', help='run a specified task', required=False) - parser.add_argument('--show-result', action='store_true', + parser.add_argument('--show-result', type=bool, default=False, help='flag to show task data result', required=False) - parser.add_argument('--build-landing-zone', action='store_true', + parser.add_argument('--build-landing-zone', type=bool, default=False, help='build landing zone and import sample data, it will create folder "FileStore" in root folder', required=False) parser.add_argument('--await-termination', type=int, help='how many seconds to wait before streaming job terminating, no specified means not terminating.', required=False) - parser.add_argument('--cleanup-database', action='store_true', + parser.add_argument('--cleanup-database', type=bool, default=False, help='Clean up existing database', required=False) args = parser.parse_args() From ed637a7219e33a56009255486f575121a48ce30d Mon Sep 17 00:00:00 2001 From: Bo Wang <36917625+Nick287@users.noreply.github.com> Date: Mon, 12 Dec 2022 09:01:29 +0000 Subject: [PATCH 3/6] add sample data --- example/pipeline_fruit_batch_ADLS.json | 99 +++++++++++++++++++++++++- example/pipeline_fruit_batch_JDBC.json | 99 +++++++++++++++++++++++++- 2 files changed, 194 insertions(+), 4 deletions(-) diff --git a/example/pipeline_fruit_batch_ADLS.json b/example/pipeline_fruit_batch_ADLS.json index 7bb83f1..9825dd0 100644 --- a/example/pipeline_fruit_batch_ADLS.json +++ b/example/pipeline_fruit_batch_ADLS.json @@ -69,7 +69,44 @@ } ], "type": "struct" - } + }, + "sampleData": [ + { + "ID": 1, + "Amount": 12, + "TS": "2022-01-10T00:00:00.000+08:00" + }, + { + "ID": 2, + "Amount": 13, + "TS": "2022-01-10T00:00:00.000+08:00" + }, + { + "ID": 3, + "Amount": 14, + "TS": "2022-01-10T00:00:00.000+08:00" + }, + { + "ID": 4, + "Amount": 15, + "TS": "2022-01-10T00:00:00.000+08:00" + }, + { + "ID": 5, + "Amount": 16, + "TS": "2022-01-10T00:00:00.000+08:00" + }, + { + "ID": 6, + "Amount": 17, + "TS": "2022-01-10T00:00:00.000+08:00" + }, + { + "ID": 7, + "Amount": 18, + "TS": "2022-01-10T00:00:00.000+08:00" + } + ] }, { "name": "price_ingestion", @@ -175,7 +212,65 @@ } ], "type": "struct" - } + }, + "sampleData": [ + { + "ID": 1, + "Fruit": "Red Grape", + "Color": "Red", + "Price": 2, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + }, + { + "ID": 2, + "Fruit": "Peach", + "Color": "Yellow", + "Price": 3, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + }, + { + "ID": 3, + "Fruit": "Orange", + "Color": "Orange", + "Price": 2, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + }, + { + "ID": 4, + "Fruit": "Green Apple", + "Color": "Green", + "Price": 3, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + }, + { + "ID": 5, + "Fruit": "Fiji Apple", + "Color": "Red", + "Price": 3.5, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + }, + { + "ID": 6, + "Fruit": "Banana", + "Color": "Yellow", + "Price": 1, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + }, + { + "ID": 7, + "Fruit": "Green Grape", + "Color": " Green", + "Price": 2, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + } + ] } ], "standard": [ diff --git a/example/pipeline_fruit_batch_JDBC.json b/example/pipeline_fruit_batch_JDBC.json index 82db134..6afbbbc 100644 --- a/example/pipeline_fruit_batch_JDBC.json +++ b/example/pipeline_fruit_batch_JDBC.json @@ -67,7 +67,44 @@ } ], "type": "struct" - } + }, + "sampleData": [ + { + "ID": 1, + "Amount": 12, + "TS": "2022-01-10T00:00:00.000+08:00" + }, + { + "ID": 2, + "Amount": 13, + "TS": "2022-01-10T00:00:00.000+08:00" + }, + { + "ID": 3, + "Amount": 14, + "TS": "2022-01-10T00:00:00.000+08:00" + }, + { + "ID": 4, + "Amount": 15, + "TS": "2022-01-10T00:00:00.000+08:00" + }, + { + "ID": 5, + "Amount": 16, + "TS": "2022-01-10T00:00:00.000+08:00" + }, + { + "ID": 6, + "Amount": 17, + "TS": "2022-01-10T00:00:00.000+08:00" + }, + { + "ID": 7, + "Amount": 18, + "TS": "2022-01-10T00:00:00.000+08:00" + } + ] }, { "name": "price_ingestion", @@ -171,7 +208,65 @@ } ], "type": "struct" - } + }, + "sampleData": [ + { + "ID": 1, + "Fruit": "Red Grape", + "Color": "Red", + "Price": 2, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + }, + { + "ID": 2, + "Fruit": "Peach", + "Color": "Yellow", + "Price": 3, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + }, + { + "ID": 3, + "Fruit": "Orange", + "Color": "Orange", + "Price": 2, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + }, + { + "ID": 4, + "Fruit": "Green Apple", + "Color": "Green", + "Price": 3, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + }, + { + "ID": 5, + "Fruit": "Fiji Apple", + "Color": "Red", + "Price": 3.5, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + }, + { + "ID": 6, + "Fruit": "Banana", + "Color": "Yellow", + "Price": 1, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + }, + { + "ID": 7, + "Fruit": "Green Grape", + "Color": " Green", + "Price": 2, + "Start_TS": "2015-01-01T00:00:00.000+08:00", + "End_TS": "2099-12-31T23:59:59.000+08:00" + } + ] } ], "standard": [ From 403907a4e76d86bdb926ce035cbbd7224fd270e0 Mon Sep 17 00:00:00 2001 From: Bo Wang <36917625+Nick287@users.noreply.github.com> Date: Mon, 12 Dec 2022 12:31:31 +0000 Subject: [PATCH 4/6] update the HTML for new perpetry --- web/index.html | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/web/index.html b/web/index.html index bb9c32e..3853180 100644 --- a/web/index.html +++ b/web/index.html @@ -285,6 +285,16 @@

placeholder="JDBC URL"> +
+ + +
+
- +
+ + +
+
+ + +