From d3ae470515b92301caba90ae2de1f43316669014 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Fri, 7 Feb 2025 20:36:25 +0000 Subject: [PATCH 1/4] feat(views): ds dump sql; - pg_dump sql format for datastore dump. --- ckanext/datastore/blueprint.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index 2f4ff37961e..a11412f7ea9 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -28,6 +28,9 @@ json_writer, xml_writer, ) +# (canada fork only): psql dump format +import subprocess +from ckanext.datastore.backend import DatastoreBackend int_validator = get_validator(u'int_validator') boolean_validator = get_validator(u'boolean_validator') @@ -36,7 +39,8 @@ default = cast(ValidatorFactory, get_validator(u'default')) unicode_only = get_validator(u'unicode_only') -DUMP_FORMATS = u'csv', u'tsv', u'json', u'xml' +# (canada fork only): psql dump format +DUMP_FORMATS = u'csv', u'tsv', u'json', u'xml', 'sql' PAGINATE_BY = 32000 datastore = Blueprint(u'datastore', __name__) @@ -141,6 +145,11 @@ def dump(resource_id: str): content_disposition = 'attachment; filename="{name}.xml"'.format( name=resource_id) content_type = b'text/xml; charset=utf-8' + # (canada fork only): psql dump format + elif fmt == 'sql': + content_disposition = 'attachment; filename="{name}.sql"'.format( + name=resource_id) + content_type = b'application/sql; charset=utf-8' else: abort(404, _('Unsupported format')) @@ -271,6 +280,16 @@ def dump_to( elif fmt == 'xml': writer_factory = xml_writer records_format = 'objects' + # (canada fork only): psql dump format + elif fmt == 'sql': + datastore_uri = str(DatastoreBackend.get_active_backend()._get_write_engine().url) + cmd = subprocess.Popen( + ['pg_dump', datastore_uri, '-a', '-t', resource_id], + stdout=subprocess.PIPE) + def stream_sql(process): + for c in iter(lambda: process.stdout.read(1), b""): + yield c + return stream_sql(cmd) else: assert False, 'Unsupported format' From 7a207977f18532f194ae3233d1fc50247ad24e4e Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Tue, 11 Feb 2025 19:06:12 +0000 Subject: [PATCH 2/4] feat(dev): pg dump; - Datastore pg_dump endpoint. --- ckanext/datastore/blueprint.py | 11 +++++++++-- .../templates/ajax_snippets/api_info.html | 16 ++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index 3cc42ce84ba..cb95bd52ddb 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -120,6 +120,10 @@ def dump(resource_id: str): 'limit': 0}) except ObjectNotFound: abort(404, _('DataStore resource not found')) + # (canada fork only): handle 403 + # TODO: upstream contrib!! + except NotAuthorized: + return abort(403) data, errors = dict_fns.validate(request.args.to_dict(), dump_schema()) if errors: @@ -304,11 +308,14 @@ def dump_to( # (canada fork only): psql dump format elif fmt == 'sql': datastore_uri = str(DatastoreBackend.get_active_backend()._get_write_engine().url) + # dump clean table, schema, and data. + # quote all for blank values. cmd = subprocess.Popen( - ['pg_dump', datastore_uri, '-a', '-t', resource_id], + ['pg_dump', datastore_uri, '--clean', '--if-exists', '--disable-triggers', + '--no-owner', '--quote-all-identifiers', '-t', resource_id], stdout=subprocess.PIPE) def stream_sql(process): - for c in iter(lambda: process.stdout.read(1), b""): + for c in iter(lambda: process.stdout.read(-1), b""): yield c return stream_sql(cmd) else: diff --git a/ckanext/datastore/templates/ajax_snippets/api_info.html b/ckanext/datastore/templates/ajax_snippets/api_info.html index 77cfc293b4f..bc9bc4b06d3 100644 --- a/ckanext/datastore/templates/ajax_snippets/api_info.html +++ b/ckanext/datastore/templates/ajax_snippets/api_info.html @@ -128,6 +128,22 @@

+ {# (canada fork only): psql dump format #} +
+
+ +
+
+
+

{{ _('You can download the entire DataStore table in a pg_dump format to load into your own Postgres database for more advanced querying. This requires you to have write access to a running Postgres server and have the psql command line utility installed.') }}

+

{{ _('Download SQL File') }}

+

{{ _('In your Command Prompt, PowerShell, or Terminal enter the command:') }}

+
+psql -h [{{ _('your database address') }}] -p [{{ _('your database port') }}] -U [{{ _('your database username') }}] -d [{{ _('your database name') }}] -v ON_ERROR_STOP=0 -f {{ resource_id }}.sql
+                  
+
+
+
From 29a58a7db9f02c5869ae80a49b01c2ff8a5277a1 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Wed, 12 Feb 2025 15:59:02 +0000 Subject: [PATCH 3/4] feat(misc): changelog; - Add change log file. --- changes/191.canada.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/191.canada.feature diff --git a/changes/191.canada.feature b/changes/191.canada.feature new file mode 100644 index 00000000000..313c14939ad --- /dev/null +++ b/changes/191.canada.feature @@ -0,0 +1 @@ +Adds `sql` format to DataStore dump, which is a custom `pg_dump` export of the DataStore table. From 9d773ee045388cbde97668b800239536c85e083a Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Mon, 24 Feb 2025 15:58:44 +0000 Subject: [PATCH 4/4] feat(dev): tighten; - Use subprocess.run for timeouts. - Added max execution for sql dump config. - Made sql dump pluggable. - Used max buffer sizes for subprocess and byte chunks. --- ckanext/datastore/backend/__init__.py | 11 +++++++ ckanext/datastore/backend/postgres.py | 17 +++++++++++ ckanext/datastore/blueprint.py | 35 ++++++++++++++--------- ckanext/datastore/config_declaration.yaml | 10 +++++++ 4 files changed, 60 insertions(+), 13 deletions(-) diff --git a/ckanext/datastore/backend/__init__.py b/ckanext/datastore/backend/__init__.py index 447e2e37bfa..8977572a2c9 100644 --- a/ckanext/datastore/backend/__init__.py +++ b/ckanext/datastore/backend/__init__.py @@ -6,6 +6,10 @@ import logging from typing import Any, Container +# (canada fork only): psql dump format +# TODO: upstream contrib?? +import subprocess + import ckan.plugins as plugins from ckan.common import CKANConfig, config from ckanext.datastore.interfaces import IDatastoreBackend @@ -194,6 +198,13 @@ def search_sql(self, context: Context, data_dict: dict[str, Any]) -> Any: """ raise NotImplementedError() + # (canada fork only): psql dump format + # TODO: upstream contrib?? + def dump_sql(self, id: str) -> subprocess.CompletedProcess: + """Define an SQL dump export for the Resource's table. + """ + raise NotImplementedError() + def resource_exists(self, id: str) -> bool: """Define whether resource exists in datastore. """ diff --git a/ckanext/datastore/backend/postgres.py b/ckanext/datastore/backend/postgres.py index 2b7cc177923..0298da9a35c 100644 --- a/ckanext/datastore/backend/postgres.py +++ b/ckanext/datastore/backend/postgres.py @@ -6,6 +6,10 @@ # TODO: upstream contrib!! import re import string +# (canada fork only): psql dump format +# TODO: upstream contrib?? +import subprocess +from io import DEFAULT_BUFFER_SIZE from typing_extensions import TypeAlias @@ -2530,6 +2534,19 @@ def search_sql(self, context: Context, data_dict: dict[str, Any]): }) return search_sql(context, data_dict) + # (canada fork only): psql dump format + # TODO: upstream contrib?? + def dump_sql(self, id: str) -> subprocess.CompletedProcess: + datastore_uri = str(self._get_write_engine().url) + # dump clean table, schema, and data. + # quote all for blank values. + return subprocess.run( + ['pg_dump', datastore_uri, '--clean', '--if-exists', '--disable-triggers', + '--no-owner', '--quote-all-identifiers', '-E', 'utf-8', '-t', id], + bufsize=DEFAULT_BUFFER_SIZE, + capture_output=True, + timeout=config.get('ckan.datastore.max_sql_dump_exec_time', 60)) + def resource_exists(self, id: str) -> bool: resources_sql = sa.text( '''SELECT 1 FROM "_table_metadata" diff --git a/ckanext/datastore/blueprint.py b/ckanext/datastore/blueprint.py index 4f86fbfe5d3..4042eea7aa0 100644 --- a/ckanext/datastore/blueprint.py +++ b/ckanext/datastore/blueprint.py @@ -4,6 +4,12 @@ from typing import Any, Optional, cast, Union from itertools import zip_longest +# (canada fork only): psql dump format +# TODO: upstream contrib?? +from io import DEFAULT_BUFFER_SIZE, BytesIO +import subprocess +from functools import partial + from flask import Blueprint, Response from flask.views import MethodView @@ -30,8 +36,7 @@ json_writer, xml_writer, ) -# (canada fork only): psql dump format -import subprocess + from ckanext.datastore.backend import DatastoreBackend # (canada fork only): filename to save stream to import re @@ -48,6 +53,7 @@ unicode_only = get_validator(u'unicode_only') # (canada fork only): psql dump format +# TODO: upstream contrib?? DUMP_FORMATS = u'csv', u'tsv', u'json', u'xml', 'sql' PAGINATE_BY = 32000 @@ -119,6 +125,9 @@ def dump_schema() -> Schema: def dump(resource_id: str): try: get_action('datastore_search')({}, {'resource_id': resource_id, + # (canada fork only): include_total False + # TODO: upstream contrib!! + 'include_total': False, 'limit': 0}) except ObjectNotFound: abort(404, _('DataStore resource not found')) @@ -173,6 +182,7 @@ def dump(resource_id: str): name=filename) # (canada fork only): filename to save stream to content_type = b'text/xml; charset=utf-8' # (canada fork only): psql dump format + # TODO: upstream contrib?? elif fmt == 'sql': content_disposition = 'attachment; filename="{name}.sql"'.format( name=resource_id) @@ -199,6 +209,10 @@ def dump(resource_id: str): headers=headers) except ObjectNotFound: abort(404, _('DataStore resource not found')) + # (canada fork only): psql dump format + # TODO: upstream contrib?? + except (subprocess.TimeoutExpired, subprocess.CalledProcessError): + abort(500, _('Failed to download records into SQL file')) class DictionaryView(MethodView): @@ -314,18 +328,13 @@ def dump_to( writer_factory = xml_writer records_format = 'objects' # (canada fork only): psql dump format + # TODO: upstream contrib?? elif fmt == 'sql': - datastore_uri = str(DatastoreBackend.get_active_backend()._get_write_engine().url) - # dump clean table, schema, and data. - # quote all for blank values. - cmd = subprocess.Popen( - ['pg_dump', datastore_uri, '--clean', '--if-exists', '--disable-triggers', - '--no-owner', '--quote-all-identifiers', '-t', resource_id], - stdout=subprocess.PIPE) - def stream_sql(process): - for c in iter(lambda: process.stdout.read(-1), b""): - yield c - return stream_sql(cmd) + def stream_sql(process: subprocess.CompletedProcess) -> Any: + chunker = partial(BytesIO(process.stdout).read, DEFAULT_BUFFER_SIZE) + yield from iter(chunker, b"") + return stream_sql(DatastoreBackend.get_active_backend().dump_sql( + id=resource_id)) else: assert False, 'Unsupported format' diff --git a/ckanext/datastore/config_declaration.yaml b/ckanext/datastore/config_declaration.yaml index 5e308037e9a..66c9dd51d4a 100644 --- a/ckanext/datastore/config_declaration.yaml +++ b/ckanext/datastore/config_declaration.yaml @@ -99,3 +99,13 @@ groups: The default method used when creating full-text search indexes. Currently it can be "gin" or "gist". Refer to PostgreSQL's documentation to understand the characteristics of each one and pick the best for your instance. + + # (canada fork only): psql dump format + # TODO: upstream contrib?? + - default: 60 + key: ckan.datastore.max_sql_dump_exec_time + example: 300 + type: int + description: > + The maximum execution time for a subprocess.run SQL table dump command. + This is used in the SQL dump of a datastore table.