diff --git a/Makefile b/Makefile
index 25f160e..e979409 100644
--- a/Makefile
+++ b/Makefile
@@ -64,6 +64,7 @@ upload-package:
 	@${VENV_BIN_PATH}/twine upload dist/*
 
 tag-release-candidate:
+	@git fetch origin
 	@echo "The latest tag is:'$(shell git tag | sort -V | tail -1)'." \
 	&&echo "Please, provide a new tag (format vX.Y.Z-rc.X)):" \
 	&&read -p "> " tag \
diff --git a/dynamicio/mixins/with_postgres.py b/dynamicio/mixins/with_postgres.py
index 1bc7c3a..8f41318 100644
--- a/dynamicio/mixins/with_postgres.py
+++ b/dynamicio/mixins/with_postgres.py
@@ -2,8 +2,6 @@
 
 """This module provides mixins that are providing Postgres I/O support."""
 
-import csv
-import tempfile
 from contextlib import contextmanager
 from typing import Any, Dict, Generator, MutableMapping, Union
 
@@ -59,7 +57,9 @@ class WithPostgres:
 
     Args:
         - options:
-            - `truncate_and_append: bool`: If set to `True`, truncates the table and then appends the new rows. Otherwise, it drops the table and recreates it with the new rows.
+            - `truncate: bool`: If set to `True`, truncates the table before writing.
+            - `append: bool`: If set to `True`, appends the new rows to the table. Otherwise, the table is replaced.
+            - `truncate_and_append: bool`: Legacy shorthand for setting both `truncate` and `append`.
     """
 
     sources_config: PostgresDataEnvironment
@@ -161,36 +161,31 @@ def _write_to_postgres(self, df: pd.DataFrame):
         assert self.sources_config.dynamicio_schema is not None, "The schema must be specified for SQL tables"
         model = self._generate_model_from_schema(self.sources_config.dynamicio_schema)
 
+        # Legacy option
         is_truncate_and_append = self.options.get("truncate_and_append", False)
+        is_append = self.options.get("append", False) or is_truncate_and_append
+        is_truncate = self.options.get("truncate", False) or is_truncate_and_append
 
         logger.info(f"[postgres] Started uploading table: {self.sources_config.dynamicio_schema.name} from: {db_host}:{db_name}")
         with session_for(connection_string) as session:
-            self._write_to_database(session, model.__tablename__, df, is_truncate_and_append)  # type: ignore
+            self._write_to_database(session, model.__tablename__, df, is_append, is_truncate)  # type: ignore
 
     @staticmethod
-    def _write_to_database(session: SqlAlchemySession, table_name: str, df: pd.DataFrame, is_truncate_and_append: bool):
+    def _write_to_database(session: SqlAlchemySession, table_name: str, df: pd.DataFrame, is_append: bool, is_truncate: bool):
         """Write a dataframe to any database provided a session with a data model and a table name.
 
         Args:
-            session: Generated from a data model and a table name
-            table_name: The name of the table to read from a DB
+            session: Active DB session
+            table_name: The name of the table to write to
             df: The dataframe to be written out
-            is_truncate_and_append: Supply to truncate the table and append new rows to it; otherwise, delete and replace
+            is_truncate: Supply to truncate the table before writing
+            is_append: Supply to append new rows to the table; otherwise, delete and replace
         """
-        if is_truncate_and_append:
+        if is_truncate:
             session.execute(f"TRUNCATE TABLE {table_name};")
 
-            # Below is a speedup hack in place of `df.to_csv` with the multipart option. As of today, even with
-            # `method="multi"`, uploading to Postgres is painfully slow. Hence, we're resorting to dumping the file as
-            # csv and using Postgres's CSV import function.
-            # https://stackoverflow.com/questions/2987433/how-to-import-csv-file-data-into-a-postgresql-table
-            with tempfile.NamedTemporaryFile(mode="r+") as temp_file:
-                df.to_csv(temp_file, index=False, header=False, sep="\t", doublequote=False, escapechar="\\", quoting=csv.QUOTE_NONE)
-                temp_file.flush()
-                temp_file.seek(0)
-
-                cur = session.connection().connection.cursor()
-                cur.copy_from(temp_file, table_name, columns=df.columns, null="")
+        if is_append:
+            df.to_sql(name=table_name, con=session.get_bind(), if_exists="append", index=False)
         else:
             df.to_sql(name=table_name, con=session.get_bind(), if_exists="replace", index=False)
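Review note on the write path: the flag combinations now resolve as follows. No flags replaces the table; `append=True` appends to it; `truncate=True` alone truncates and then still replaces it (the truncate is effectively redundant unless `append` is also set); the legacy `truncate_and_append=True` truncates and then appends. Note this also drops the COPY-based fast path, so large uploads now go through the slower `to_sql` insert route that the removed comment complained about. A minimal standalone sketch of the resolution logic, assuming SQLAlchemy 1.x-style `session.execute` (the `write_table` name is illustrative, not part of this PR):

    import pandas as pd
    from sqlalchemy.orm import Session

    def write_table(df: pd.DataFrame, table_name: str, options: dict, session: Session) -> None:
        legacy = options.get("truncate_and_append", False)  # legacy flag implies both new flags
        is_append = options.get("append", False) or legacy
        is_truncate = options.get("truncate", False) or legacy
        if is_truncate:
            # TRUNCATE empties the table but keeps its schema, indexes and grants.
            session.execute(f"TRUNCATE TABLE {table_name};")
        # "append" keeps the existing table; "replace" drops and recreates it.
        df.to_sql(
            name=table_name,
            con=session.get_bind(),
            if_exists="append" if is_append else "replace",
            index=False,
        )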
diff --git a/requirements.txt b/requirements.txt
index c57725b..284792e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,6 +7,7 @@ logzero>=1.7.0
 magic-logger>=1.0.2
 no_implicit_optional==1.4.0
 pandas>=1.2.4
+numpy<2
 psycopg2-binary~=2.9.3
 pyarrow>=7.0.0
 pydantic>=1.9.2,<3
diff --git a/tests/resources/definitions/processed.yaml b/tests/resources/definitions/processed.yaml
index 7acda24..f709a07 100644
--- a/tests/resources/definitions/processed.yaml
+++ b/tests/resources/definitions/processed.yaml
@@ -95,3 +95,5 @@ WRITE_TO_PG_PARQUET:
       db_name: "[[ DB_NAME ]]"
       db_user: "[[ DB_USER ]]"
       db_password: "[[ DB_PASS ]]"
+  schema:
+    file_path: "[[ TEST_RESOURCES ]]/schemas/pg.yaml"
diff --git a/tests/test_core.py b/tests/test_core.py
index fecc5a4..c52e52b 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -465,7 +465,7 @@ def test_schema_metrics_are_not_logged_on_write_if_metrics_flag_is_false(self, m
             CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
         ),
         (
-            pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.NAN}]),
+            pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.nan}]),
             "bool",
             CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
         ),
@@ -516,7 +516,7 @@ def test__has_valid_dtypes_does_not_attempt_to_convert_object_type_to_other_type
             CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
         ),
         (
-            pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.NAN}]),
+            pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.nan}]),
             "bool",
             CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
         ),
@@ -578,7 +578,7 @@ def test__has_valid_dtypes_does_not_attempt_to_convert_object_type_to_other_type
             CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
         ),
         (
-            pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.NAN}]),
+            pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.nan}]),
             "bool",
             CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
         ),
@@ -640,7 +640,7 @@ def test__has_valid_dtypes_does_not_attempt_to_convert_object_type_to_other_type
             CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
         ),
         (
-            pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.NAN}]),
+            pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.nan}]),
             "bool",
             CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
         ),
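The `np.NAN` to `np.nan` renames and the new `numpy<2` pin are two sides of the same upgrade: NumPy 2.0 removed the `np.NAN` and `np.NaN` aliases from the main namespace, keeping only `np.nan`, so the spelling fix works on both major versions while the pin presumably holds the dependency on 1.x until the rest of the stack is verified. For illustration:

    import numpy as np

    x = np.nan  # canonical spelling; works on NumPy 1.x and 2.x
    # np.NAN and np.NaN were 1.x-only aliases of np.nan;
    # on NumPy >= 2.0 they raise AttributeError.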
"object"), ), diff --git a/tests/test_mixins/test_postgres_mixins.py b/tests/test_mixins/test_postgres_mixins.py index 62447db..8e21319 100644 --- a/tests/test_mixins/test_postgres_mixins.py +++ b/tests/test_mixins/test_postgres_mixins.py @@ -212,3 +212,66 @@ def test_to_check_if_dataframe_has_valid_data_types(self): # Then assert is_valid is True + + @staticmethod + def _test_append_truncate_options(mock__write_to_database, test_df, options): + # Given + df = test_df + postgres_cloud_config = IOConfig( + path_to_source_yaml=(os.path.join(constants.TEST_RESOURCES, "definitions/processed.yaml")), + env_identifier="CLOUD", + dynamic_vars=constants, + ).get( + source_key="WRITE_TO_PG_PARQUET", + ) + + # When + write_config = WritePostgresIO(source_config=postgres_cloud_config, **options) + write_config.write(df) + + # Then + mock__write_to_database.assert_called_once() + (_, _, df, is_append, is_truncate) = mock__write_to_database.call_args[0] + pd.testing.assert_frame_equal(test_df, df) + return is_append, is_truncate, write_config.options + + @pytest.mark.unit + @patch.object(WithPostgres, "_write_to_database") + def test_write_to_postgres_default_options(self, mock__write_to_database, test_df): + (append, truncate, options) = self._test_append_truncate_options(mock__write_to_database, test_df, {}) + assert append is False + assert truncate is False + assert options == {} + + @pytest.mark.unit + @patch.object(WithPostgres, "_write_to_database") + def test_write_to_postgres_legacy_append_and_truncate_option_overrules(self, mock__write_to_database, test_df): + (append, truncate, options) = self._test_append_truncate_options(mock__write_to_database, test_df, {"truncate_and_append": True, "append": False, "truncate": False}) + assert append is True + assert truncate is True + assert "truncate_and_append" in options + assert "append" in options + + @pytest.mark.unit + @patch.object(WithPostgres, "_write_to_database") + def test_write_to_postgres_truncate_option(self, mock__write_to_database, test_df): + (append, truncate, options) = self._test_append_truncate_options(mock__write_to_database, test_df, {"truncate": True}) + assert append is False + assert truncate is True + assert "truncate" in options + + @pytest.mark.unit + @patch.object(WithPostgres, "_write_to_database") + def test_write_to_postgres_append_option(self, mock__write_to_database, test_df): + (append, truncate, options) = self._test_append_truncate_options(mock__write_to_database, test_df, {"append": True}) + assert append is True + assert truncate is False + assert "append" in options + + @pytest.mark.unit + @patch.object(WithPostgres, "_write_to_database") + def test_write_to_postgres_append_and_truncate_options(self, mock__write_to_database, test_df): + (append, truncate, options) = self._test_append_truncate_options(mock__write_to_database, test_df, {"append": True, "truncate": True}) + assert append is True + assert truncate is True + assert "append" in options