diff --git a/README.md b/README.md index 264cca2..fb74fda 100644 --- a/README.md +++ b/README.md @@ -11,11 +11,11 @@ python -m demo.getting_started ## How to implement new metrics -To extend the Metis framework and add new data quality metrics, please check our interface for easy integration. +To extend the Metis framework and add new data quality metrics, please check our interface for easy integration. ````python -def assess(self, - data: pd.DataFrame, - reference: Union[pd.DataFrame, None] = None, +def assess(self, + data: pd.DataFrame, + reference: Union[pd.DataFrame, None] = None, metric_config: Union[str, None] = None) -> List[DQResult]: ```` Each metric should be a subclass of ```metis.metric.metric.Metric``` and implement the assess method. This method takes three arguments: @@ -29,14 +29,17 @@ The metric should return a list of ```metis.utils.result.DQResult```. This can b ### Metric naming convention -Metrics are organized by dimension (e.g., `completeness`, `minimality`), where one folder exists for each. +Metrics are organized by dimension (e.g., `completeness`, `minimality`), where one folder exists for each. New metrics should follow the naming format: `{DimensionName}_{Technique}` - **DimensionName**: The quality dimension being measured (e.g., `Completeness`, `Minimality`) - **Technique**: The calculation or method used (e.g., `NullRatio`, `DuplicateCount`) -- **Granularity**: The level of analysis (e.g., `cell`, `row`, `column`, `table`) should be passed as a parameter through the metric config file if the metric can be applied at different granularity levels. -Examples: `completeness_NullRatio`, `minimality_DuplicateCount` +Examples: `completeness_nullRatio`, `minimality_duplicateCount` + +The file name and class name of each metric should be equal. If a metric has a specific config class, the name of the config class should be `{MetricName}_config` (e.g., `completeness_missingRatio_config`). + +- **Granularity**: The level of analysis (e.g., `cell`, `row`, `column`, `table`) should be passed as a parameter through the metric config file if the metric can be applied at different granularity levels. ## Output: creating a DQResult @@ -45,27 +48,35 @@ class DQResult: def __init__( self, timestamp: pd.Timestamp, - DQvalue: float, - DQdimension: str, + DQdimension: DQDimension, DQmetric: str, + DQgranularity: str, + DQvalue: float, + DQexplanation: Union[dict, None] = None, + runtime: Union[float, None] = None, + tableName: Union[str, None] = None, columnNames: Union[List[str], None] = None, rowIndex: Union[int, None] = None, - DQannotations: Union[dict, None] = None, + experimentTag: Union[str, None] = None, dataset: Union[str, None] = None, - tableName: Union[str, None] = None, + configJson: Union[dict, None] = None, ): ```` To create a new instance of DQResult, one needs to provide at least the following arguments: - **timestamp: pd.Timestamp**: The time at which a result was assessed. -- **DQvalue: float**: The result of the assessment. This currently only supports quantitative assessments. -- **DQdimension: str**: The name of the data quality dimension that was assessed e.g. completeness, accuracy, etc. -- **DQmetric: str**: The name of the specific metric inside the given dimension that was assessed. - -Furthermore, there are more optional arguments that might need to be set depending on the nature of different metrics. 
```dataset``` and ```tableName``` are automatically set by the ```metis.dq_orchestrator.DQOrchestrator``` class which controles the data quality assessment and takes care of calling the individual metrics and storing the results.
-- **columnNames: Optional[List[str]]**: List of column names associated with the assessed result. For example for column level completeness, this would be a list with a single column name, for table level completeness this would be empty since the result is valid for the whole table.
-- **rowIndex: Optional[int]**: Index of the row this result is associated with. This can either be used together with columnNames to assess data quality on a cell level or for row based metrics.
-- **DQannotations: Optional[dict]**: To allow metrics to save additional information or annotations, this dictionary can store all additional information that might need to be saved. This currently does not need for follow a predefined structure.
+- **DQdimension: DQDimension**: Data quality dimension assessed (e.g. `DQDimension.COMPLETENESS`, `DQDimension.ACCURACY`).
+- **DQmetric: str**: Name of the specific metric within the dimension.
+- **DQgranularity: str**: Granularity of the metric (e.g. 'column', 'table', 'cell', 'row').
+- **DQvalue: float**: Numeric outcome of the assessment. This currently only supports quantitative assessments.
+
+Beyond these, additional optional arguments can be set depending on the nature of each metric. ```dataset``` and ```tableName``` are automatically set by the ```metis.dq_orchestrator.DQOrchestrator``` class, which controls the data quality assessment, calls the individual metrics, and stores the results.
+- **DQexplanation: Optional[dict]**: Arbitrary additional information produced by the metric (no fixed schema required).
+- **runtime: Optional[float]**: Time taken to compute the metric, in seconds.
+- **columnNames: Optional[List[str]]**: Columns that this result pertains to. For a column-level metric this is typically a single-item list; for a table-level metric this may be `None` or an empty list.
+- **rowIndex: Optional[int]**: Row index associated with the result. Use together with `columnNames` to represent a cell-level result, or for row-based metrics.
+- **experimentTag: Optional[str]**: Tag to identify a specific run.
+- **configJson: Optional[dict]**: Configuration used for the metric as a JSON object.
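+
+For example, a minimal column-level result can be constructed like this (a sketch; the concrete values are illustrative):
+
+````python
+import pandas as pd
+
+from metis.utils.dq_dimension import DQDimension
+from metis.utils.dq_granularity import DQGranularity
+from metis.utils.result import DQResult
+
+# One completeness result for a single column, mirroring what
+# completeness_nullRatio produces per column.
+result = DQResult(
+    timestamp=pd.Timestamp.now(),
+    DQdimension=DQDimension.COMPLETENESS,
+    DQmetric="completeness_nullRatio",
+    DQgranularity=DQGranularity.COLUMN,
+    DQvalue=0.97,
+    DQexplanation={"null_count": 3.0},
+    columnNames=["age"],
+)
+````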
## Data Profiling

diff --git a/demo/getting_started.py b/demo/getting_started.py
index 34b106f..244c74b 100644
--- a/demo/getting_started.py
+++ b/demo/getting_started.py
@@ -5,6 +5,9 @@
 orchestrator.load(data_loader_configs=["data/adult.json"])
-orchestrator.assess(metrics=["completeness_nullRatio"], metric_configs=['{"measure_runtime": true}'])
+orchestrator.assess(metrics=["completeness_nullRatio"], metric_configs=[""])
 orchestrator.assess(metrics=["minimality_duplicateCount"], metric_configs=[None])
-orchestrator.assess(metrics=["validity_outOfVocabulary"], metric_configs=['{"use_nltk": true, "lowercase": true}'])
\ No newline at end of file
+orchestrator.assess(
+    metrics=["validity_outOfVocabulary"],
+    metric_configs=['{"use_nltk": true, "lowercase": true}'],
+)
diff --git a/docker_compose.yaml b/docker_compose.yaml
new file mode 100644
index 0000000..c36b44b
--- /dev/null
+++ b/docker_compose.yaml
@@ -0,0 +1,14 @@
+services:
+  db:
+    image: postgres:18
+    environment:
+      POSTGRES_USER: postgres
+      POSTGRES_PASSWORD: postgres
+      POSTGRES_DB: metis_db
+    ports:
+      - "5432:5432"
+    volumes:
+      - pgdata:/var/lib/postgresql
+
+volumes:
+  pgdata:
diff --git a/metis/__init__.py b/metis/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/metis/database.py b/metis/database.py
new file mode 100644
index 0000000..d78a0e6
--- /dev/null
+++ b/metis/database.py
@@ -0,0 +1,73 @@
+from __future__ import annotations
+
+from typing import Dict, Literal
+
+from sqlalchemy import create_engine
+
+from metis.database_models import register_models
+
+
+class Database:
+    """Provides a singleton reference for the database connection and models. Can be used by different modules to access the database without risking conflicts caused by multiple bases or engines."""
+
+    _instance: Database | None = None
+
+    def __init__(self, db_type: Literal["sqlite", "postgres"], db_config: Dict):
+        if Database._instance is not None:
+            raise RuntimeError(
+                "Database has already been initialized. Use Database.get_instance() to access the singleton."
+            )
+
+        self.engine = self.create_engine(db_type, db_config)
+
+        Base, self.DQResultModel, self.DataProfile = register_models(
+            db_config.get("table_name", "dq_results")
+        )
+        Base.metadata.create_all(self.engine)
+
+        Database._instance = self
+
+    @classmethod
+    def get_instance(cls) -> Database:
+        """Return the current singleton. Raises if not initialized."""
+        if cls._instance is None:
+            raise RuntimeError(
+                "Database has not been initialized. "
+                "Construct it once via Database(db_type, db_config) first."
+            )
+        return cls._instance
+
+    @classmethod
+    def is_initialized(cls) -> bool:
+        return cls._instance is not None
+
+    def create_engine(self, db_type: Literal["sqlite", "postgres"], db_config: Dict):
+        if db_type == "sqlite":
+            return self.create_sqlite_engine(db_config)
+        elif db_type == "postgres":
+            return self.create_postgres_engine(db_config)
+        raise ValueError(f"Unsupported database type: {db_type}")
+
+    def create_sqlite_engine(self, db_config: Dict):
+        required_keys = ("db_name",)
+        if not all(k in db_config for k in required_keys):
+            raise ValueError(
+                f"SQLite database config must include the following fields: {required_keys}."
+ ) + + return create_engine( + f"sqlite:///{db_config['db_name']}", + echo=db_config.get("echo", False), + ) + + def create_postgres_engine(self, db_config: Dict): + required_keys = ("db_user", "db_pass", "db_name", "db_host", "db_port") + if not all(k in db_config for k in required_keys): + raise ValueError( + f"Postgres database config must include the following fields: {required_keys}." + ) + + return create_engine( + f"postgresql://{db_config['db_user']}:{db_config['db_pass']}@{db_config['db_host']}:{db_config['db_port']}/{db_config['db_name']}", + echo=db_config.get("echo", False), + ) diff --git a/metis/database_models.py b/metis/database_models.py index ed29803..04a5495 100644 --- a/metis/database_models.py +++ b/metis/database_models.py @@ -5,16 +5,20 @@ from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column -class Base(DeclarativeBase): - pass - def register_models(results_table_name: str): + """Register the SQLAlchemy models for the database tables based on initial configuration. Every call creates a new SQLAlchemy base, which is not bound to any engine yet. Use the Database singleton for stable references to the models and engine.""" + + class Base(DeclarativeBase): + pass + class DQResultModel(Base): __tablename__ = results_table_name __table_args__ = {"extend_existing": True} id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) - timestamp: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + timestamp: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now() + ) dq_dimension: Mapped[str] dq_metric: Mapped[str] dq_granularity: Mapped[str] @@ -28,41 +32,43 @@ class DQResultModel(Base): dataset: Mapped[str | None] config_json: Mapped[dict | None] = mapped_column(JSON) - return DQResultModel + class DataProfile(Base): + """Stores data profiling results for caching and manual imports. -class DataProfile(Base): - """Stores data profiling results for caching and manual imports. + Covers single-column statistics (null_count, distinct_count, histograms, ...), + multi-column dependencies (FDs, UCCs, INDs, ...), and any other profiling + result type. The result payload is stored as JSON so the schema stays + flexible across different task types. + """ - Covers single-column statistics (null_count, distinct_count, histograms, ...), - multi-column dependencies (FDs, UCCs, INDs, ...), and any other profiling - result type. The result payload is stored as JSON so the schema stays - flexible across different task types. - """ + __tablename__ = "data_profiles" + __table_args__ = {"extend_existing": True} - __tablename__ = "data_profiles" - __table_args__ = {"extend_existing": True} + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + timestamp: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now() + ) - id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) - timestamp: Mapped[datetime] = mapped_column( - DateTime(timezone=True), server_default=func.now() - ) + # --- identifiers --- + dataset: Mapped[str] + table_name: Mapped[str] + column_names: Mapped[List[str]] = mapped_column(JSON) + dp_task_name: Mapped[str] # e.g. "null_count", "fd", "ucc" + task_config: Mapped[dict | None] = mapped_column(JSON) # extra params used - # --- identifiers --- - dataset: Mapped[str] - table_name: Mapped[str] - column_names: Mapped[List[str]] = mapped_column(JSON) - dp_task_name: Mapped[str] # e.g. 
"null_count", "fd", "ucc" - task_config: Mapped[dict | None] = mapped_column(JSON) # extra params used + # --- category --- + profile_type: Mapped[str] = mapped_column(default="single_column") + # "single_column" | "multi_column" | "dependency" | "custom" - # --- category --- - profile_type: Mapped[str] = mapped_column(default="single_column") - # "single_column" | "multi_column" | "dependency" | "custom" + # --- result --- + dp_result_value: Mapped[dict | None] = mapped_column( + JSON + ) # {"v": } + result_type: Mapped[str] = mapped_column(default="scalar") + # "scalar" | "list" | "dict" | "series" — for deserialization hint - # --- result --- - dp_result_value: Mapped[dict | None] = mapped_column(JSON) # {"v": } - result_type: Mapped[str] = mapped_column(default="scalar") - # "scalar" | "list" | "dict" | "series" — for deserialization hint + # --- provenance --- + source: Mapped[str] = mapped_column(default="computed") + # "computed" | "imported:hyfd" | "imported:manual" | … - # --- provenance --- - source: Mapped[str] = mapped_column(default="computed") - # "computed" | "imported:hyfd" | "imported:manual" | … + return Base, DQResultModel, DataProfile diff --git a/metis/dq_orchestrator.py b/metis/dq_orchestrator.py index 8f8b530..ba51794 100644 --- a/metis/dq_orchestrator.py +++ b/metis/dq_orchestrator.py @@ -1,18 +1,24 @@ import json +import time +import traceback from typing import Dict, List, Type import pandas as pd -import time +from metis.database import Database from metis.loader.csv_loader import CSVLoader from metis.metric import Metric +from metis.metric.config import MetricConfig from metis.profiling.data_profile_manager import DataProfileManager from metis.profiling.importers import get_importer from metis.utils.data_config import DataConfig +from metis.utils.logging import logger from metis.utils.result import DQResult from metis.writer.console_writer import ConsoleWriter -from metis.writer.postgres_writer import PostgresWriter -from metis.writer.sqlite_writer import SQLiteWriter +from metis.writer.csv_writer import CSVWriter +from metis.writer.database_writer import DatabaseWriter + +FALLBACK_RESULTS_FILE = "dq_results_fallback.csv" class DQOrchestrator: @@ -28,17 +34,20 @@ def __init__(self, writer_config_path: str | None = None) -> None: if writer_config_path: with open(writer_config_path, "r") as f: writer_config = json.load(f) - if not "writer_name" in writer_config: + if "writer_name" not in writer_config: raise ValueError("Writer config must include 'writer_name' field.") - if writer_config["writer_name"] == "sqlite": - self.writer = SQLiteWriter(writer_config) - elif writer_config["writer_name"] == "postgres": - self.writer = PostgresWriter(writer_config) - # Initialize profile cache using the same DB as the writer. - # No caching if no DB writer is configured. - if hasattr(self.writer, "engine"): - DataProfileManager.initialize(self.writer.engine) + if writer_config["writer_name"] in ("sqlite", "postgres"): + # Create central DB instance + db = Database(writer_config["writer_name"], writer_config) + self.writer = DatabaseWriter(db) + elif writer_config["writer_name"] == "csv": + self.writer = CSVWriter(writer_config) + + # Initialize profile cache using the central DB instance. + # No caching if no DB is configured. 
+ if Database.is_initialized(): + DataProfileManager.initialize(Database.get_instance()) def load(self, data_loader_configs: List[str]) -> None: for config_path in data_loader_configs: @@ -68,7 +77,9 @@ def load(self, data_loader_configs: List[str]) -> None: f"Unsupported loader type: {config_data.get('loader', None)}" ) - def assess(self, metrics: List[str], metric_configs: List[str | None]) -> None: + def assess( + self, metrics: List[str], metric_configs: List[str | MetricConfig | None] + ) -> None: results = [] for metric, metric_config in zip(metrics, metric_configs): @@ -104,15 +115,31 @@ def assess(self, metrics: List[str], metric_configs: List[str | None]) -> None: result.dataset = self.data_paths[df_name] results.append(result) - self.writer.write(results) + try: + logger.info( + f"Writing {len(results)} results using {self.writer.__class__.__name__}" + ) + self.writer.write(results) + except Exception as e: + traceback.print_exc() + logger.error(f"Error writing results: {e}") + try: + logger.warning("Trying to save results to csv as fallback...") + CSVWriter({"path": FALLBACK_RESULTS_FILE}).write(results) + except Exception as e: + logger.error(f"Failed to save results to csv: {e}") + raise e def get_dq_result(self, query: str) -> List[DQResult]: return [] - def _should_measure_runtime(self, metric_config: str | None) -> bool: + def _should_measure_runtime(self, metric_config: MetricConfig | str | None) -> bool: if metric_config is None: return False + if isinstance(metric_config, MetricConfig): + return getattr(metric_config, "measure_runtime", False) + try: parsed = json.loads(metric_config) except Exception: @@ -127,9 +154,7 @@ def _should_measure_runtime(self, metric_config: str | None) -> bool: return bool(parsed.get("measure_runtime", False)) - def _import_data_profiles( - self, profiles: dict, dataset: str, table: str - ) -> None: + def _import_data_profiles(self, profiles: dict, dataset: str, table: str) -> None: """Import pre-computed data profiles from config. 
Args: @@ -145,10 +170,6 @@ def _import_data_profiles( for task_name, task_config in profiles.items(): try: importer = get_importer(task_name) - count = importer.import_to_manager( - task_config, manager, dataset, table - ) + count = importer.import_to_manager(task_config, manager, dataset, table) except KeyError as e: - raise ValueError( - f"Unknown data profile task: {task_name}" - ) from e + raise ValueError(f"Unknown data profile task: {task_name}") from e diff --git a/metis/metric/__init__.py b/metis/metric/__init__.py index 0799384..a6bf785 100644 --- a/metis/metric/__init__.py +++ b/metis/metric/__init__.py @@ -1,5 +1,10 @@ -from .metric import Metric +from .completeness.completeness_nullAndDMVRatio import completeness_nullAndDMVRatio from .completeness.completeness_nullRatio import completeness_nullRatio from .consistency.consistency_countFDViolations import consistency_countFDViolations +from .consistency.consistency_ruleBasedHinrichs import consistency_ruleBasedHinrichs +from .consistency.consistency_ruleBasedPipino import consistency_ruleBasedPipino +from .correctness.correctness_heinrich import correctness_heinrich +from .metric import Metric from .minimality.minimality_duplicateCount import minimality_duplicateCount -from .validity.validity_outOfVocabulary import validity_outOfVocabulary \ No newline at end of file +from .timeliness.timeliness_heinrich import timeliness_heinrich +from .validity.validity_outOfVocabulary import validity_outOfVocabulary diff --git a/metis/metric/completeness/completeness_nullAndDMVRatio.py b/metis/metric/completeness/completeness_nullAndDMVRatio.py new file mode 100644 index 0000000..c3e39cc --- /dev/null +++ b/metis/metric/completeness/completeness_nullAndDMVRatio.py @@ -0,0 +1,145 @@ +from typing import List, Literal + +import pandas as pd + +from metis.metric.completeness.completeness_nullAndDMVRatio_config import ( + completeness_nullAndDMVRatio_config, +) +from metis.metric.config import MetricConfig +from metis.metric.metric import Metric +from metis.utils.disguised_missing_values.fahes.fahes import ( + FAHES_PRECISION, + FAHES_RECALL, + run_fahes, +) +from metis.utils.dq_dimension import DQDimension +from metis.utils.dq_granularity import DQGranularity +from metis.utils.result import DQResult + +IS_VALID_MARKER = 0 +IS_NULL_MARKER = 1 +IS_DMV_MARKER = 2 + + +class completeness_nullAndDMVRatio(Metric): + def assess( + self, + data: pd.DataFrame, + reference: pd.DataFrame | None = None, + metric_config: str | MetricConfig | None = None, + ) -> List[DQResult]: + """ + Assess the completeness of the data by checking for null values and disguised missing values. To detect disguised missing values, the FAHES algorithm by Qahtan et al. is applied to the data (paper: https://doi.org/10.1145/3219819.3220109). The completeness quality measurement is calculated as the ratio of valid values (non-null and non-disguised missing) to the total number of values. The metric can be configured using `completeness_nullAndDMVRatio_config` to calculate the completeness on column, row level, or table-level granularity. + + :param data: DataFrame to assess. + :param reference: Optional reference DataFrame (not used in this metric). + :param metric_config: Optional configuration for the metric. + :return: List of DQResult objects containing completeness results. 
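+
+        Example (illustrative): ``completeness_nullAndDMVRatio_config(aggregation_axis="index")``
+        yields one column-level result per column; adding ``aggregate_all=True`` collapses
+        everything into a single table-level result.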
+ """ + + config = self.load_config(metric_config, completeness_nullAndDMVRatio_config) + + dmvs = run_fahes(data) + self.logger.info(f"Detected DMVs:\n{dmvs}") + + marked_cells = pd.DataFrame( + IS_VALID_MARKER, index=data.index, columns=data.columns + ) + marked_cells[data.isna()] = IS_NULL_MARKER + if dmvs is not None: + for _, dmv_row in dmvs.iterrows(): + col = dmv_row["Attribute Name"] + val = dmv_row["DMV"] + marked_cells.loc[data[col] == val, col] = IS_DMV_MARKER + + completeness = (marked_cells == IS_VALID_MARKER).astype(int) + certainty = self.certainty(marked_cells) + + if config.aggregation_axis is not None: + mean_completeness = completeness.mean(axis=config.aggregation_axis) + mean_certainty = certainty.mean(axis=config.aggregation_axis) + + if config.aggregate_all: + table_completeness = mean_completeness.mean() + table_certainty = mean_certainty.mean() + return [ + DQResult( + timestamp=pd.Timestamp.now(), + DQvalue=table_completeness, + DQdimension=DQDimension.COMPLETENESS, + DQmetric=self.__class__.__name__, + columnNames=data.columns.tolist(), + DQexplanation={"certainty": float(table_certainty)}, + DQgranularity=DQGranularity.TABLE, + ) + ] + + return self.create_aggregated_results( + mean_completeness, + mean_certainty, + config.aggregation_axis, + data.columns.tolist(), + ) + + return self.create_flat_results(completeness, certainty) + + def certainty(self, marks: pd.DataFrame): + # .replace with a dict sometimes throws an IndexError during pandas memory cleanup. Reason not yet identified, but using chained .replace calls seems to mitigate the issue. + return ( + marks.replace(IS_VALID_MARKER, FAHES_RECALL) + .replace(IS_NULL_MARKER, 1) + .replace(IS_DMV_MARKER, FAHES_PRECISION) + ) + + def create_aggregated_results( + self, + mean_completeness: pd.Series, + mean_certainty: pd.Series, + aggregation_axis: Literal["index", "columns"], + columns: List[str], + ) -> List[DQResult]: + results = [] + for (index, completeness), certainty in zip( + mean_completeness.items(), mean_certainty.values + ): + row_index = int(str(index)) if aggregation_axis == "columns" else None + col_names = columns if aggregation_axis == "columns" else [str(index)] + + result = DQResult( + timestamp=pd.Timestamp.now(), + DQvalue=completeness, + DQdimension=DQDimension.COMPLETENESS, + DQmetric=self.__class__.__name__, + columnNames=col_names, + rowIndex=row_index, + DQexplanation={"certainty": float(certainty)}, + DQgranularity=( + DQGranularity.ROW + if aggregation_axis == "columns" + else DQGranularity.COLUMN + ), + ) + results.append(result) + + return results + + def create_flat_results( + self, completeness: pd.DataFrame, certainty: pd.DataFrame + ) -> List[DQResult]: + results = [] + for col in completeness.columns: + for row_index, (completeness_value, certainty_value) in enumerate( + zip(completeness[col].values, certainty[col].values) + ): + result = DQResult( + timestamp=pd.Timestamp.now(), + DQvalue=float(completeness_value), + DQdimension=DQDimension.COMPLETENESS, + DQmetric=self.__class__.__name__, + columnNames=[col], + rowIndex=row_index, + DQexplanation={"certainty": float(certainty_value)}, + DQgranularity=DQGranularity.CELL, + ) + results.append(result) + return results diff --git a/metis/metric/completeness/completeness_nullAndDMVRatio_config.py b/metis/metric/completeness/completeness_nullAndDMVRatio_config.py new file mode 100644 index 0000000..1624307 --- /dev/null +++ b/metis/metric/completeness/completeness_nullAndDMVRatio_config.py @@ -0,0 +1,36 @@ +from dataclasses import 
dataclass +from typing import Literal + +from metis.metric.config import MetricConfig + +VALID_AGGREGATION_AXES = ["index", "columns", None] + + +@dataclass +class completeness_nullAndDMVRatio_config(MetricConfig): + """ + Configuration class for the completeness_nullAndDMVRatio metric. + + :param aggregation_axis: Axis along which to aggregate completeness ('index': aggregate each column; 'columns': aggregate each row, None (default): no aggregation). + :param aggregate_all: Whether to aggregate all completeness results into a single value for the whole input data. + """ + + aggregation_axis: Literal["index", "columns", None] = None + aggregate_all: bool = False + + def to_json(self): + return { + "name": self.__class__.__name__, + "aggregation_axis": self.aggregation_axis, + "aggregate_all": self.aggregate_all, + } + + def validate(self): + if self.aggregation_axis not in VALID_AGGREGATION_AXES: + raise ValueError( + f"aggregation_axis must be one of {VALID_AGGREGATION_AXES} but was {self.aggregation_axis}" + ) + if not isinstance(self.aggregate_all, bool): + raise ValueError( + f"aggregate_all must be a boolean value but was {type(self.aggregate_all)}" + ) diff --git a/metis/metric/completeness/completeness_nullRatio.py b/metis/metric/completeness/completeness_nullRatio.py index 56cdcc7..1dd64b3 100644 --- a/metis/metric/completeness/completeness_nullRatio.py +++ b/metis/metric/completeness/completeness_nullRatio.py @@ -1,33 +1,122 @@ +from typing import List, Literal + import pandas as pd -from typing import List, Union -from metis.utils.result import DQResult +from metis.metric.completeness.completeness_nullRatio_config import ( + completeness_nullRatio_config, +) +from metis.metric.config import MetricConfig from metis.metric.metric import Metric +from metis.utils.dq_dimension import DQDimension +from metis.utils.dq_granularity import DQGranularity +from metis.utils.result import DQResult + class completeness_nullRatio(Metric): - def assess(self, data: pd.DataFrame, reference: Union[pd.DataFrame, None] = None, metric_config: Union[str, None] = None) -> List[DQResult]: + def assess( + self, + data: pd.DataFrame, + reference: pd.DataFrame | None = None, + metric_config: str | MetricConfig | None = None, + ) -> List[DQResult]: """ - Assess the completeness of the data by checking for null values. - + Assess the completeness of the data by calculating the ratio and count of null values on different granularities. The ratio of non-null values is stored as the completeness quality measurement, while the count of null values is stored in the explanation for better interpretability. The metric can be configured using `completeness_nullRatio_config` to calculate the completeness on column, row level, or table-level granularity. + :param data: DataFrame to assess. + :param reference: Optional reference DataFrame (not used in this metric). :param metric_config: Optional configuration for the metric. :return: List of DQResult objects containing completeness results. 
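+
+        Example (illustrative): passing the JSON string ``'{"aggregation_axis": "index"}'``
+        as metric_config yields one column-level result per column, with the null count
+        stored in the explanation.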
""" + + config = self.load_config(metric_config, completeness_nullRatio_config) + results = [] - total_rows = len(data) - - for column in data.columns: - null_count = data[column].isnull().sum() - completeness = (total_rows - int(null_count)) / total_rows - + + na_mask = data.isna() + + completeness = (~na_mask).astype(int) + null_count = na_mask.astype(int) + + if config.aggregation_axis is not None: + mean_completeness = completeness.mean(axis=config.aggregation_axis) + mean_null_count = null_count.sum(axis=config.aggregation_axis) + + if config.aggregate_all: + table_completeness = mean_completeness.mean() + table_null_count = mean_null_count.sum() + + return [ + DQResult( + timestamp=pd.Timestamp.now(), + DQvalue=table_completeness, + DQdimension=DQDimension.COMPLETENESS, + DQmetric=self.__class__.__name__, + columnNames=data.columns.tolist(), + DQgranularity=DQGranularity.TABLE, + DQexplanation={ + "null_count": float(table_null_count), + }, + ) + ] + + return self.create_aggregated_results( + mean_completeness, + mean_null_count, + config.aggregation_axis, + data.columns.tolist(), + ) + + return self.create_flat_results(completeness, null_count) + + def create_aggregated_results( + self, + mean_completeness: pd.Series, + mean_null_count: pd.Series, + aggregation_axis: Literal["index", "columns"], + columns: List[str], + ) -> List[DQResult]: + results = [] + for (index, completeness), null_count in zip( + mean_completeness.items(), mean_null_count.values + ): + row_index = int(str(index)) if aggregation_axis == "columns" else None + col_names = columns if aggregation_axis == "columns" else [str(index)] + result = DQResult( timestamp=pd.Timestamp.now(), - DQdimension="Completeness", - DQmetric="NullRatio", - DQgranularity="column", DQvalue=completeness, - columnNames=[column], + DQdimension=DQDimension.COMPLETENESS, + DQmetric=self.__class__.__name__, + columnNames=col_names, + rowIndex=row_index, + DQexplanation={"null_count": float(null_count)}, + DQgranularity=( + DQGranularity.ROW + if aggregation_axis == "columns" + else DQGranularity.COLUMN + ), ) results.append(result) - - return results \ No newline at end of file + + return results + + def create_flat_results( + self, completeness: pd.DataFrame, null_count: pd.DataFrame + ) -> List[DQResult]: + results = [] + for col in completeness.columns: + for row_index, (completeness_value, null_count_value) in enumerate( + zip(completeness[col].values, null_count[col].values) + ): + result = DQResult( + timestamp=pd.Timestamp.now(), + DQvalue=float(completeness_value), + DQdimension=DQDimension.COMPLETENESS, + DQmetric=self.__class__.__name__, + columnNames=[col], + rowIndex=row_index, + DQexplanation={"null_count": float(null_count_value)}, + DQgranularity=DQGranularity.CELL, + ) + results.append(result) + return results diff --git a/metis/metric/completeness/completeness_nullRatio_config.py b/metis/metric/completeness/completeness_nullRatio_config.py new file mode 100644 index 0000000..20cc60d --- /dev/null +++ b/metis/metric/completeness/completeness_nullRatio_config.py @@ -0,0 +1,35 @@ +from dataclasses import dataclass +from typing import Literal + +from metis.metric.config import MetricConfig + +VALID_AGGREGATION_AXES = ["index", "columns", None] + +@dataclass +class completeness_nullRatio_config(MetricConfig): + """ + Configuration class for the completeness_nullRatio metric. + + :param aggregation_axis: Axis along which to aggregate completeness ('index': aggregate each column; 'columns': aggregate each row). 
+ :param aggregate_all: Whether to aggregate all completeness results into a single value for the whole input data. + """ + + aggregation_axis: Literal["index", "columns", None] = None + aggregate_all: bool = False + + def to_json(self): + return { + "name": self.__class__.__name__, + "aggregation_axis": self.aggregation_axis, + "aggregate_all": self.aggregate_all, + } + + def validate(self): + if self.aggregation_axis not in VALID_AGGREGATION_AXES: + raise ValueError( + f"aggregation_axis must be one of {VALID_AGGREGATION_AXES} but was {self.aggregation_axis}" + ) + if not isinstance(self.aggregate_all, bool): + raise ValueError( + f"aggregate_all must be a boolean value but was {type(self.aggregate_all)}" + ) diff --git a/metis/metric/config.py b/metis/metric/config.py new file mode 100644 index 0000000..9cdbb39 --- /dev/null +++ b/metis/metric/config.py @@ -0,0 +1,27 @@ +from dataclasses import dataclass + + +@dataclass +class MetricConfig: + """ + Base class for metric configuration. + All metric configuration classes should inherit from this class. + """ + + @classmethod + def from_dict(cls, config_dict: dict): + """ + Create an instance of the configuration class from a dictionary. + + :param config_dict: A dictionary containing the configuration parameters. + + :return: An instance of the configuration class. + """ + return cls(**config_dict) + + def validate(self): + """ + Validate the configuration parameters. + This method should be overridden by subclasses to implement specific validation logic. + """ + pass diff --git a/metis/metric/consistency/consistency_countFDViolations.py b/metis/metric/consistency/consistency_countFDViolations.py index ec0ba06..5bae10f 100644 --- a/metis/metric/consistency/consistency_countFDViolations.py +++ b/metis/metric/consistency/consistency_countFDViolations.py @@ -1,15 +1,25 @@ -import pandas as pd -from typing import List, Union import json +from typing import List, Union -from metis.utils.result import DQResult +import pandas as pd + +from metis.metric.config import MetricConfig from metis.metric.metric import Metric +from metis.utils.dq_dimension import DQDimension +from metis.utils.dq_granularity import DQGranularity +from metis.utils.result import DQResult + class consistency_countFDViolations(Metric): - def assess(self, data: pd.DataFrame, reference: Union[pd.DataFrame, None] = None, metric_config: Union[str, None] = None) -> List[DQResult]: + def assess( + self, + data: pd.DataFrame, + reference: Union[pd.DataFrame, None] = None, + metric_config: Union[MetricConfig, str, None] = None, + ) -> List[DQResult]: """ Assess the consistency of a dataset by checking the compliance of a functional dependency specified in the metric_config. - + :param data: DataFrame to assess. :param metric_config: JSON that specifies FDs to check. :return: List of DQResult objects containing accuracy results. @@ -20,8 +30,18 @@ def assess(self, data: pd.DataFrame, reference: Union[pd.DataFrame, None] = None if total_rows == 0: return results + if metric_config is None: + raise ValueError( + "Metric configuration is required for consistency assessment." + ) + if not isinstance(metric_config, str): + raise ValueError( + "Metric configuration must be a file path to a JSON configuration." 
+ ) + with open(metric_config, "r") as f: metric_conf = json.load(f) + for determinant, dependents in metric_conf.items(): if determinant not in data.columns: continue @@ -37,18 +57,18 @@ def assess(self, data: pd.DataFrame, reference: Union[pd.DataFrame, None] = None # for the same determinant (FD violation) violations = grouped[grouped > 1].index.tolist() - consistency = 1 - (len(violations) / len(data[determinant])) - - result = DQResult( - timestamp=pd.Timestamp.now(), - DQdimension="Consistency", - DQmetric="CountFDViolations", - DQgranularity="table", - DQvalue=consistency, - DQexplanation={f"{determinant}:{dependent}": violations}, # FD - columnNames=[determinant], - configJson=metric_conf - ) - results.append(result) + consistency = 1 - (len(violations) / len(data[determinant])) + + result = DQResult( + timestamp=pd.Timestamp.now(), + DQdimension=DQDimension.CONSISTENCY, + DQmetric=self.__class__.__name__, + DQgranularity=DQGranularity.TABLE, + DQvalue=consistency, + DQexplanation={f"{determinant}:{dependent}": violations}, # FD + columnNames=[determinant], + configJson=metric_conf, + ) + results.append(result) return results diff --git a/metis/metric/consistency/consistency_ruleBasedHinrichs.py b/metis/metric/consistency/consistency_ruleBasedHinrichs.py new file mode 100644 index 0000000..2c98bf2 --- /dev/null +++ b/metis/metric/consistency/consistency_ruleBasedHinrichs.py @@ -0,0 +1,115 @@ +from math import sqrt +from typing import Any, Callable, List, Union + +import pandas as pd + +from metis.metric.config import MetricConfig +from metis.metric.consistency.consistency_ruleBasedHinrichs_config import ( + consistency_ruleBasedHinrichs_config, +) +from metis.metric.metric import Metric +from metis.utils.dq_dimension import DQDimension +from metis.utils.dq_granularity import DQGranularity +from metis.utils.logging import warn_unconfigured_columns +from metis.utils.result import DQResult + + +class consistency_ruleBasedHinrichs(Metric): + def assess( + self, + data: pd.DataFrame, + reference: Union[pd.DataFrame, None] = None, + metric_config: str | None | MetricConfig = None, + ) -> List[DQResult]: + """ + Assess the consistency of the data by checking the given rules for each value. The rules are defined in the metric configuration. There are attribute rules that apply to individual columns and tuple rules that apply to entire rows. The quality measurement is calculated as 1 / (1 + degree_of_violation), where degree_of_violation is the sum of the result of all applicable rules for a given value/row. + Additionally, this metric assesses the certainty of the measurement based on the minimum quality in the assessed data. The certainty is calculated as sqrt((1 - dq_value) * (1 - min_quality)), where dq_value is the quality measurement for the specific value/row and min_quality is the lowest quality measurement observed in the dataset. + + :param data: DataFrame to assess. + :param reference: Optional reference DataFrame (not used in this metric). + :param metric_config: Optional configuration for the metric. + :return: List of DQResult objects containing consistency results. + """ + if metric_config is None: + raise ValueError( + f"Metric configuration is required for metric {consistency_ruleBasedHinrichs.__name__} but None was provided." + ) + if isinstance(metric_config, str): + raise ValueError( + f"Metric configuration must be an instance of {consistency_ruleBasedHinrichs_config.__name__}. JSON loading is not supported." 
+ ) + if not isinstance(metric_config, consistency_ruleBasedHinrichs_config): + raise ValueError( + f"Metric configuration must be an instance of {consistency_ruleBasedHinrichs_config.__name__} but was of type {type(metric_config)}." + ) + + attribute_rules = metric_config.attribute_rules or {} + tuple_rules = metric_config.tuple_rules or [] + + results: List[DQResult] = [] + + if tuple_rules: + degree_of_violation: pd.Series[float] = data.apply( + lambda x: self.sum_rules(tuple_rules, x), axis="columns" + ) + + dq_measurements = 1 / (1 + degree_of_violation) + min_quality = dq_measurements.min() + for row_index, dq_value in enumerate(dq_measurements.values): + results.append( + DQResult( + timestamp=pd.Timestamp.now(), + DQvalue=dq_value, + DQdimension=DQDimension.CONSISTENCY, + DQmetric=self.__class__.__name__, + columnNames=data.columns.tolist(), + rowIndex=row_index, + DQexplanation={ + "certainty": self.certainty(dq_value, min_quality) + }, + DQgranularity=DQGranularity.ROW, + ) + ) + + warn_unconfigured_columns( + self.logger, + set(data.columns), + set(attribute_rules.keys()), + "consistency rules", + ) + + for col_name in data.columns: + column_rules = attribute_rules.get(col_name, []) + if not column_rules: + continue + + degree_of_violation: pd.Series[float] = data[col_name].apply( + lambda x: self.sum_rules(column_rules, x) + ) + + dq_measurements = 1 / (1 + degree_of_violation) + min_quality = dq_measurements.min() + + for row_index, dq_value in enumerate(dq_measurements.values): + results.append( + DQResult( + timestamp=pd.Timestamp.now(), + DQvalue=dq_value, + DQdimension=DQDimension.CONSISTENCY, + DQmetric=self.__class__.__name__, + columnNames=[col_name], + rowIndex=row_index, + DQexplanation={ + "certainty": self.certainty(dq_value, min_quality) + }, + DQgranularity=DQGranularity.CELL, + ) + ) + + return results + + def sum_rules(self, rules: List[Callable], value: Any) -> float: + return float(sum(rule(value) for rule in rules)) + + def certainty(self, dq_value: float, min_quality: float) -> float: + return sqrt((1 - dq_value) * (1 - min_quality)) diff --git a/metis/metric/consistency/consistency_ruleBasedHinrichs_config.py b/metis/metric/consistency/consistency_ruleBasedHinrichs_config.py new file mode 100644 index 0000000..6eef08a --- /dev/null +++ b/metis/metric/consistency/consistency_ruleBasedHinrichs_config.py @@ -0,0 +1,40 @@ +import inspect +from dataclasses import dataclass +from typing import Any, Callable, Dict, List + +import pandas as pd + +from metis.metric.config import MetricConfig + + +@dataclass(kw_only=True) +class consistency_ruleBasedHinrichs_config(MetricConfig): + """ + Configuration class for the consistency_ruleBasedHinrichs metric. + + Accepts a dictionary mapping attribute names to lists of functions that define consistency rules. 
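+
+    Example (illustrative): ``attribute_rules={"age": [lambda v: 0.0 if v >= 0 else 1.0]}``.
+    Each rule returns a degree of violation; per value, the degrees are summed and the
+    quality is computed as 1 / (1 + degree_of_violation).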
+ :param attribute_rules: Dictionary of functions that define consistency rules for each column given by the key + :param tuple_rules: List of functions that define consistency rules for entire tuples + """ + + attribute_rules: Dict[str, List[Callable[[Any], float]]] | None = None + + tuple_rules: List[Callable[[pd.Series], float]] | None = None + + def to_json(self): + return { + "name": self.__class__.__name__, + "attribute_rules": ( + { + column: [inspect.getsource(rule).strip() for rule in rules] + for column, rules in self.attribute_rules.items() + } + if self.attribute_rules + else {} + ), + "tuple_rules": ( + [inspect.getsource(rule).strip() for rule in self.tuple_rules] + if self.tuple_rules + else [] + ), + } diff --git a/metis/metric/consistency/consistency_ruleBasedPipino.py b/metis/metric/consistency/consistency_ruleBasedPipino.py new file mode 100644 index 0000000..b557ea2 --- /dev/null +++ b/metis/metric/consistency/consistency_ruleBasedPipino.py @@ -0,0 +1,134 @@ +from typing import List, Union + +import pandas as pd + +from metis.metric.config import MetricConfig +from metis.metric.consistency.consistency_ruleBasedPipino_config import ( + consistency_ruleBasedPipino_config, +) +from metis.metric.metric import Metric +from metis.utils.dq_dimension import DQDimension +from metis.utils.dq_granularity import DQGranularity +from metis.utils.logging import warn_unconfigured_columns +from metis.utils.result import DQResult + + +class consistency_ruleBasedPipino(Metric): + def assess( + self, + data: pd.DataFrame, + reference: Union[pd.DataFrame, None] = None, + metric_config: str | None | MetricConfig = None, + ) -> List[DQResult]: + """ + Assess the consistency of the data by checking the given rules for each value. The rules are defined in the metric configuration. There are attribute rules that apply to individual columns and tuple rules that apply to entire rows. The quality measurement is calculated as 1 - degree_of_violation / N, where degree_of_violation is the sum of the result of all applicable rules for a given value/row and N is the total number of rules. + Additionally, this metric assesses the certainty of the measurement based on the minimum quality in the assessed data. The certainty is calculated as sqrt((1 - dq_value) * (1 - min_quality)), where dq_value is the quality measurement for the specific value/row and min_quality is the lowest quality measurement observed in the dataset. + + :param data: DataFrame to assess. + :param reference: Optional reference DataFrame (not used in this metric). + :param metric_config: Mandatory configuration for the metric. + :return: List of DQResult objects containing consistency results. + """ + if metric_config is None: + raise ValueError( + f"Metric configuration is required for metric {consistency_ruleBasedPipino.__name__} but None was provided." + ) + if isinstance(metric_config, str): + raise ValueError( + f"Metric configuration must be an instance of {consistency_ruleBasedPipino_config.__name__}. JSON loading is not supported." + ) + if not isinstance(metric_config, consistency_ruleBasedPipino_config): + raise ValueError( + f"Metric configuration must be an instance of {consistency_ruleBasedPipino_config.__name__} but was of type {type(metric_config)}." 
+ ) + + attribute_rules = metric_config.attribute_rules or {} + tuple_rules = metric_config.tuple_rules or [] + + results: List[DQResult] = [] + + if tuple_rules: + fulfilled_rules_mask = pd.DataFrame( + { + f"rule_{i}": data.apply(rule, axis="columns") + for i, rule in enumerate(tuple_rules) + } + ) + + dq_measurements = fulfilled_rules_mask.mean(axis=1) + certainties = self.certainties(fulfilled_rules_mask) + for row_index, (dq_value, certainty) in enumerate( + zip(dq_measurements.values, certainties.values) + ): + results.append( + self.create_result( + dq_value, + data.columns.tolist(), + row_index, + float(certainty), + ) + ) + + warn_unconfigured_columns( + self.logger, + set(data.columns), + set(attribute_rules.keys()), + "consistency rules", + ) + + for col_name in data.columns: + column_rules = attribute_rules.get(col_name, []) + if not column_rules: + continue + + fulfilled_rules_mask = pd.DataFrame( + { + f"rule_{i}": data[col_name].apply(rule) + for i, rule in enumerate(column_rules) + } + ) + + dq_measurements = fulfilled_rules_mask.mean(axis=1) + certainties = self.certainties(fulfilled_rules_mask) + + for row_index, (dq_value, certainty) in enumerate( + zip(dq_measurements.values, certainties.values) + ): + results.append( + self.create_result( + dq_value, + [col_name], + row_index, + float(certainty), + ) + ) + + return results + + def certainties(self, fulfilled_rules_mask: pd.DataFrame): + rule_fulfillment_percentage = fulfilled_rules_mask.mean(axis=0) + return ( + (1 - fulfilled_rules_mask - rule_fulfillment_percentage).abs().mean(axis=1) + ) + + def create_result( + self, + dq_value: float, + col_names: List[str], + row_index: int, + certainty: float, + ) -> DQResult: + return DQResult( + timestamp=pd.Timestamp.now(), + DQvalue=dq_value, + DQdimension=DQDimension.CONSISTENCY, + DQmetric=self.__class__.__name__, + columnNames=col_names, + rowIndex=row_index, + DQexplanation={ + "certainty": certainty, + }, + DQgranularity=( + DQGranularity.CELL if len(col_names) == 1 else DQGranularity.ROW + ), + ) diff --git a/metis/metric/consistency/consistency_ruleBasedPipino_config.py b/metis/metric/consistency/consistency_ruleBasedPipino_config.py new file mode 100644 index 0000000..b930b19 --- /dev/null +++ b/metis/metric/consistency/consistency_ruleBasedPipino_config.py @@ -0,0 +1,40 @@ +import inspect +from dataclasses import dataclass +from typing import Any, Callable, Dict, List + +import pandas as pd + +from metis.metric.config import MetricConfig + + +@dataclass(kw_only=True) +class consistency_ruleBasedPipino_config(MetricConfig): + """ + Configuration class for the consistency_ruleBasedPipino metric. + + Accepts a dictionary mapping attribute names to lists of functions that define consistency rules. 
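+
+    Example (illustrative): ``attribute_rules={"age": [lambda v: v >= 0]}``.
+    Each rule returns True when fulfilled, and the quality value per cell is the
+    fraction of fulfilled rules.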
+ :param attribute_rules: Dictionary of functions that define consistency rules for each column given by the key + :param tuple_rules: List of functions that define consistency rules for entire tuples + """ + + attribute_rules: Dict[str, List[Callable[[Any], bool]]] | None = None + + tuple_rules: List[Callable[[pd.Series], bool]] | None = None + + def to_json(self): + return { + "name": self.__class__.__name__, + "attribute_rules": ( + { + column: [inspect.getsource(rule).strip() for rule in rules] + for column, rules in self.attribute_rules.items() + } + if self.attribute_rules + else {} + ), + "tuple_rules": ( + [inspect.getsource(rule).strip() for rule in self.tuple_rules] + if self.tuple_rules + else [] + ), + } diff --git a/metis/metric/correctness/correctness_heinrich.py b/metis/metric/correctness/correctness_heinrich.py new file mode 100644 index 0000000..927ffa8 --- /dev/null +++ b/metis/metric/correctness/correctness_heinrich.py @@ -0,0 +1,82 @@ +from typing import List + +import pandas as pd + +from metis.metric.config import MetricConfig +from metis.metric.metric import Metric +from metis.utils.dq_dimension import DQDimension +from metis.utils.dq_granularity import DQGranularity +from metis.utils.numbers import clamp +from metis.utils.result import DQResult +from metis.utils.similarity_measures.levenshtein_distance import levenshtein_distance + + +class correctness_heinrich(Metric): + def assess( + self, + data: pd.DataFrame, + reference: pd.DataFrame | None = None, + metric_config: str | MetricConfig | None = None, + ) -> List[DQResult]: + """ + Assess the correctness of the data by calculating the deviation from the reference. + + :param data: DataFrame to assess. + :param metric_config: Optional configuration for the metric. + :return: List of DQResult objects containing correctness results. + """ + if reference is None: + raise ValueError( + "Reference DataFrame is required for correctness assessment." + ) + + if data.shape != reference.shape: + raise ValueError( + f"Data and reference must have the same shape for correctness assessment. Got data shape {data.shape} and reference shape {reference.shape}." 
+ ) + + results = [] + total_rows = len(data) + + for col_name in data.columns: + for row_index in range(total_rows): + measurement = self.measure_correctness( + data[col_name].iat[row_index], + reference_value=reference[col_name].iat[row_index], + dtype=data[col_name].dtype, + ) + + result = DQResult( + timestamp=pd.Timestamp.now(), + DQvalue=measurement, + DQdimension=DQDimension.CORRECTNESS, + DQmetric=self.__class__.__name__, + columnNames=[col_name], + rowIndex=row_index, + DQgranularity=DQGranularity.CELL, + ) + results.append(result) + + return results + + def measure_correctness(self, value, *, reference_value, dtype) -> float: + if value == reference_value: + return 1 + if pd.isna(value) or pd.isna(reference_value): + return 0 + if pd.api.types.is_numeric_dtype(dtype): + return clamp( + 1 + - abs(value - reference_value) / max(abs(reference_value), abs(value)), + 0, + 1, + ) + if pd.api.types.is_string_dtype(dtype): + max_len = max(len(str(value)), len(str(reference_value))) + correctness = ( + 1 - levenshtein_distance(str(value), str(reference_value)) / max_len + ) + return correctness + raise ValueError( + f"Unsupported dtype for correctness measurement: {dtype} (value: {value}, reference_value: {reference_value})" + ) diff --git a/metis/metric/metric.py b/metis/metric/metric.py index e23dc62..62a4da1 100644 --- a/metis/metric/metric.py +++ b/metis/metric/metric.py @@ -1,67 +1,112 @@ +import json from abc import ABC, abstractmethod +from typing import Any, List, TypeVar + import pandas as pd -from typing import List, Union +from metis.metric.config import MetricConfig +from metis.utils.logging import logger as main_logger from metis.utils.result import DQResult +C = TypeVar("C", bound=MetricConfig) + + class Metric(ABC): """ Abstract base class for metrics. - All metric classes should inherit from this class and implement the `compute` method. + All metric classes should inherit from this class and implement the `assess` method. """ + registry = {} def __init_subclass__(cls): super().__init_subclass__() Metric.registry[cls.__name__] = cls + def __init__(self) -> None: + self.logger = main_logger.getChild(self.__class__.__name__) + @abstractmethod - def assess(self, - data: pd.DataFrame, - reference: Union[pd.DataFrame, None] = None, - metric_config: Union[str, None] = None) -> List[DQResult]: - """Assess data using this metric and return the results. - - Parameters - - data: pd.DataFrame - The DataFrame that should be assessed by this metric. This is - the primary dataset under inspection. - - - reference: Optional[pd.DataFrame] - An optional, cleaned reference DataFrame that can act as a - clean version of the dataset. Metrics that need a canonical or - expected version of the data (for example correctness against a - known-good source) should accept and use this DataFrame. If not - needed by a metric, `None` is allowed. - - - metric_config: Optional[str] - Optional path or JSON string containing metric-specific - configuration. Use this to keep the method signature compact; - all metric-specific parameters (thresholds, aggregation options, - etc.) can be stored here. - - Returns - - List[DQResult] - A list of `DQResult` objects. Each `DQResult` instance captures - one assessed value produced by the metric. For metrics that - operate at the column level, there should be one `DQResult` per - column; for table-level metrics typically a single `DQResult` - is returned. 
Implementations are free to return multiple - results for any logical decomposition the metric provides - (e.g., per-column, per-partition, per-check). - - Notes - - Implementations must avoid mutating the - input `data` and `reference` DataFrames in-place. - - `metric_config` should be parsed by the implementation and any - invalid config should raise a clear exception describing the - expected format. - - Examples - - Column-level completeness metric: returns one `DQResult` per - column with the fraction of non-null values. - - Correctness metric against a reference: compares `data` to - `reference` and returns one `DQResult` per cell in the input table containing the - agreement score. - """ - raise NotImplementedError() \ No newline at end of file + def assess( + self, + data: pd.DataFrame, + reference: pd.DataFrame | None = None, + metric_config: str | MetricConfig | None = None, + ) -> List[DQResult]: + """Assess data using this metric and return the results. + + Parameters + - data: pd.DataFrame + The DataFrame that should be assessed by this metric. This is + the primary dataset under inspection. + + - reference: Optional[pd.DataFrame] + An optional, cleaned reference DataFrame that can act as a + clean version of the dataset. Metrics that need a canonical or + expected version of the data (for example correctness against a + known-good source) should accept and use this DataFrame. If not + needed by a metric, `None` is allowed. + + - metric_config: Optional[str] + Optional path or JSON string containing metric-specific + configuration. Use this to keep the method signature compact; + all metric-specific parameters (thresholds, aggregation options, + etc.) can be stored here. + + Returns + - List[DQResult] + A list of `DQResult` objects. Each `DQResult` instance captures + one assessed value produced by the metric. For metrics that + operate at the column level, there should be one `DQResult` per + column; for table-level metrics typically a single `DQResult` + is returned. Implementations are free to return multiple + results for any logical decomposition the metric provides + (e.g., per-column, per-partition, per-check). + + Notes + - Implementations must avoid mutating the + input `data` and `reference` DataFrames in-place. + - `metric_config` should be parsed by the implementation and any + invalid config should raise a clear exception describing the + expected format. + + Examples + - Column-level completeness metric: returns one `DQResult` per + column with the fraction of non-null values. + - Correctness metric against a reference: compares `data` to + `reference` and returns one `DQResult` per cell in the input table containing the + agreement score. + """ + raise NotImplementedError() + + def load_config(self, config: Any, model: type[C]) -> C: + """ + Load metric-specific configuration from a JSON file path, JSON string or the correct config model instance. Also validates the configuration using its validate method. + + :param config: Path to the JSON configuration file, a JSON string or an instance of the config model. + :return: An instance of the metric-specific configuration class. 
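+
+        Example (illustrative): ``self.load_config('{"aggregate_all": true}', completeness_nullRatio_config)``
+        parses and validates the JSON string into a ``completeness_nullRatio_config`` instance.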
+ """ + if isinstance(config, model): + config.validate() + return config + + if isinstance(config, str): + try: + if config.endswith(".json"): + with open(config, "r") as f: + config_dict = json.load(f) + else: + config_dict = json.loads(config) if len(config) > 0 else {} + + parsed_config = model(**config_dict) + except Exception as e: + raise ValueError( + f"Failed to load metric configuration from {config}: {e}" + ) from e + + parsed_config.validate() + return parsed_config + + raise TypeError( + f"Invalid config type: {type(config)}. Expected str or {model}." + ) diff --git a/metis/metric/minimality/minimality_duplicateCount.py b/metis/metric/minimality/minimality_duplicateCount.py index 35ea646..1320b65 100644 --- a/metis/metric/minimality/minimality_duplicateCount.py +++ b/metis/metric/minimality/minimality_duplicateCount.py @@ -1,26 +1,36 @@ -import pandas as pd from typing import List, Union -from metis.utils.result import DQResult +import pandas as pd + +from metis.metric.config import MetricConfig from metis.metric.metric import Metric +from metis.utils.dq_dimension import DQDimension +from metis.utils.dq_granularity import DQGranularity +from metis.utils.result import DQResult + class minimality_duplicateCount(Metric): - def assess(self, data: pd.DataFrame, reference: Union[pd.DataFrame, None] = None, metric_config: Union[str, None] = None) -> List[DQResult]: + def assess( + self, + data: pd.DataFrame, + reference: Union[pd.DataFrame, None] = None, + metric_config: Union[MetricConfig, str, None] = None, + ) -> List[DQResult]: """ - Assess the minimality for each attribute of a dataset by checking for unique values. - + Assess the minimality for each attribute of a dataset by checking for unique values. + :param data: DataFrame to assess. :param metric_config: Optional configuration for the metric. :return: List of DQResult objects containing completeness results. 
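+        Example (illustrative): in a column with 100 rows where 10 rows share
+        duplicated values, 90 values occur exactly once, giving a minimality of 0.9.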
""" results = [] total_rows = len(data) - + for column in data.columns: # Count values that appear exactly once (not duplicated) unique_count = (~data[column].duplicated(keep=False)).sum() minimality = unique_count / total_rows if total_rows > 0 else 0 - + # Attributes with 100% unique values are candidate keys annotations = {} if minimality == 1.0: @@ -28,13 +38,13 @@ def assess(self, data: pd.DataFrame, reference: Union[pd.DataFrame, None] = None result = DQResult( timestamp=pd.Timestamp.now(), - DQdimension="Minimality", - DQmetric="DuplicateCount", - DQgranularity="column", + DQdimension=DQDimension.MINIMALITY, + DQmetric=self.__class__.__name__, + DQgranularity=DQGranularity.COLUMN, DQvalue=minimality, DQexplanation=annotations, columnNames=[column], ) results.append(result) - - return results \ No newline at end of file + + return results diff --git a/metis/metric/timeliness/timeliness_heinrich.py b/metis/metric/timeliness/timeliness_heinrich.py new file mode 100644 index 0000000..cf46e30 --- /dev/null +++ b/metis/metric/timeliness/timeliness_heinrich.py @@ -0,0 +1,143 @@ +from math import exp, floor +from typing import List + +import numpy as np +import pandas as pd + +from metis.metric.config import MetricConfig +from metis.metric.metric import Metric +from metis.metric.timeliness.timeliness_heinrich_config import ( + timeliness_heinrich_config, +) +from metis.utils.datetime.datetime_precision import determine_datetime_precision +from metis.utils.dq_dimension import DQDimension +from metis.utils.dq_granularity import DQGranularity +from metis.utils.logging import warn_unconfigured_columns +from metis.utils.result import DQResult + + +class timeliness_heinrich(Metric): + def assess( + self, + data: pd.DataFrame, + reference: pd.DataFrame | None = None, + metric_config: str | MetricConfig | None = None, + ) -> List[DQResult]: + """ + Assess the timeliness of the data by calculating how likely each cell is to be out of date based on a reference date and a decline rate. The reference date is either provided in the configuration or defaults to the current date. + The formula used is: timeliness = exp(-decline_rate * age), where age and decline_rate are measured in years. The age is calculated as the difference between the reference date and the ingestion date of the tuple (defined by the ingestion_date_column in the configuration). + + :param data: DataFrame to assess. + :param reference: Optional reference DataFrame (not used in this metric). + :param metric_config: Configuration for the metric (required). + :return: List of DQResult objects containing timeliness results. + """ + if not metric_config: + raise ValueError( + "Metric configuration is required for timeliness assessment." + ) + + config = self.load_config(metric_config, timeliness_heinrich_config) + results = [] + warn_unconfigured_columns( + self.logger, + set(data.columns), + set(config.timeliness_config_per_column.keys()), + "timeliness configuration", + ) + + for col_name, col_config in config.timeliness_config_per_column.items(): + ingestion_date_column = col_config.ingestion_date_column + assessment_date = pd.to_datetime( + col_config.simulated_assessment_date or pd.Timestamp.now() + ) + + if not ingestion_date_column or ingestion_date_column not in data.columns: + self.logger.warning( + f"Ingestion date column '{ingestion_date_column}' is not present in the data. Skipping assessment for column '{col_name}'." 
+            ingestion_dates = pd.to_datetime(
+                data[ingestion_date_column], **(col_config.to_datetime_kwargs or {})
+            )
+            ages_in_days = (
+                (assessment_date - ingestion_dates).dt.total_seconds() / 60 / 60 / 24
+            )
+            precision_of_dates = (
+                pd.Series(
+                    [col_config.simulated_timestamp_precision] * len(data),
+                    index=data.index,
+                )
+                if col_config.simulated_timestamp_precision
+                else data[ingestion_date_column].apply(determine_datetime_precision)
+            )
+            age_and_precision = pd.DataFrame(
+                {"age": ages_in_days, "precision": precision_of_dates}
+            )
+
+            decline_rate = col_config.decline_rate
+            timeliness = pd.Series(np.exp(-decline_rate * ages_in_days))
+            certainty = age_and_precision.apply(
+                lambda row: self.certainty(
+                    row["age"],
+                    decline_rate or 0,
+                    row["precision"],
+                ),
+                axis=1,
+            )
+            for row_index, (timeliness_value, certainty_value) in enumerate(
+                zip(timeliness.values, certainty.values)
+            ):
+                result = DQResult(
+                    timestamp=pd.Timestamp.now(),
+                    DQvalue=timeliness_value,
+                    DQdimension=DQDimension.TIMELINESS,
+                    DQmetric=self.__class__.__name__,
+                    columnNames=[col_name],
+                    rowIndex=row_index,
+                    DQexplanation={
+                        "certainty": certainty_value,
+                    },
+                    DQgranularity=DQGranularity.CELL,
+                )
+                results.append(result)
+
+        return results
+
+    def certainty(self, age: float, decline_rate: float, precision: str) -> float:
+        """
+        Calculate the certainty of the timeliness measurement based on age, decline rate, and datetime precision.
+
+        :param age: The age of the data in days.
+        :param decline_rate: The decline rate per day.
+        :param precision: The precision of the datetime ('year', 'month', 'day', 'hour', 'minute', 'second', 'microsecond').
+        :return: The certainty of the measurement.
+        """
+        lower_age_bound, upper_age_bound = self.age_precision_bounds(age, precision)
+        # max_quality_difference = abs(exp(-decline_rate) - 1)
+        unscaled_difference = abs(
+            exp(-decline_rate * upper_age_bound) - exp(-decline_rate * lower_age_bound)
+        )
+        return 1 - unscaled_difference
+
+    def age_precision_bounds(self, age: float, precision: str):
+        """
+        Compute the lower and upper bounds on the true age of a value, given the precision of its timestamp.
+
+        :param age: The age of the data in days.
+        :param precision: The precision of the datetime ('year', 'month', 'day', 'hour', 'minute', 'second', 'microsecond').
+        :return: A (lower_bound, upper_bound) tuple in days.
+        """
+        precision_factors = {
+            "year": 365.25,
+            "month": 30,
+            "day": 1,
+            "hour": 1.0 / 24,
+            "minute": 1.0 / (24 * 60),
+            "second": 1.0 / (24 * 60 * 60),
+            "microsecond": 1.0 / (24 * 60 * 60 * 1_000_000),
+        }
+        factor = precision_factors.get(precision, 1)
+        lower_bound = floor(age / factor) * factor
+        upper_bound = (floor(age / factor) + 1) * factor
+        return lower_bound, upper_bound
diff --git a/metis/metric/timeliness/timeliness_heinrich_config.py b/metis/metric/timeliness/timeliness_heinrich_config.py
new file mode 100644
index 0000000..ee8a45b
--- /dev/null
+++ b/metis/metric/timeliness/timeliness_heinrich_config.py
@@ -0,0 +1,45 @@
+import dataclasses
+from dataclasses import dataclass
+from typing import Dict
+
+from metis.metric.config import MetricConfig
+from metis.utils.datetime.datetime_precision import DTPrecision
+
+
+@dataclass
+class timeliness_heinrich_column_config:
+    """
+    Configuration class for a single column in the timeliness_heinrich metric (used as part of timeliness_heinrich_config).
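+    An illustrative JSON fragment for one column entry (field meanings
+    are listed below; the values here are made up):
+
+        {"decline_rate": 0.01, "ingestion_date_column": "last_updated"}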
+
+    :param decline_rate: Decline rate for the column, per day of age.
+    :param ingestion_date_column: Name of the column containing the ingestion date that should be used to calculate the age of the data for this column.
+    :param to_datetime_kwargs: Optional keyword arguments for pandas.to_datetime when parsing the date in ingestion_date_column.
+    :param simulated_assessment_date: Optional simulated assessment date in string format. If not provided, the current date will be used. This can be used to simulate the assessment of data at a specific point in time, which is especially useful for testing and evaluation purposes.
+    :param simulated_timestamp_precision: Optional simulated precision of the timestamps in ingestion_date_column. If not provided, the precision is detected automatically. The precision is used to assess the certainty of the timeliness measurements.
+    """
+
+    decline_rate: float
+    ingestion_date_column: str
+    to_datetime_kwargs: Dict | None = None
+    simulated_assessment_date: str | None = None
+    simulated_timestamp_precision: DTPrecision | None = None
+
+
+@dataclass
+class timeliness_heinrich_config(MetricConfig):
+    """
+    Configuration class for the timeliness_heinrich metric.
+
+    :param timeliness_config_per_column: Configuration for each column in the timeliness_heinrich metric. Each column can have a different decline rate and ingestion date column, which allows for a more fine-grained and accurate assessment of timeliness based on the specific characteristics of each column.
+    """
+
+    timeliness_config_per_column: Dict[str, timeliness_heinrich_column_config]
+
+    def to_json(self):
+        return {
+            "name": self.__class__.__name__,
+            "timeliness_config_per_column": {
+                col: dataclasses.asdict(config)
+                for col, config in self.timeliness_config_per_column.items()
+            },
+        }
diff --git a/metis/metric/validity/validity_outOfVocabulary.py b/metis/metric/validity/validity_outOfVocabulary.py
index 8c5289f..2075124 100644
--- a/metis/metric/validity/validity_outOfVocabulary.py
+++ b/metis/metric/validity/validity_outOfVocabulary.py
@@ -1,15 +1,23 @@
 import re
-import pandas as pd
 from typing import List, Union
-from metis.utils.result import DQResult
-from metis.metric.metric import Metric
 import nltk
+import pandas as pd
 from nltk.corpus import words as nltk_words
-nltk.download("words", quiet=True)
+
+from metis.metric.config import MetricConfig
+from metis.metric.metric import Metric
+from metis.utils.dq_dimension import DQDimension
+from metis.utils.dq_granularity import DQGranularity
+from metis.utils.result import DQResult
+

 class validity_outOfVocabulary(Metric):
-    def assess(self, data: pd.DataFrame, reference: Union[pd.DataFrame, set, None] = None, metric_config: Union[str, None] = None) -> List[DQResult]:
+    def __init__(self) -> None:
+        super().__init__()
+        nltk.download("words", quiet=True)
+
+    def assess(self, data: pd.DataFrame, reference: Union[pd.DataFrame, set, None] = None, metric_config: Union[MetricConfig, str, None] = None) -> List[DQResult]:
         """
         General vocabulary check at token level. Any alphabetic token not in the standard vocab is OOV.
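+        For example, in the value "teh quick fox", the tokens "quick" and
+        "fox" appear in the NLTK words corpus while "teh" does not, so
+        "teh" counts as out-of-vocabulary (illustrative; the resulting
+        DQvalue also depends on how tokens are aggregated per column).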
@@ -63,9 +71,9 @@ def is_valid(text: str) -> bool: result = DQResult( timestamp=pd.Timestamp.now(), - DQdimension="Validity", - DQmetric="OutOfVocabulary", - DQgranularity="column", + DQdimension=DQDimension.VALIDITY, + DQmetric=self.__class__.__name__, + DQgranularity=DQGranularity.COLUMN, DQvalue=dq_value, DQexplanation=annotations, columnNames=[column], diff --git a/metis/profiling/data_profile_manager.py b/metis/profiling/data_profile_manager.py index 946bc4a..3699c0b 100644 --- a/metis/profiling/data_profile_manager.py +++ b/metis/profiling/data_profile_manager.py @@ -4,10 +4,10 @@ import threading from typing import Any, Dict, List, Optional -from sqlalchemy import Engine, create_engine as sa_create_engine, delete, select +from sqlalchemy import delete, select from sqlalchemy.orm import Session -from metis.database_models import Base, DataProfile +from metis.database import Database class DataProfileManager: @@ -31,7 +31,7 @@ class DataProfileManager: @classmethod def initialize( cls, - engine_or_url: Engine | str, + database: Database, ignore_cache: bool = False, overwrite_cache: bool = False, clear_cache: bool = False, @@ -44,21 +44,20 @@ def initialize( clear_cache: Delete all stored profiles at startup, then cache normally. """ with cls._lock: - if isinstance(engine_or_url, str): - engine = sa_create_engine(engine_or_url) - else: - engine = engine_or_url - Base.metadata.create_all(engine) if clear_cache: - with Session(engine) as session: - session.execute(delete(DataProfile)) + with Session(database.engine) as session: + session.execute(delete(database.DataProfile)) session.commit() - cls._instance = cls(engine, ignore_cache=ignore_cache, overwrite_cache=overwrite_cache) + cls._instance = cls( + database=database, + ignore_cache=ignore_cache, + overwrite_cache=overwrite_cache, + ) return cls._instance @classmethod def get_instance(cls) -> DataProfileManager: - """Return the current singleton. Raises if not initialized.""" + """Return the current singleton. Raises if not initialized.""" if cls._instance is None: raise RuntimeError( "DataProfileManager has not been initialized. 
" @@ -75,16 +74,16 @@ def shutdown(cls) -> None: """Shutdown the singleton and dispose the engine.""" with cls._lock: if cls._instance is not None: - cls._instance._engine.dispose() + cls._instance._database.engine.dispose() cls._instance = None def __init__( self, - engine: Engine, + database: Database, ignore_cache: bool = False, overwrite_cache: bool = False, ) -> None: - self._engine = engine + self._database = database self._dataset: Optional[str] = None self._table: Optional[str] = None self._mem_cache: Dict[str, Any] = {} @@ -141,12 +140,12 @@ def lookup( return self._mem_cache[key] # slow path: DB - with Session(self._engine) as session: + with Session(self._database.engine) as session: stmt = ( - select(DataProfile) - .where(DataProfile.dataset == self._dataset) - .where(DataProfile.table_name == self._table) - .where(DataProfile.dp_task_name == dp_task_name) + select(self._database.DataProfile) + .where(self._database.DataProfile.dataset == self._dataset) + .where(self._database.DataProfile.table_name == self._table) + .where(self._database.DataProfile.dp_task_name == dp_task_name) ) for row in session.execute(stmt).scalars(): if sorted(row.column_names) == sorted(column_names): @@ -181,13 +180,13 @@ def store( serialized, result_type = self._serialize(value) - with Session(self._engine) as session: + with Session(self._database.engine) as session: # Find existing row with same logical key stmt = ( - select(DataProfile) - .where(DataProfile.dataset == ds) - .where(DataProfile.table_name == tbl) - .where(DataProfile.dp_task_name == dp_task_name) + select(self._database.DataProfile) + .where(self._database.DataProfile.dataset == ds) + .where(self._database.DataProfile.table_name == tbl) + .where(self._database.DataProfile.dp_task_name == dp_task_name) ) existing = None for row in session.execute(stmt).scalars(): @@ -203,17 +202,19 @@ def store( existing.profile_type = profile_type existing.source = source else: - session.add(DataProfile( - dataset=ds, - table_name=tbl, - column_names=column_names, - dp_task_name=dp_task_name, - task_config=task_config, - profile_type=profile_type, - dp_result_value=serialized, - result_type=result_type, - source=source, - )) + session.add( + self._database.DataProfile( + dataset=ds, + table_name=tbl, + column_names=column_names, + dp_task_name=dp_task_name, + task_config=task_config, + profile_type=profile_type, + dp_result_value=serialized, + result_type=result_type, + source=source, + ) + ) session.commit() # update in-memory cache @@ -316,12 +317,12 @@ def _query_by_task( tbl = table or self._table if ds is None or tbl is None: return [] - with Session(self._engine) as session: + with Session(self._database.engine) as session: stmt = ( - select(DataProfile) - .where(DataProfile.dataset == ds) - .where(DataProfile.table_name == tbl) - .where(DataProfile.dp_task_name == dp_task_name) + select(self._database.DataProfile) + .where(self._database.DataProfile.dataset == ds) + .where(self._database.DataProfile.table_name == tbl) + .where(self._database.DataProfile.dp_task_name == dp_task_name) ) return [ self._deserialize(row.dp_result_value, row.result_type) @@ -363,7 +364,11 @@ def to_json_safe(v: Any) -> Any: } }, "minhash" - if isinstance(value, dict) and value and isinstance(next(iter(value.values())), _MinHash): + if ( + isinstance(value, dict) + and value + and isinstance(next(iter(value.values())), _MinHash) + ): return { "v": { k: { @@ -392,16 +397,18 @@ def _deserialize(payload: Optional[dict], result_type: str) -> Any: if result_type == 
"series": return pd.Series(raw) if result_type == "minhash": - from datasketch import MinHash as _MinHash import numpy as np + from datasketch import MinHash as _MinHash + return _MinHash( num_perm=raw["num_perm"], seed=raw["seed"], hashvalues=np.array(raw["hashvalues"], dtype=np.uint64), ) if result_type == "minhash_dict": - from datasketch import MinHash as _MinHash import numpy as np + from datasketch import MinHash as _MinHash + return { k: _MinHash( num_perm=v["num_perm"], diff --git a/metis/utils/__init__.py b/metis/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/metis/utils/datetime/datetime_precision.py b/metis/utils/datetime/datetime_precision.py new file mode 100644 index 0000000..e6556d6 --- /dev/null +++ b/metis/utils/datetime/datetime_precision.py @@ -0,0 +1,34 @@ +import datetime +from typing import Literal + +from dateutil import parser + +DTPrecision = Literal["year", "month", "day", "hour", "minute", "second", "microsecond"] + + +class datetimespy(datetime.datetime): + def replace(self, *args, **kwargs): + self._replaced_args = args + self._replaced_kwargs = kwargs + return super().replace(*args, **kwargs) + + +def determine_datetime_precision(dt_str: str) -> DTPrecision: + default = datetimespy.now() + parser.parse(dt_str, default=default) + + replaced_fields = getattr(default, "_replaced_kwargs", {}) + + if "microsecond" in replaced_fields: + return "microsecond" + if "second" in replaced_fields: + return "second" + if "minute" in replaced_fields: + return "minute" + if "hour" in replaced_fields: + return "hour" + if "day" in replaced_fields: + return "day" + if "month" in replaced_fields: + return "month" + return "year" diff --git a/metis/utils/disguised_missing_values/__init__.py b/metis/utils/disguised_missing_values/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/metis/utils/disguised_missing_values/fahes/__init__.py b/metis/utils/disguised_missing_values/fahes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/metis/utils/disguised_missing_values/fahes/fahes.py b/metis/utils/disguised_missing_values/fahes/fahes.py new file mode 100644 index 0000000..67b9381 --- /dev/null +++ b/metis/utils/disguised_missing_values/fahes/fahes.py @@ -0,0 +1,85 @@ +import ctypes +import os +import tempfile +from pathlib import Path +from statistics import mean + +import pandas as pd + +"""FAHES paper: https://raulcastrofernandez.com/papers/kdd18-fahes.pdf, Code: https://github.com/qcri/FAHES_Code.git, and Demo: https://github.com/qcri/Fahes_Demo.git""" + +FAHES_PRECISION = mean([0.384, 0.484, 0.385, 0.371, 0.522]) +FAHES_RECALL = mean([0.952, 0.978, 0.87, 0.929, 0.725]) +FAHES_F1 = 2 * FAHES_PRECISION * FAHES_RECALL / (FAHES_PRECISION + FAHES_RECALL) + + +def call_fahes(tab_full_name, output_dir): + path = Path(__file__).parent.resolve() / "lib" / "FAHES_Code" / "libFahes.so" + if not path.exists(): + raise FileNotFoundError( + f"Fahes shared library not found at: {path}. Please clone https://github.com/qcri/FAHES_Code.git into {path.parent} and compile it using the provided makefile at {path.parent.parent / 'makefile'}." 
+    LP_c_char = ctypes.POINTER(ctypes.c_char)
+    LP_LP_c_char = ctypes.POINTER(LP_c_char)
+    try:
+        Fahes = ctypes.CDLL(str(path), use_errno=True)
+    except OSError as e:
+        raise ImportError(f"Failed to load Fahes shared library: {path}") from e
+
+    try:
+        Fahes.main.argtypes = (ctypes.c_int, LP_LP_c_char)
+    except AttributeError as e:
+        raise AttributeError(
+            "Fahes library missing 'main' or has unexpected signature"
+        ) from e
+
+    ctypes.set_errno(0)
+    args = [str(path), tab_full_name, output_dir, "4"]
+    argc = len(args)
+    argv = (LP_c_char * (argc + 1))()
+    for i, arg in enumerate(args):
+        enc_arg = arg.encode("utf-8")
+        argv[i] = ctypes.create_string_buffer(enc_arg)
+
+    rc = Fahes.main(argc, argv)
+    if rc != 0:
+        err = ctypes.get_errno()
+        err_msg = os.strerror(err) if err else "Unknown C error"
+        raise RuntimeError(f"Fahes.main failed (rc={rc}, errno={err}: {err_msg})")
+
+
+# Based on https://github.com/qcri/Fahes_Demo.git
+def run_fahes(data: Path | str | pd.DataFrame) -> pd.DataFrame | None:
+    """
+    Run FAHES on the given data file and return the resulting DataFrame. The resulting DataFrame contains the disguised missing values identified by FAHES.
+    Example resulting DataFrame structure:
+
+    | Table Name | Attribute Name | DMV | Frequency | Detecting Tool |
+    |------------|----------------|-----|-----------|----------------|
+    | adult.csv  | workclass      | ?   | 183       | Rand           |
+
+    :param data: Path to the input CSV data file or DataFrame containing the data. Warning: if a DataFrame is provided, it will be saved to a temporary CSV file before processing.
+    :return: DataFrame with disguised missing values identified by FAHES, or None if FAHES did not produce any results.
+    """
+    tmp_file = None
+
+    try:
+        if isinstance(data, pd.DataFrame):
+            tmp_file = tempfile.NamedTemporaryFile(suffix=".csv")
+            data.to_csv(tmp_file.name, index=False)
+            data_file_path = Path(tmp_file.name)
+        else:
+            data_file_path = Path(data)
+
+        if not data_file_path.exists():
+            raise FileNotFoundError(f"Data file not found: {data_file_path}")
+
+        with tempfile.TemporaryDirectory() as results_dir:
+            call_fahes(str(data_file_path.absolute()), results_dir)
+            result_file = Path(results_dir) / ("DMV_" + data_file_path.name)
+            if result_file.exists() and result_file.stat().st_size > 0:
+                return pd.read_csv(result_file)
+    finally:
+        if tmp_file is not None:
+            tmp_file.close()
diff --git a/metis/utils/disguised_missing_values/fahes/lib/makefile b/metis/utils/disguised_missing_values/fahes/lib/makefile
new file mode 100755
index 0000000..aafd392
--- /dev/null
+++ b/metis/utils/disguised_missing_values/fahes/lib/makefile
@@ -0,0 +1,14 @@
+LDFLAGS = -shared
+TARGET_LIB = ../libFahes.so
+CHILD_MAKEFILE_DIR = FAHES_Code/src
+ARGS = -C $(CHILD_MAKEFILE_DIR) TARGET=$(TARGET_LIB) LFLAGS+=$(LDFLAGS)
+
+.PHONY: all
+all:
+	$(MAKE) $(ARGS) $@
+
+.PHONY: clean
+clean:
+	$(MAKE) $(ARGS) $@
+
+.PHONY: rmo
+rmo:
+	$(MAKE) $(ARGS) $@
diff --git a/metis/utils/dq_dimension.py b/metis/utils/dq_dimension.py
new file mode 100644
index 0000000..05e3985
--- /dev/null
+++ b/metis/utils/dq_dimension.py
@@ -0,0 +1,12 @@
+from enum import StrEnum
+
+
+class DQDimension(StrEnum):
+    """Data Quality Dimensions Enum.
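+    Members are plain strings (StrEnum), so e.g. DQDimension.MINIMALITY == "Minimality".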
+    Primarily used for labeling DQResults inside each metric implementation with the DQ Dimension they assess."""
+
+    CONSISTENCY = "Consistency"
+    CORRECTNESS = "Correctness"
+    COMPLETENESS = "Completeness"
+    TIMELINESS = "Timeliness"
+    MINIMALITY = "Minimality"
+    VALIDITY = "Validity"
diff --git a/metis/utils/dq_granularity.py b/metis/utils/dq_granularity.py
new file mode 100644
index 0000000..b8c87c2
--- /dev/null
+++ b/metis/utils/dq_granularity.py
@@ -0,0 +1,10 @@
+from enum import StrEnum
+
+
+class DQGranularity(StrEnum):
+    """Data Quality Granularity Enum. Primarily used for labeling DQResults inside each metric implementation with the DQ Granularity they assess."""
+
+    CELL = "cell"
+    ROW = "row"
+    COLUMN = "column"
+    TABLE = "table"
diff --git a/metis/utils/logging.py b/metis/utils/logging.py
new file mode 100644
index 0000000..7a3ab7a
--- /dev/null
+++ b/metis/utils/logging.py
@@ -0,0 +1,23 @@
+import logging
+
+logger = logging.getLogger("metis")
+logging.basicConfig(level=logging.INFO)
+
+
+def warn_unconfigured_columns(
+    logger: logging.Logger,
+    data_columns: set[str] | list[str],
+    configured_columns: set[str] | list[str],
+    config_type: str,
+):
+    # Configured columns that do not exist in the data.
+    extraneous_rules = set(configured_columns) - set(data_columns)
+    if extraneous_rules:
+        logger.warning(
+            f"The following columns have {config_type} defined but are not present in the data: {extraneous_rules}. These {config_type} will be ignored."
+        )
+
+    # Data columns for which no configuration was provided.
+    unconfigured_columns = set(data_columns) - set(configured_columns)
+    if unconfigured_columns:
+        logger.warning(
+            f"The following columns are present in the data but have no {config_type} defined: {unconfigured_columns}. These columns will be skipped."
+        )
diff --git a/metis/utils/numbers.py b/metis/utils/numbers.py
new file mode 100644
index 0000000..50a1e6e
--- /dev/null
+++ b/metis/utils/numbers.py
@@ -0,0 +1,12 @@
+def clamp(
+    value: int | float, min_value: int | float, max_value: int | float
+) -> int | float:
+    return max(min(value, max_value), min_value)
+
+
+def format_count(value: int | float) -> str:
+    """Formats a large number with appropriate suffixes (K, M, B, T) for thousands, millions, billions, and trillions."""
+    suffixes = ["", "K", "M", "B", "T"]
+    string_value = str(int(value))
+    suffix = min((len(string_value) - 1) // 3, len(suffixes) - 1)
+    return f"{string_value[: -3 * suffix] if suffix > 0 else string_value}{suffixes[suffix]}"
diff --git a/metis/utils/result.py b/metis/utils/result.py
index 5b9e18f..bba02a6 100644
--- a/metis/utils/result.py
+++ b/metis/utils/result.py
@@ -1,13 +1,16 @@
 from typing import List, Union
 import pandas as pd
+from metis.utils.dq_dimension import DQDimension
+from metis.utils.dq_granularity import DQGranularity
+
 class DQResult:
     def __init__(
         self,
         timestamp: pd.Timestamp,
-        DQdimension: str,
+        DQdimension: DQDimension,
         DQmetric: str,
-        DQgranularity: str,
+        DQgranularity: DQGranularity,
         DQvalue: float,
         DQexplanation: Union[dict, None] = None,
         runtime: Union[float, None] = None,
@@ -22,9 +25,9 @@ def __init__(
         Required arguments
         - `timestamp: pd.Timestamp`: The time at which the result was assessed.
-        - `DQdimension: str`: Data quality dimension assessed (e.g. 'completeness', 'accuracy').
+        - `DQdimension: DQDimension`: Data quality dimension assessed (e.g. DQDimension.COMPLETENESS, DQDimension.CORRECTNESS).
         - `DQmetric: str`: Name of the specific metric within the dimension.
-        - `DQgranularity: str`: Granularity of the metric (e.g. 'column', 'table', 'cell').
+        - `DQgranularity: DQGranularity`: Granularity of the metric (e.g.
DQGranularity.COLUMN, DQGranularity.TABLE, DQGranularity.CELL, DQGranularity.ROW). - `DQvalue: float`: Numeric outcome of the assessment (quantitative only). Optional arguments @@ -161,7 +164,7 @@ def dataset(self): @dataset.setter def dataset(self, value): self._dataset = value - + @property def configJson(self): return self._configJson @@ -185,4 +188,4 @@ def as_json(self): "experimentTag": self._experimentTag, "dataset": self._dataset, "configJson": self._configJson, - } \ No newline at end of file + } diff --git a/metis/utils/similarity_measures/levenshtein_distance.py b/metis/utils/similarity_measures/levenshtein_distance.py new file mode 100644 index 0000000..ef2a011 --- /dev/null +++ b/metis/utils/similarity_measures/levenshtein_distance.py @@ -0,0 +1,18 @@ +# https://stackoverflow.com/a/32558749 +def levenshtein_distance(s1: str, s2: str) -> int: + """Calculate the Levenshtein distance between s1 and s2.""" + if len(s1) > len(s2): + s1, s2 = s2, s1 + + distances = range(len(s1) + 1) + for i2, c2 in enumerate(s2): + distances_ = [i2 + 1] + for i1, c1 in enumerate(s1): + if c1 == c2: + distances_.append(distances[i1]) + else: + distances_.append( + 1 + min(distances[i1], distances[i1 + 1], distances_[-1]) + ) + distances = distances_ + return distances[-1] diff --git a/metis/writer/console_writer.py b/metis/writer/console_writer.py index c5eb6f2..4cbc88e 100644 --- a/metis/writer/console_writer.py +++ b/metis/writer/console_writer.py @@ -3,9 +3,9 @@ from metis.utils.result import DQResult class ConsoleWriter: - def __init__(self, writer_config: Dict = None) -> None: + def __init__(self, writer_config: Dict | None = None) -> None: pass def write(self, results: List[DQResult]) -> None: for result in results: - print(result.as_json()) \ No newline at end of file + print(result.as_json()) diff --git a/metis/writer/csv_writer.py b/metis/writer/csv_writer.py new file mode 100644 index 0000000..cceb7b9 --- /dev/null +++ b/metis/writer/csv_writer.py @@ -0,0 +1,33 @@ +from pathlib import Path +from typing import Dict, List + +import pandas as pd + +from metis.utils.logging import logger +from metis.utils.result import DQResult + + +class CSVWriter: + def __init__(self, writer_config: Dict) -> None: + if "path" not in writer_config: + raise ValueError( + f"{self.__class__.__name__} requires a 'path' in the configuration." + ) + + self.path = Path(writer_config["path"]) + if not self.path.suffix == ".csv": + raise ValueError( + f"{self.__class__.__name__} path must end with .csv extension." + ) + + if self.path.exists(): + logger.warning( + f"{self.__class__.__name__} path {self.path} already exists and will be overwritten." 
+ ) + + def write(self, results: List[DQResult]) -> None: + self.path.parent.mkdir(parents=True, exist_ok=True) + pd.DataFrame([result.as_json() for result in results]).to_csv( + self.path, index=False + ) + logger.info(f"Results saved to {self.path.absolute()}") diff --git a/metis/writer/database_writer.py b/metis/writer/database_writer.py index ff1d72f..026345f 100644 --- a/metis/writer/database_writer.py +++ b/metis/writer/database_writer.py @@ -1,22 +1,18 @@ -from typing import Dict, List +from typing import List -from sqlalchemy import Engine from sqlalchemy.orm import Session +from tqdm import tqdm -from metis.database_models import Base, register_models +from metis.database import Database +from metis.utils.numbers import format_count from metis.utils.result import DQResult from metis.writer.writer import DQResultWriter class DatabaseWriter(DQResultWriter): - def __init__(self, writer_config: Dict) -> None: - self.engine = self.create_engine(writer_config) - - self.DQResultModel = register_models(writer_config.get("table_name", "dq_results")) - Base.metadata.create_all(self.engine) - - def create_engine(self, writer_config: Dict) -> Engine: - raise NotImplementedError("Subclasses must implement the create_engine method.") + def __init__(self, db: Database) -> None: + self.engine = db.engine + self.DQResultModel = db.DQResultModel def write(self, results: List[DQResult]) -> None: with Session(self.engine) as session: @@ -38,5 +34,11 @@ def write(self, results: List[DQResult]) -> None: ) for result in results ] - session.add_all(db_entities) + for batch in tqdm( + range(0, len(db_entities), 1000), + desc=f"Writing {format_count(len(db_entities))} DQ results to database", + unit="k results", + ): + session.add_all(db_entities[batch : batch + 1000]) + session.flush() session.commit() diff --git a/metis/writer/postgres_writer.py b/metis/writer/postgres_writer.py deleted file mode 100644 index 3f0b65b..0000000 --- a/metis/writer/postgres_writer.py +++ /dev/null @@ -1,17 +0,0 @@ -from sqlalchemy import Engine, create_engine - -from metis.writer.database_writer import DatabaseWriter - - -class PostgresWriter(DatabaseWriter): - def create_engine(self, writer_config) -> Engine: - required_keys = ("db_user", "db_pass", "db_name", "db_host", "db_port") - if not all(k in writer_config for k in required_keys): - raise ValueError( - "Postgres writer config must include 'db_user', 'db_pass', 'db_name', 'db_host', and 'db_port' fields." 
- ) - - return create_engine( - f"postgresql://{writer_config['db_user']}:{writer_config['db_pass']}@{writer_config['db_host']}:{writer_config['db_port']}/{writer_config['db_name']}", - echo=writer_config.get("echo", False), - ) diff --git a/metis/writer/sqlite_writer.py b/metis/writer/sqlite_writer.py deleted file mode 100644 index aa8da27..0000000 --- a/metis/writer/sqlite_writer.py +++ /dev/null @@ -1,14 +0,0 @@ -from sqlalchemy import Engine, create_engine - -from metis.writer.database_writer import DatabaseWriter - - -class SQLiteWriter(DatabaseWriter): - def create_engine(self, writer_config) -> Engine: - if "db_name" not in writer_config: - raise ValueError("SQLite writer config must include 'db_name' field.") - - return create_engine( - f"sqlite:///{writer_config['db_name']}", - echo=writer_config.get("echo", False), - ) diff --git a/requirements.txt b/requirements.txt index b092ffc..55db55d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ pandas psycopg2-binary sqlite3 ; sys_platform == "win32" # sqlite3 is included with Python, but this line is for completeness sqlalchemy==2.0.44 -nltk +nltk==3.9.2 +tqdm numpy datasketch
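diff --git a/demo/timeliness_example.py b/demo/timeliness_example.py
new file mode 100644
--- /dev/null
+++ b/demo/timeliness_example.py
@@ -0,0 +1,33 @@
+"""Illustrative end-to-end sketch (not part of the library): wires the
+timeliness_heinrich metric and CSVWriter together. The file name, sample
+data, and decline rate are made up, and it assumes MetricConfig.validate()
+accepts a config constructed directly from the dataclasses."""
+
+import pandas as pd
+
+from metis.metric.timeliness.timeliness_heinrich import timeliness_heinrich
+from metis.metric.timeliness.timeliness_heinrich_config import (
+    timeliness_heinrich_column_config,
+    timeliness_heinrich_config,
+)
+from metis.writer.csv_writer import CSVWriter
+
+data = pd.DataFrame(
+    {
+        "price": [9.99, 12.50],
+        "price_updated_at": ["2024-01-01", "2024-06-15"],
+    }
+)
+
+# One decline rate (per day) per assessed column, as described in the config docstring.
+config = timeliness_heinrich_config(
+    timeliness_config_per_column={
+        "price": timeliness_heinrich_column_config(
+            decline_rate=0.01,
+            ingestion_date_column="price_updated_at",
+        )
+    }
+)
+
+results = timeliness_heinrich().assess(data, metric_config=config)
+CSVWriter({"path": "timeliness_results.csv"}).write(results)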