1 change: 1 addition & 0 deletions Makefile
@@ -64,6 +64,7 @@ upload-package:
@${VENV_BIN_PATH}/twine upload dist/*

tag-release-candidate:
@git fetch origin
@echo "The latest tag is:'$(shell git tag | sort -V | tail -1)'." \
&&echo "Please, provide a new tag (format vX.Y.Z-rc.X)):" \
&&read -p "> " tag \
35 changes: 15 additions & 20 deletions dynamicio/mixins/with_postgres.py
@@ -2,8 +2,6 @@

"""This module provides mixins that are providing Postgres I/O support."""

import csv
import tempfile
from contextlib import contextmanager
from typing import Any, Dict, Generator, MutableMapping, Union

@@ -59,7 +57,9 @@ class WithPostgres:

Args:
- options:
- `truncate_and_append: bool`: If set to `True`, truncates the table and then appends the new rows. Otherwise, it drops the table and recreates it with the new rows.
- `truncate: bool`: If set to `True`, truncates the table before writing.
- `append: bool`: If set to `True`, appends the new rows to the table; otherwise, the table is replaced.
- `truncate_and_append: bool`: Legacy shorthand for both `truncate` and `append`.
"""

sources_config: PostgresDataEnvironment
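For reference, a sketch of how these options are passed in practice, mirroring the tests added later in this PR (`postgres_cloud_config` and `df` stand in for the test fixtures):

```python
# A sketch of the write modes; keyword options land in self.options,
# exactly as the new tests in this PR exercise them.
writer = WritePostgresIO(source_config=postgres_cloud_config, truncate=True, append=True)
writer.write(df)  # TRUNCATE the table first, then INSERT the new rows

# Legacy shorthand, equivalent to truncate=True, append=True:
writer = WritePostgresIO(source_config=postgres_cloud_config, truncate_and_append=True)
writer.write(df)
```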
@@ -161,36 +161,31 @@ def _write_to_postgres(self, df: pd.DataFrame):
assert self.sources_config.dynamicio_schema is not None, "The schema must be specified for SQL tables"
model = self._generate_model_from_schema(self.sources_config.dynamicio_schema)

# Legacy option
is_truncate_and_append = self.options.get("truncate_and_append", False)
is_append = self.options.get("append", False) or is_truncate_and_append
is_truncate = self.options.get("truncate", False) or is_truncate_and_append

logger.info(f"[postgres] Started uploading table: {self.sources_config.dynamicio_schema.name} from: {db_host}:{db_name}")
with session_for(connection_string) as session:
self._write_to_database(session, model.__tablename__, df, is_truncate_and_append) # type: ignore
self._write_to_database(session, model.__tablename__, df, is_append, is_truncate) # type: ignore

@staticmethod
def _write_to_database(session: SqlAlchemySession, table_name: str, df: pd.DataFrame, is_truncate_and_append: bool):
def _write_to_database(session: SqlAlchemySession, table_name: str, df: pd.DataFrame, is_append: bool, is_truncate: bool):
"""Write a dataframe to any database provided a session with a data model and a table name.

Args:
session: Generated from a data model and a table name
table_name: The name of the table to read from a DB
session: Active DB session
table_name: The name of the table to write to
df: The dataframe to be written out
is_truncate_and_append: Supply to truncate the table and append new rows to it; otherwise, delete and replace
is_append: If True, append the new rows to the table; otherwise, delete and replace it
is_truncate: If True, truncate the table before writing
"""
if is_truncate_and_append:
if is_truncate:
session.execute(f"TRUNCATE TABLE {table_name};")

# Below is a speedup hack in place of `df.to_csv` with the multipart option. As of today, even with
# `method="multi"`, uploading to Postgres is painfully slow. Hence, we're resorting to dumping the file as
# csv and using Postgres's CSV import function.
# https://stackoverflow.com/questions/2987433/how-to-import-csv-file-data-into-a-postgresql-table
with tempfile.NamedTemporaryFile(mode="r+") as temp_file:
df.to_csv(temp_file, index=False, header=False, sep="\t", doublequote=False, escapechar="\\", quoting=csv.QUOTE_NONE)
temp_file.flush()
temp_file.seek(0)

cur = session.connection().connection.cursor()
cur.copy_from(temp_file, table_name, columns=df.columns, null="")
if is_append:
df.to_sql(name=table_name, con=session.get_bind(), if_exists="append", index=False)
else:
df.to_sql(name=table_name, con=session.get_bind(), if_exists="replace", index=False)

1 change: 1 addition & 0 deletions requirements.txt
@@ -7,6 +7,7 @@ logzero>=1.7.0
magic-logger>=1.0.2
no_implicit_optional==1.4.0
pandas>=1.2.4
numpy<2
psycopg2-binary~=2.9.3
pyarrow>=7.0.0
pydantic>=1.9.2,<3
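Context for the new `numpy<2` pin (an inference from this diff, not stated in it): NumPy 2.0 removed the `np.NAN` alias, which is also why the test updates below switch to `np.nan`. A quick check:

```python
import numpy as np

# np.NAN (and np.NaN) were removed in NumPy 2.0; np.nan is the surviving spelling.
print(np.nan)              # works on every NumPy version
print(hasattr(np, "NAN"))  # False on NumPy >= 2.0, True on 1.x
```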
2 changes: 2 additions & 0 deletions tests/resources/definitions/processed.yaml
@@ -95,3 +95,5 @@ WRITE_TO_PG_PARQUET:
db_name: "[[ DB_NAME ]]"
db_user: "[[ DB_USER ]]"
db_password: "[[ DB_PASS ]]"
schema:
file_path: "[[ TEST_RESOURCES ]]/schemas/pg.yaml"
8 changes: 4 additions & 4 deletions tests/test_core.py
@@ -465,7 +465,7 @@ def test_schema_metrics_are_not_logged_on_write_if_metrics_flag_is_false(self, m
CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
),
(
pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.NAN}]),
pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.nan}]),
"bool",
CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
),
@@ -516,7 +516,7 @@ def test__has_valid_dtypes_does_not_attempt_to_convert_object_type_to_other_type
CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
),
(
pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.NAN}]),
pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.nan}]),
"bool",
CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
),
@@ -578,7 +578,7 @@ def test__has_valid_dtypes_does_not_attempt_to_convert_object_type_to_other_type
CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
),
(
pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.NAN}]),
pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.nan}]),
"bool",
CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
),
@@ -640,7 +640,7 @@ def test__has_valid_dtypes_does_not_attempt_to_convert_object_type_to_other_type
CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
),
(
pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.NAN}]),
pd.DataFrame.from_records([{"id": 1, "foo_name": "A", "bar": 12, "bool_col": True}, {"id": 2, "foo_name": "B", "bar": 12, "bool_col": np.nan}]),
"bool",
CASTING_WARNING_MSG.format("bool_col", "bool", "object"),
),
63 changes: 63 additions & 0 deletions tests/test_mixins/test_postgres_mixins.py
@@ -212,3 +212,66 @@ def test_to_check_if_dataframe_has_valid_data_types(self):

# Then
assert is_valid is True

@staticmethod
def _test_append_truncate_options(mock__write_to_database, test_df, options):
# Given
df = test_df
postgres_cloud_config = IOConfig(
path_to_source_yaml=(os.path.join(constants.TEST_RESOURCES, "definitions/processed.yaml")),
env_identifier="CLOUD",
dynamic_vars=constants,
).get(
source_key="WRITE_TO_PG_PARQUET",
)

# When
write_config = WritePostgresIO(source_config=postgres_cloud_config, **options)
write_config.write(df)

# Then
mock__write_to_database.assert_called_once()
(_, _, df, is_append, is_truncate) = mock__write_to_database.call_args[0]
pd.testing.assert_frame_equal(test_df, df)
return is_append, is_truncate, write_config.options

@pytest.mark.unit
@patch.object(WithPostgres, "_write_to_database")
def test_write_to_postgres_default_options(self, mock__write_to_database, test_df):
(append, truncate, options) = self._test_append_truncate_options(mock__write_to_database, test_df, {})
assert append is False
assert truncate is False
assert options == {}

@pytest.mark.unit
@patch.object(WithPostgres, "_write_to_database")
def test_write_to_postgres_legacy_append_and_truncate_option_overrules(self, mock__write_to_database, test_df):
(append, truncate, options) = self._test_append_truncate_options(mock__write_to_database, test_df, {"truncate_and_append": True, "append": False, "truncate": False})
assert append is True
assert truncate is True
assert "truncate_and_append" in options
assert "append" in options

@pytest.mark.unit
@patch.object(WithPostgres, "_write_to_database")
def test_write_to_postgres_truncate_option(self, mock__write_to_database, test_df):
(append, truncate, options) = self._test_append_truncate_options(mock__write_to_database, test_df, {"truncate": True})
assert append is False
assert truncate is True
assert "truncate" in options

@pytest.mark.unit
@patch.object(WithPostgres, "_write_to_database")
def test_write_to_postgres_append_option(self, mock__write_to_database, test_df):
(append, truncate, options) = self._test_append_truncate_options(mock__write_to_database, test_df, {"append": True})
assert append is True
assert truncate is False
assert "append" in options

@pytest.mark.unit
@patch.object(WithPostgres, "_write_to_database")
def test_write_to_postgres_append_and_truncate_options(self, mock__write_to_database, test_df):
(append, truncate, options) = self._test_append_truncate_options(mock__write_to_database, test_df, {"append": True, "truncate": True})
assert append is True
assert truncate is True
assert "append" in options