diff --git a/couch2pg/src/importer.js b/couch2pg/src/importer.js index 97b1fb6e..bad3f11f 100644 --- a/couch2pg/src/importer.js +++ b/couch2pg/src/importer.js @@ -15,11 +15,12 @@ const UPDATE_SEQ_STMT = ` SET seq = $1, pending = $2, updated_at = CURRENT_TIMESTAMP WHERE source = $3 `; -const INSERT_DOCS_STMT = `INSERT INTO ${db.postgresTable} (saved_timestamp, _id, _deleted, doc) VALUES`; +const INSERT_DOCS_STMT = `INSERT INTO ${db.postgresTable} (saved_timestamp, _id, _deleted, source, doc) VALUES`; const ON_CONFLICT_STMT = ` ON CONFLICT (_id) DO UPDATE SET saved_timestamp = EXCLUDED.saved_timestamp, _deleted = EXCLUDED._deleted, + source = EXCLUDED.source, doc = EXCLUDED.doc `; @@ -63,7 +64,7 @@ const storeSeq = async (seq, pending, source) => { await client.end(); }; -const buildBulkInsertQuery = (allDocs) => { +const buildBulkInsertQuery = (allDocs, source) => { const now = new Date().toISOString(); let idx = 1; @@ -72,8 +73,8 @@ const buildBulkInsertQuery = (allDocs) => { allDocs.rows.forEach((row) => { removeSecurityDetails(row.doc); - insertStmts.push(`($${idx++}, $${idx++}, $${idx++}, $${idx++})`); - docsToInsert.push(now, row.id, !!row.deleted, sanitise(JSON.stringify(row.doc))); + insertStmts.push(`($${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++})`); + docsToInsert.push(now, row.id, !!row.deleted, source, sanitise(JSON.stringify(row.doc))); }); return { @@ -97,7 +98,7 @@ const addDeletesToResult = (deletedDocs, allDocs) => { Downloads all given documents from couchdb and stores them in Postgres, in batches. We presume if a document is on this list it has changed, and thus needs updating. 
*/ -const loadAndStoreDocs = async (couchdb, docsToDownload) => { +const loadAndStoreDocs = async (couchdb, docsToDownload, source) => { if (!docsToDownload.length) { return; } @@ -109,13 +110,13 @@ const loadAndStoreDocs = async (couchdb, docsToDownload) => { const docsToStore = addDeletesToResult(deletedDocs, allDocsResult); - await storeDocs(docsToStore); + await storeDocs(docsToStore, source); }; -const storeDocs = async (allDocsResult) => { +const storeDocs = async (allDocsResult, source) => { let client; try { - const { query, values } = buildBulkInsertQuery(allDocsResult); + const { query, values } = buildBulkInsertQuery(allDocsResult, source); client = await db.getPgClient(); await client.query(query, values); @@ -124,7 +125,7 @@ const storeDocs = async (allDocsResult) => { if (err.code === '40P01') { // deadlock detected await client.end(); - return storeDocs(allDocsResult); + return storeDocs(allDocsResult, source); } throw err; } @@ -154,7 +155,7 @@ const importChangesBatch = async (couchDb, source) => { `in ${dbName}`); console.log(`There are approximately ${pending} changes left in ${dbName}`); - await loadAndStoreDocs(couchDb, changes.results); + await loadAndStoreDocs(couchDb, changes.results, source); await storeSeq(changes.last_seq, pending, source); return changes.results.length; diff --git a/couch2pg/src/setup.js b/couch2pg/src/setup.js index ee154d56..e76213d9 100644 --- a/couch2pg/src/setup.js +++ b/couch2pg/src/setup.js @@ -6,6 +6,7 @@ CREATE TABLE IF NOT EXISTS ${db.postgresTable} ( saved_timestamp TIMESTAMP, _id VARCHAR PRIMARY KEY, _deleted BOOLEAN, + source varchar, doc jsonb )`; @@ -25,12 +26,17 @@ const createTimestampIndex = ` CREATE INDEX CONCURRENTLY IF NOT EXISTS saved_timestamp ON ${db.postgresTable}(saved_timestamp); `; +const createSourceIndex = ` +CREATE INDEX CONCURRENTLY IF NOT EXISTS source ON ${db.postgresTable}(source); +`; + export const createDatabase = async () => { const client = await db.getPgClient(); await 
client.query(createSchema); await client.query(createTable); await client.query(createDeleteIndex); await client.query(createTimestampIndex); + await client.query(createSourceIndex); await client.query(createProgressTable); await client.end(); }; diff --git a/couch2pg/tests/unit/importer.spec.js b/couch2pg/tests/unit/importer.spec.js index b73571d9..7496f3ee 100644 --- a/couch2pg/tests/unit/importer.spec.js +++ b/couch2pg/tests/unit/importer.spec.js @@ -24,12 +24,13 @@ const updateSeqMatch = () => ` WHERE source = $3 `; -const insertDocsMatch = () => `INSERT INTO ${db.postgresTable} (saved_timestamp, _id, _deleted, doc) VALUES`; +const insertDocsMatch = () => `INSERT INTO ${db.postgresTable} (saved_timestamp, _id, _deleted, source, doc) VALUES`; const ON_CONFLICT_STMT = ` ON CONFLICT (_id) DO UPDATE SET saved_timestamp = EXCLUDED.saved_timestamp, _deleted = EXCLUDED._deleted, + source = EXCLUDED.source, doc = EXCLUDED.doc `; @@ -156,22 +157,25 @@ describe('importer', () => { expect(couchDb.allDocs.args).to.deep.equal([[{ include_docs: true, keys: ['doc1', 'doc2', 'doc3'] }]]); expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[ - 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' + - '($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT, + 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' + + '($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT, [ now.toISOString(), 'doc1', false, + 'thehost/medic', JSON.stringify(docs[0].doc), now.toISOString(), 'doc2', false, + 'thehost/medic', JSON.stringify(docs[1].doc), now.toISOString(), 'doc3', false, + 'thehost/medic', JSON.stringify(docs[2].doc), ] ]]); @@ -234,64 +238,73 @@ describe('importer', () => { expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).callCount).to.equal(3); expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args[0]).to.deep.equal([ - 
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' + - '($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT, + 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' + + '($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT, [ now.toISOString(), iterationOne.docs[0].id, false, + 'thehost/medic', JSON.stringify(iterationOne.docs[0].doc), now.toISOString(), iterationOne.docs[1].id, false, + 'thehost/medic', JSON.stringify(iterationOne.docs[1].doc), now.toISOString(), iterationOne.docs[2].id, false, + 'thehost/medic', JSON.stringify(iterationOne.docs[2].doc), ] ]); expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args[1]).to.deep.equal([ - 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' + - '($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT, + 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' + + '($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT, [ now.toISOString(), iterationTwo.docs[0].id, false, + 'thehost/medic', JSON.stringify(iterationTwo.docs[0].doc), now.toISOString(), iterationTwo.docs[1].id, false, + 'thehost/medic', JSON.stringify(iterationTwo.docs[1].doc), now.toISOString(), iterationTwo.docs[2].id, false, + 'thehost/medic', JSON.stringify(iterationTwo.docs[2].doc), ] ]); expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args[2]).to.deep.equal([ - 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' + - '($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT, + 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' + + '($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT, [ now.toISOString(), iterationThree.docs[0].id, false, + 'thehost/medic', JSON.stringify(iterationThree.docs[0].doc), now.toISOString(), 
iterationThree.docs[1].id, false, + 'thehost/medic', JSON.stringify(iterationThree.docs[1].doc), now.toISOString(), iterationThree.docs[2].id, false, + 'thehost/medic', JSON.stringify(iterationThree.docs[2].doc), ] ]); @@ -315,12 +328,13 @@ describe('importer', () => { await importer(couchDb); expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[ - 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' + - '($1, $2, $3, $4) ' + ON_CONFLICT_STMT, + 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' + + '($1, $2, $3, $4, $5) ' + ON_CONFLICT_STMT, [ new Date().toISOString(), 'change', false, + 'thehost/medic', JSON.stringify({ _id: 'change', _rev: '1', @@ -352,12 +366,13 @@ describe('importer', () => { await importer(couchDb); expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[ - 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' + - '($1, $2, $3, $4) ' + ON_CONFLICT_STMT, + 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' + + '($1, $2, $3, $4, $5) ' + ON_CONFLICT_STMT, [ new Date().toISOString(), 'change', false, + 'thehost/medic', JSON.stringify({ _id: 'org.couchdb.user:paul', _rev: '1', @@ -392,22 +407,25 @@ describe('importer', () => { expect(couchDb.allDocs.args).to.deep.equal([[{ include_docs: true, keys: ['doc2'] }]]); expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[ - 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' + - '($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT, + 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' + + '($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT, [ now.toISOString(), 'doc2', false, + 'thehost/medic', JSON.stringify(docs[0].doc), now.toISOString(), 'doc1', true, + 'thehost/medic', JSON.stringify({ _id: 'doc1', _rev: 
1, _deleted: true }), now.toISOString(), 'doc3', true, + 'thehost/medic', JSON.stringify({ _id: 'doc3', _rev: undefined, _deleted: true }), ] ]]); @@ -565,22 +583,25 @@ describe('importer', () => { expect(couchDb.allDocs.args).to.deep.equal([[{ include_docs: true, keys: ['doc1', 'doc2', 'doc3'] }]]); const queryArgs = [ - 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' + - '($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT, + 'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, source, doc) VALUES ' + + '($1, $2, $3, $4, $5),($6, $7, $8, $9, $10),($11, $12, $13, $14, $15) ' + ON_CONFLICT_STMT, [ now.toISOString(), 'doc1', false, + 'thehost/medic', JSON.stringify(docs[0].doc), now.toISOString(), 'doc2', false, + 'thehost/medic', JSON.stringify(docs[1].doc), now.toISOString(), 'doc3', false, + 'thehost/medic', JSON.stringify(docs[2].doc), ] ]; diff --git a/couch2pg/tests/unit/setup.spec.js b/couch2pg/tests/unit/setup.spec.js index 083c37fa..c775c3e1 100644 --- a/couch2pg/tests/unit/setup.spec.js +++ b/couch2pg/tests/unit/setup.spec.js @@ -32,7 +32,7 @@ describe('setup', () => { await setup.createDatabase(); expect(db.getPgClient.calledOnce).to.equal(true); - expect(pgClient.query.callCount).to.equal(5); + expect(pgClient.query.callCount).to.equal(6); expect(pgClient.end.calledOnce).to.equal(true); expect(pgClient.query.args[0]).to.deep.equal(['CREATE SCHEMA IF NOT EXISTS v1']); expect(pgClient.query.args[1]).to.deep.equal([ ` @@ -40,6 +40,7 @@ CREATE TABLE IF NOT EXISTS v1.whatever ( saved_timestamp TIMESTAMP, _id VARCHAR PRIMARY KEY, _deleted BOOLEAN, + source varchar, doc jsonb )`]); expect(pgClient.query.args[2]).to.deep.equal([` @@ -49,6 +50,9 @@ CREATE INDEX CONCURRENTLY IF NOT EXISTS _deleted ON v1.whatever(_deleted); CREATE INDEX CONCURRENTLY IF NOT EXISTS saved_timestamp ON v1.whatever(saved_timestamp); `]); expect(pgClient.query.args[4]).to.deep.equal([` +CREATE INDEX CONCURRENTLY IF NOT EXISTS source ON 
v1.whatever(source); +`]); + expect(pgClient.query.args[5]).to.deep.equal([` CREATE TABLE IF NOT EXISTS v1.couchdb_progress ( seq varchar, pending integer, diff --git a/dbt/dbt-run.py b/dbt/dbt-run.py index 7906e940..6e423fea 100644 --- a/dbt/dbt-run.py +++ b/dbt/dbt-run.py @@ -6,6 +6,7 @@ from urllib.parse import urlparse +dbt_selector = os.getenv("DBT_SELECTOR") def connection(): for attempt in range(5): @@ -37,7 +38,9 @@ def setup(): CREATE TABLE IF NOT EXISTS {os.getenv('POSTGRES_SCHEMA')}._dataemon ( inserted_on TIMESTAMP DEFAULT NOW(), - packages jsonb, manifest jsonb + packages jsonb, + manifest jsonb, + dbt_selector text ) """) conn.commit() @@ -61,6 +64,11 @@ def get_package(): "revision": init_package.fragment }]}) + if os.getenv("DBT_LOCAL_PATH"): + package_json = json.dumps({"packages": [{ + "local": '/dbt/package/' + }]}) + with open("/dbt/packages.yml", "w") as f: f.write(package_json) @@ -74,8 +82,9 @@ def get_manifest(): cur.execute(f""" SELECT manifest FROM {os.getenv('POSTGRES_SCHEMA')}._dataemon + WHERE dbt_selector = %s OR (dbt_selector IS NULL AND %s IS NULL) ORDER BY inserted_on DESC - """) + """, (dbt_selector,dbt_selector)) manifest = cur.fetchone() # save to file if found @@ -84,7 +93,11 @@ def get_manifest(): f.write(json.dumps(manifest[0])); # run dbt ls to make sure current manifest is generated - subprocess.run(["dbt", "ls", "--profiles-dir", ".dbt"]) + args = ["dbt", "ls", "--profiles-dir", ".dbt"] + if dbt_selector: + args.append('--select') + args.append(dbt_selector) + subprocess.run(args) new_manifest = '{}' with open("/dbt/target/manifest.json", "r") as f: @@ -97,14 +110,14 @@ def save_package_manifest(package_json, manifest_json): with conn.cursor() as cur: # because manifest is large, delete old entries # we only want the current/latest data - cur.execute( - f"DELETE FROM {os.getenv('POSTGRES_SCHEMA')}._dataemon " - ) - cur.execute( - f"INSERT INTO {os.getenv('POSTGRES_SCHEMA')}._dataemon " - "(packages, manifest) VALUES (%s, 
%s);", - [package_json, manifest_json] - ) + cur.execute(f""" + DELETE FROM {os.getenv('POSTGRES_SCHEMA')}._dataemon + WHERE dbt_selector = %s OR (dbt_selector IS NULL AND %s IS NULL) + """, (dbt_selector,dbt_selector)) + cur.execute(f""" + INSERT INTO {os.getenv('POSTGRES_SCHEMA')}._dataemon + (packages, manifest, dbt_selector) VALUES (%s, %s, %s); + """, (package_json, manifest_json, dbt_selector)) conn.commit() @@ -119,19 +132,39 @@ def update_models(): # save the new manifest and package for the next run save_package_manifest(package_json, manifest_json) - # anything that changed, run a full refresh - subprocess.run(["dbt", "run", + args = ["dbt", "run", "--profiles-dir", ".dbt", - "--select", - "state:modified", "--full-refresh", "--state", - "./old_manifest"]) + "./old_manifest", + "--select"] + + if dbt_selector: + args.append(f"{dbt_selector},state:modified") + else: + args.append("state:modified") + + # anything that changed, run a full refresh + subprocess.run(args) def run_incremental_models(): # update incremental models (and tables if there are any) - subprocess.run(["dbt", "run", "--profiles-dir", ".dbt", "--exclude", "config.materialized:view"]) + args = ["dbt", "run", + "--profiles-dir", + ".dbt", + "--exclude", "config.materialized:view"] + + if dbt_selector: + args.append('--select') + args.append(dbt_selector) + + batch_size = int(os.getenv("DBT_BATCH_SIZE") or 0) + if batch_size: + args.append("--vars") + args.append(f'{{batch_size: {batch_size}}}') + + subprocess.run(args) if __name__ == "__main__": diff --git a/deploy/cht_sync/templates/couch2pg.yaml b/deploy/cht_sync/templates/couch2pg.yaml index 2b2e1063..b3c77107 100644 --- a/deploy/cht_sync/templates/couch2pg.yaml +++ b/deploy/cht_sync/templates/couch2pg.yaml @@ -9,11 +9,11 @@ spec: replicas: 1 selector: matchLabels: - app: cht-sync + app: cht-sync-couch2pg-{{ $service.host | replace "." 
"-" }} template: metadata: labels: - app: cht-sync + app: cht-sync-couch2pg-{{ $service.host | replace "." "-" }} spec: automountServiceAccountToken: false containers: diff --git a/deploy/cht_sync/templates/dbt.yaml b/deploy/cht_sync/templates/dbt.yaml index 8d2151f0..4e235987 100644 --- a/deploy/cht_sync/templates/dbt.yaml +++ b/deploy/cht_sync/templates/dbt.yaml @@ -1,44 +1,51 @@ +{{- $dbt_selectors := .Values.dbt_selectors | default (list "") -}} +{{- range $index, $dbt_selector := $dbt_selectors }} +--- apiVersion: apps/v1 kind: Deployment metadata: - name: cht-sync-dbt + name: cht-sync-dbt-{{ $index }} spec: replicas: 1 selector: matchLabels: - app: cht-sync + app: cht-sync-dbt-{{ $index }} template: metadata: labels: - app: cht-sync + app: cht-sync-dbt-{{ $index }} spec: automountServiceAccountToken: false containers: - name: dbt - image: medicmobile/dataemon:{{ .Values.image_tag | default "latest" }} + image: medicmobile/dataemon:{{ $.Values.image_tag | default "latest" }} resources: limits: - cpu: {{ (.Values.dbt).cpu_limit | default "500m" }} - memory: {{ (.Values.dbt).memory_limit | default "1Gi" }} + cpu: {{ ($.Values.dbt).cpu_limit | default "500m" }} + memory: {{ ($.Values.dbt).memory_limit | default "1Gi" }} env: - name: POSTGRES_HOST - value: {{ .Values.postgres.host | default "postgres" }} + value: {{ $.Values.postgres.host | default "postgres" }} - name: POSTGRES_USER - value: {{ .Values.postgres.user }} + value: {{ $.Values.postgres.user }} - name: POSTGRES_PORT - value: {{ .Values.postgres.port | default "5432" | quote }} + value: {{ $.Values.postgres.port | default "5432" | quote }} - name: POSTGRES_PASSWORD - value: {{ .Values.postgres.password }} + value: {{ $.Values.postgres.password }} - name: POSTGRES_DB - value: {{ .Values.postgres.db }} + value: {{ $.Values.postgres.db }} - name: POSTGRES_TABLE - value: {{ .Values.postgres.table }} + value: {{ $.Values.postgres.table }} - name: POSTGRES_SCHEMA - value: {{ .Values.postgres.schema }} + 
value: {{ $.Values.postgres.schema }} - name: ROOT_POSTGRES_SCHEMA - value: {{ .Values.postgres.schema }} + value: {{ $.Values.postgres.schema }} - name: CHT_PIPELINE_BRANCH_URL - value: {{ .Values.cht_pipeline_branch_url }} + value: {{ $.Values.cht_pipeline_branch_url }} - name: DBT_THREAD_COUNT - value: {{ .Values.dbt_thread_count | default "1" | quote }} - + value: {{ $.Values.dbt_thread_count | default "1" | quote }} + - name: DBT_SELECTOR + value: {{ $dbt_selector }} + - name: DBT_BATCH_SIZE + value: {{ $.Values.dbt_batch_size | default "" | quote }} +{{- end }} diff --git a/deploy/cht_sync/templates/postgres-service.yaml b/deploy/cht_sync/templates/postgres-service.yaml index 9e3d862e..86d80004 100644 --- a/deploy/cht_sync/templates/postgres-service.yaml +++ b/deploy/cht_sync/templates/postgres-service.yaml @@ -5,9 +5,12 @@ metadata: name: postgres spec: selector: - inner.service: postgres + app: cht-sync-postgres ports: - protocol: TCP port: {{ .Values.postgres.port | default "5432" }} targetPort: 5432 + {{- if .Values.postgres.nodePort }} + nodePort: {{ .Values.postgres.nodePort }} + {{- end }} {{- end }} diff --git a/deploy/cht_sync/templates/postgres.yaml b/deploy/cht_sync/templates/postgres.yaml index 09c7eecd..0bae5777 100644 --- a/deploy/cht_sync/templates/postgres.yaml +++ b/deploy/cht_sync/templates/postgres.yaml @@ -10,13 +10,11 @@ spec: replicas: 1 selector: matchLabels: - app: cht-sync - inner.service: postgres + app: cht-sync-postgres template: metadata: labels: - app: cht-sync - inner.service: postgres + app: cht-sync-postgres spec: automountServiceAccountToken: false containers: @@ -40,8 +38,14 @@ spec: volumeMounts: - name: postgres-data mountPath: /var/lib/postgresql/data + - name: postgres-shm + mountPath: /dev/shm volumes: - name: postgres-data persistentVolumeClaim: claimName: postgres-pvc + - name: postgres-shm + emptyDir: + medium: Memory + sizeLimit: {{ (.Values.postgres).memory_limit | default "2Gi" }} {{- end }} diff --git 
a/deploy/cht_sync/templates/postgrest-service.yaml b/deploy/cht_sync/templates/postgrest-service.yaml deleted file mode 100644 index 94ad911a..00000000 --- a/deploy/cht_sync/templates/postgrest-service.yaml +++ /dev/null @@ -1,10 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: postgrest -spec: - selector: - inner.service: postgrest - ports: - - protocol: TCP - port: 3000 diff --git a/deploy/cht_sync/values.yaml.template b/deploy/cht_sync/values.yaml.template index 9afcb2e9..a8b8449e 100644 --- a/deploy/cht_sync/values.yaml.template +++ b/deploy/cht_sync/values.yaml.template @@ -11,6 +11,7 @@ postgres: cht_pipeline_branch_url: "https://github.com/medic/cht-pipeline.git#main" dbt_thread_count: 1 +dbt_batch_size: 100000 # values shared by all couchdb instances # can be omitted if couchdb instances do not share any values diff --git a/deploy/scripts/upgrade/add-manifest-and-source.sh b/deploy/scripts/upgrade/add-manifest-and-source.sh new file mode 100755 index 00000000..424087d6 --- /dev/null +++ b/deploy/scripts/upgrade/add-manifest-and-source.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +# Source the migration template +SCRIPT_DIR="$(dirname "$(readlink -f "$0")")" +source "$SCRIPT_DIR/lib/migration.sh" + +# SQL statements to execute +SQL_STATEMENTS=( + "ALTER TABLE _dataemon ADD COLUMN IF NOT EXISTS manifest jsonb;" + "ALTER TABLE _dataemon ADD COLUMN IF NOT EXISTS dbt_selector text;" + "ALTER TABLE couchdb ADD COLUMN IF NOT EXISTS source varchar;" + "CREATE INDEX IF NOT EXISTS source ON couchdb(source);" +) + +# Execute each SQL statement +for sql in "${SQL_STATEMENTS[@]}"; do + execute_sql "$sql" +done diff --git a/deploy/scripts/upgrade/lib/migration.sh b/deploy/scripts/upgrade/lib/migration.sh new file mode 100755 index 00000000..9a81eb5f --- /dev/null +++ b/deploy/scripts/upgrade/lib/migration.sh @@ -0,0 +1,53 @@ +#!/bin/bash + +# Check for required arguments +if [ "$#" -lt 3 ] || { [ "$1" == "kubectl" ] && [ "$#" -ne 4 ]; }; then + echo "Usage: $0 
[namespace]" + exit 1 +fi + +# Assign arguments to variables +ENVIRONMENT=$1 +USERNAME=$2 +DBNAME=$3 + +# Function to execute SQL statement +execute_sql() { + local sql_statement=$1 + + if [ "$ENVIRONMENT" == "docker" ]; then + # Find the container ID or name automatically + CONTAINER_ID=$(docker ps --filter "name=postgres" --format "{{.ID}}" | head -n 1) + + if [ -z "$CONTAINER_ID" ]; then + echo "No running Postgres container found." + exit 1 + fi + + # Run the SQL statement using docker exec + docker exec "$CONTAINER_ID" psql -U "$USERNAME" "$DBNAME" -c "$sql_statement" + + elif [ "$ENVIRONMENT" == "kubectl" ]; then + # Assign namespace argument + NAMESPACE=$4 + + # Find the Postgres pod automatically + POD_NAME=$(kubectl get pods -n "$NAMESPACE" -l app.name=postgres -o jsonpath="{.items[0].metadata.name}") + + if [ -z "$POD_NAME" ]; then + # Try alternative label + POD_NAME=$(kubectl get pods -n "$NAMESPACE" -l inner.service=postgres -o jsonpath="{.items[0].metadata.name}") + if [ -z "$POD_NAME" ]; then + echo "No running Postgres pod found in namespace $NAMESPACE." + exit 1 + fi + fi + + # Run the SQL statement using kubectl exec + kubectl -n "$NAMESPACE" exec "$POD_NAME" -- psql -U "$USERNAME" "$DBNAME" -c "$sql_statement" + + else + echo "Invalid environment specified. Use 'kubectl' or 'docker'." 
+ exit 1 + fi +} diff --git a/deploy/scripts/upgrade/upgrade-84-monitoring.sh b/deploy/scripts/upgrade/upgrade-84-monitoring.sh index fc6a4e42..9ca048a5 100755 --- a/deploy/scripts/upgrade/upgrade-84-monitoring.sh +++ b/deploy/scripts/upgrade/upgrade-84-monitoring.sh @@ -1,51 +1,8 @@ #!/bin/bash -# Check for required arguments -if [ "$#" -lt 3 ] || { [ "$1" == "kubectl" ] && [ "$#" -ne 4 ]; }; then - echo "Usage: $0 [namespace]" - exit 1 -fi +# Source the migration template +SCRIPT_DIR="$(dirname "$(readlink -f "$0")")" +source "$SCRIPT_DIR/lib/migration.sh" -# Assign arguments to variables -ENVIRONMENT=$1 -USERNAME=$2 -DBNAME=$3 ALTER_STATEMENT='ALTER TABLE v1.couchdb_progress ADD pending integer, ADD updated_at timestamptz'; - -if [ "$ENVIRONMENT" == "docker" ]; then - # Find the container ID or name automatically - CONTAINER_ID=$(docker ps --filter "name=postgres" --format "{{.ID}}" | head -n 1) - - if [ -z "$CONTAINER_ID" ]; then - echo "No running Postgres container found." - exit 1 - fi - - # Run the ALTER TABLE statement using docker exec - docker exec "$CONTAINER_ID" psql -U "$USERNAME" "$DBNAME" -c "$ALTER_STATEMENT" - -elif [ "$ENVIRONMENT" == "kubectl" ]; then - # Assign namespace argument - NAMESPACE=$4 - - # Find the Postgres pod automatically - POD_NAME=$(kubectl get pods -n "$NAMESPACE" -l app.name=postgres -o jsonpath="{.items[0].metadata.name}") - - if [ -z "$POD_NAME" ]; then - # Find the Postgres pod automatically - POD_NAME=$(kubectl get pods -n "$NAMESPACE" -l inner.service=postgres -o jsonpath="{.items[0].metadata.name}") - if [ -z "$POD_NAME" ]; then - echo "No running Postgres pod found in namespace $NAMESPACE." - exit 1 - fi - fi - - # echo $POD_NAME - # Run the ALTER TABLE statement using kubectl exec - kubectl -n "$NAMESPACE" exec "$POD_NAME" -- psql -U "$USERNAME" "$DBNAME" -c "$ALTER_STATEMENT" - -else - echo "Invalid environment specified. Use 'kubectl' or 'docker'." 
- exit 1 -fi - +execute_sql "$ALTER_STATEMENT" diff --git a/docker-compose.bastion.yml b/docker-compose.bastion.yml deleted file mode 100644 index 6147b179..00000000 --- a/docker-compose.bastion.yml +++ /dev/null @@ -1,12 +0,0 @@ -services: - bastion: - build: ./bastion/ - restart: unless-stopped - ports: - - ${BASTION_PORT:-22222}:22/tcp - volumes: - - ${BASTION_AUTHORIZED_KEYS_FILE:-$PWD/bastion/authorized_keys}:/var/lib/bastion/authorized_keys-tmp - - bastion:/usr/etc/ssh:rw - -volumes: - bastion: diff --git a/docker-compose.couchdb.yml b/docker-compose.couchdb.yml deleted file mode 100644 index 65b52cd1..00000000 --- a/docker-compose.couchdb.yml +++ /dev/null @@ -1,12 +0,0 @@ -services: - couchdb: - image: public.ecr.aws/medic/cht-couchdb:4.15.0 - restart: always - ports: - - "5984:5984" - environment: - - COUCHDB_USER=${COUCHDB_USER} - - COUCHDB_PASSWORD=${COUCHDB_PASSWORD} - - COUCHDB_SECRET=9c0d6034-0f19-45df-8fcd-5fec9c473c73 - - COUCHDB_UUID=4c6bffc8-a5ac-4f98-a34c-6fb0e63964e5 - - SVC_NAME=couchdb diff --git a/docker-compose.pgadmin.yml b/docker-compose.pgadmin.yml deleted file mode 100644 index e1ec7e1f..00000000 --- a/docker-compose.pgadmin.yml +++ /dev/null @@ -1,13 +0,0 @@ -services: - postgres: - ports: - - 5432:${POSTGRES_PORT:-5432} - - pgadmin: - image: dpage/pgadmin4 - environment: - PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL:-pgadmin4@pgadmin.org} - PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD:-admin} - PGADMIN_CONFIG_SERVER_MODE: 'False' - ports: - - "${PGADMIN_PORT:-5050}:80" diff --git a/docker-compose.postgres.yml b/docker-compose.postgres.yml deleted file mode 100644 index e4b2327f..00000000 --- a/docker-compose.postgres.yml +++ /dev/null @@ -1,12 +0,0 @@ -services: - postgres: - image: postgres:16 - restart: always - volumes: - - ./postgres/init-dbt-resources.sh:/docker-entrypoint-initdb.d/init-dbt-resources.sh:z - environment: - - POSTGRES_USER=${POSTGRES_USER} - - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} - - 
POSTGRES_DB=${POSTGRES_DB} - - POSTGRES_TABLES=${COUCHDB_DBS} - - POSTGRES_SCHEMA=${POSTGRES_SCHEMA} \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index c39352c5..ac2620c9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,21 @@ name: ${COMPOSE_PROJECT_NAME:-cht-sync} + +x-dbt-base: &dbt-common + build: ./dbt/ + working_dir: /dbt/ + environment: &dbt-env + POSTGRES_HOST: ${POSTGRES_HOST} + POSTGRES_PORT: ${POSTGRES_PORT:-5432} + POSTGRES_USER: ${POSTGRES_USER} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + POSTGRES_DB: ${POSTGRES_DB} + POSTGRES_TABLE: ${POSTGRES_TABLE} + POSTGRES_SCHEMA: ${POSTGRES_SCHEMA} + ROOT_POSTGRES_SCHEMA: ${POSTGRES_SCHEMA} + DATAEMON_INTERVAL: ${DATAEMON_INTERVAL} + DBT_THREAD_COUNT: ${DBT_THREAD_COUNT:-1} + DBT_BATCH_SIZE: ${DBT_BATCH_SIZE:-0} + services: couch2pg: build: ./couch2pg/ @@ -8,35 +25,117 @@ services: driver: "json-file" options: max-size: "512m" + environment: + <<: *dbt-env + COUCHDB_USER: ${COUCHDB_USER} + COUCHDB_PASSWORD: ${COUCHDB_PASSWORD} + COUCHDB_HOST: ${COUCHDB_HOST} + COUCHDB_DBS: ${COUCHDB_DBS} + COUCHDB_PORT: ${COUCHDB_PORT} + COUCHDB_SECURE: ${COUCHDB_SECURE:-true} + restart: always + +#### Test ############################################################################################## + + dbt-test-contacts: + <<: *dbt-common + volumes: + - "${DBT_LOCAL_PATH}:/dbt/package/" + profiles: + - test + environment: + <<: *dbt-env + DBT_SELECTOR: tag:contacts + DBT_LOCAL_PATH: ${DBT_LOCAL_PATH} + + dbt-test-reports: + <<: *dbt-common + volumes: + - "${DBT_LOCAL_PATH}:/dbt/package/" + profiles: + - test + depends_on: + - dbt-test-contacts + environment: + <<: *dbt-env + DBT_SELECTOR: tag:reports + DBT_LOCAL_PATH: ${DBT_LOCAL_PATH} + + couchdb: + image: public.ecr.aws/medic/cht-couchdb:4.15.0 + restart: always + profiles: + - test + ports: + - "5984:5984" environment: - COUCHDB_USER=${COUCHDB_USER} - COUCHDB_PASSWORD=${COUCHDB_PASSWORD} - - COUCHDB_HOST=${COUCHDB_HOST} 
- - COUCHDB_DBS=${COUCHDB_DBS} - - COUCHDB_PORT=${COUCHDB_PORT} - - COUCHDB_SECURE=${COUCHDB_SECURE:-true} - - POSTGRES_USER=${POSTGRES_USER} - - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} - - POSTGRES_HOST=${POSTGRES_HOST} - - POSTGRES_DB=${POSTGRES_DB} - - POSTGRES_PORT=${POSTGRES_PORT:-5432} - - POSTGRES_SCHEMA=${POSTGRES_SCHEMA} - - POSTGRES_TABLE=${POSTGRES_TABLE} - restart: always + - COUCHDB_SECRET=9c0d6034-0f19-45df-8fcd-5fec9c473c73 + - COUCHDB_UUID=4c6bffc8-a5ac-4f98-a34c-6fb0e63964e5 - dbt: - build: ./dbt/ - working_dir: /dbt/ +#### Local ############################################################################################## + + postgres: + image: postgres:16 + restart: always + profiles: + - test + - local + volumes: + - ./postgres/init-dbt-resources.sh:/docker-entrypoint-initdb.d/init-dbt-resources.sh:z environment: - - POSTGRES_HOST=${POSTGRES_HOST} - - POSTGRES_PORT=${POSTGRES_PORT:-5432} - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} - POSTGRES_DB=${POSTGRES_DB} - - POSTGRES_TABLE=${POSTGRES_TABLE} + - POSTGRES_TABLES=${COUCHDB_DBS} - POSTGRES_SCHEMA=${POSTGRES_SCHEMA} - - ROOT_POSTGRES_SCHEMA=${POSTGRES_SCHEMA} - - CHT_PIPELINE_BRANCH_URL=${CHT_PIPELINE_BRANCH_URL} - - DATAEMON_INTERVAL=${DATAEMON_INTERVAL} - - DBT_PACKAGE_TARBALL_URL=${DBT_PACKAGE_TARBALL_URL} - - DBT_THREAD_COUNT=${DBT_THREAD_COUNT} + + dbt-local: + <<: *dbt-common + volumes: + - "${DBT_LOCAL_PATH}:/dbt/package/" + profiles: + - local + environment: + <<: *dbt-env + DBT_SELECTOR: '' + DBT_LOCAL_PATH: ${DBT_LOCAL_PATH} + + pgadmin: + image: dpage/pgadmin4 + profiles: + - local + environment: + PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL:-pgadmin4@pgadmin.org} + PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD:-admin} + PGADMIN_CONFIG_SERVER_MODE: 'False' + ports: + - "${PGADMIN_PORT:-5050}:80" + + +#### Production ########################################################################################## + + dbt-production: + <<: *dbt-common + 
profiles: + - production + environment: + <<: *dbt-env + DBT_SELECTOR: '' + CHT_PIPELINE_BRANCH_URL: ${CHT_PIPELINE_BRANCH_URL} + + bastion: + build: ./bastion/ + restart: unless-stopped + profiles: + - production + - test + ports: + - ${BASTION_PORT:-22222}:22/tcp + volumes: + - ${BASTION_AUTHORIZED_KEYS_FILE:-$PWD/bastion/authorized_keys}:/var/lib/bastion/authorized_keys-tmp + - bastion:/usr/etc/ssh:rw + +volumes: + bastion: diff --git a/env.template b/env.template index e6aca3f1..5898e8df 100644 --- a/env.template +++ b/env.template @@ -11,6 +11,10 @@ POSTGRES_PORT=5432 CHT_PIPELINE_BRANCH_URL="https://github.com/medic/cht-pipeline.git#main" DATAEMON_INTERVAL=5 DBT_THREAD_COUNT=1 +DBT_BATCH_SIZE=100000 + +# if running locally, path to pipeline +DBT_LOCAL_PATH="/path/to/cht-pipeline/" # couchdb COUCHDB_USER=medic @@ -24,4 +28,4 @@ COUCHDB_SECURE=false #COMPOSE_PROJECT_NAME=cht-sync #BASTION_PORT=22222 # default is 22222 uncomment to change -#BASTION_AUTHORIZED_KEYS_FILE= # uncomment to change \ No newline at end of file +#BASTION_AUTHORIZED_KEYS_FILE= # uncomment to change diff --git a/package.json b/package.json index 43385efa..1f01bba1 100644 --- a/package.json +++ b/package.json @@ -8,8 +8,8 @@ "test:e2e": "npm run test:e2e-data && npm run test:e2e-stop-containers && npm run test:e2e-containers && npm run test:e2e-mocha && npm run test:e2e-stop-containers ", "test:e2e-mocha": "mocha tests/**/*.spec.js --timeout 70000", "lint": "eslint --color --cache .", - "test:e2e-stop-containers": "docker compose --env-file ./tests/.e2e-env -f docker-compose.yml -f docker-compose.couchdb.yml -f docker-compose.postgres.yml -f tests/dbt/docker-compose.yml -f docker-compose.bastion.yml kill && docker compose --env-file ./tests/.e2e-env -f docker-compose.yml -f docker-compose.couchdb.yml -f docker-compose.postgres.yml -f tests/dbt/docker-compose.yml -f docker-compose.bastion.yml down -v", - "test:e2e-containers": "docker compose --env-file ./tests/.e2e-env -f docker-compose.yml 
-f docker-compose.couchdb.yml -f docker-compose.postgres.yml -f tests/dbt/docker-compose.yml -f docker-compose.bastion.yml up -d --build --force-recreate && npm run wait-for-couchdb", + "test:e2e-stop-containers": "docker compose --profile test --env-file ./tests/.e2e-env -f docker-compose.yml kill && docker compose --profile test --env-file ./tests/.e2e-env -f docker-compose.yml down -v", + "test:e2e-containers": "docker compose --profile test --env-file ./tests/.e2e-env -f docker-compose.yml up -d --build --force-recreate && npm run wait-for-couchdb", "test:e2e-data": "cd tests/data && rm -rf ./json_docs && cht csv-to-docs", "test": "cd couch2pg && npm run test", "wait-for-couchdb": "bash -c 'until nc -z localhost 5984; do sleep 1; done; echo \"CouchDB is ready\"'" diff --git a/tests/.e2e-env b/tests/.e2e-env index 6b12b27c..a9af4bf1 100644 --- a/tests/.e2e-env +++ b/tests/.e2e-env @@ -4,7 +4,7 @@ POSTGRES_DB="cht_sync" POSTGRES_TABLE="couchdb" POSTGRES_SCHEMA="v1" POSTGRES_PORT=5432 -DBT_PACKAGE_TARBALL_URL="http://dbt-package/dbt/package.tar.gz" +DBT_LOCAL_PATH="./tests/dbt/package" DATAEMON_INTERVAL=0 COUCHDB_USER="medic" COUCHDB_PASSWORD="password" diff --git a/tests/dbt/Dockerfile b/tests/dbt/Dockerfile deleted file mode 100644 index 5d908122..00000000 --- a/tests/dbt/Dockerfile +++ /dev/null @@ -1,15 +0,0 @@ -FROM nginx:1.25.1-alpine as base_nginx - -RUN apk add --update --no-cache \ - curl \ - socat \ - sed \ - bash \ - tar - -WORKDIR /dbt - -COPY nginx.conf /etc/nginx/nginx.conf -COPY package ./package -RUN tar -czvf ./package.tar.gz ./package - diff --git a/tests/dbt/docker-compose.yml b/tests/dbt/docker-compose.yml deleted file mode 100644 index 32f03782..00000000 --- a/tests/dbt/docker-compose.yml +++ /dev/null @@ -1,5 +0,0 @@ -services: - dbt-package: - build: ./tests/dbt/ - ports: - - "10080:80" diff --git a/tests/dbt/nginx.conf b/tests/dbt/nginx.conf deleted file mode 100644 index 2e743aaf..00000000 --- a/tests/dbt/nginx.conf +++ /dev/null @@ -1,21 
+0,0 @@ -user nginx; -worker_processes auto; - -events { - worker_connections 10240; -} - -http { - sendfile on; - keepalive_timeout 3600; - - server { - listen 80; - listen [::]:80; - - root /; - location / { - try_files $uri $uri/ =404; - } - } -} diff --git a/tests/dbt/package/models/contacts/contacts.yml b/tests/dbt/package/models/contacts/contacts.yml index 0e8881e1..9f503380 100644 --- a/tests/dbt/package/models/contacts/contacts.yml +++ b/tests/dbt/package/models/contacts/contacts.yml @@ -3,6 +3,7 @@ version: 2 models: - name: contacts config: + tags: ["contacts"] contract: enforced: true columns: @@ -39,6 +40,7 @@ models: - name: persons config: + tags: ["contacts"] contract: enforced: true columns: diff --git a/tests/dbt/package/models/reports/reports.yml b/tests/dbt/package/models/reports/reports.yml index 53d06861..66d41916 100644 --- a/tests/dbt/package/models/reports/reports.yml +++ b/tests/dbt/package/models/reports/reports.yml @@ -2,6 +2,8 @@ version: 1 models: - name: reports + config: + tags: ["reports"] columns: - name: _id tests: diff --git a/tests/e2e-test.spec.js b/tests/e2e-test.spec.js index e77b31e6..d07c3a76 100644 --- a/tests/e2e-test.spec.js +++ b/tests/e2e-test.spec.js @@ -157,7 +157,7 @@ describe('Main workflow Test Suite', () => { await editDoc({ ...report, edited: 1 }); await editDoc({ ...contact, edited: 1 }); - await delay(6); // wait for CHT-Sync + await delay(24); // wait for CHT-Sync const pgTableDataRecord = await client.query(`SELECT * from ${PGTABLE} where _id = $1`, [report._id]); expect(pgTableDataRecord.rows[0].doc.edited).to.equal(1); @@ -328,6 +328,33 @@ describe('Main workflow Test Suite', () => { }); }); + describe('DBT Selector Tests', () => { + it('should maintain separate manifests for different selectors', async () => { + // Test contacts selector + await delay(6); + + const contactsManifest = await client.query( + `SELECT manifest, dbt_selector FROM ${POSTGRES_SCHEMA}._dataemon WHERE dbt_selector = $1`, + 
['tag:contacts'] + ); + expect(contactsManifest.rows.length).to.be.greaterThan(0); + expect(contactsManifest.rows[0].dbt_selector).to.equal('tag:contacts'); + + // Test reports selector + await delay(6); + + const reportsManifest = await client.query( + `SELECT manifest, dbt_selector FROM ${POSTGRES_SCHEMA}._dataemon WHERE dbt_selector = $1`, + ['tag:reports'] + ); + expect(reportsManifest.rows.length).to.be.greaterThan(0); + expect(reportsManifest.rows[0].dbt_selector).to.equal('tag:reports'); + + // Verify manifests are different + expect(contactsManifest.rows[0].manifest).to.not.deep.equal(reportsManifest.rows[0].manifest); + }); + }); + describe('Downtime handles', () => { after(async () => { const isAlive = await isPostgresConnectionAlive(client); diff --git a/tests/utils/docker-utils.js b/tests/utils/docker-utils.js index 125a177e..19716e3b 100644 --- a/tests/utils/docker-utils.js +++ b/tests/utils/docker-utils.js @@ -2,8 +2,6 @@ import { execSync } from 'child_process'; const composeFiles = [ 'docker-compose.yml', - 'docker-compose.couchdb.yml', - 'docker-compose.postgres.yml' ].map(file => `-f ${file}`).join(' '); const execDockerCommand = (command) => {