Merged
34 commits
4630a57
add correctness metric
jb3rndt Oct 8, 2025
7c131bf
add rule-based consistency metric
jb3rndt Oct 8, 2025
c540fdc
add currency metric
jb3rndt Oct 11, 2025
89da95b
add MetricConfig support in addition to json configuration
jb3rndt Oct 12, 2025
379641f
prototypical assessment of certainty of the rule-consistency metric
jb3rndt Oct 19, 2025
c823e93
fix correctness metric and adjust docstring
jb3rndt Oct 19, 2025
8947834
add prototypical certainty calculation to correctness and currency
jb3rndt Dec 1, 2025
cf3305c
add postgres docker setup
jb3rndt Dec 1, 2025
33ad9bb
add basic logging
jb3rndt Dec 7, 2025
a77a890
move common utilities into utils folder
jb3rndt Dec 7, 2025
bf8e8c9
add fallback writing to csv in case the writer errors
jb3rndt Dec 8, 2025
cd9a0a7
rename metrics according to naming scheme
jb3rndt Dec 8, 2025
164ae7f
add csv writer
jb3rndt Dec 8, 2025
1c3d0a5
rename metric files
jb3rndt Dec 12, 2025
e1362ea
add tuple rules to metric ConsistencyRuleBasedHinrichs
jb3rndt Dec 15, 2025
3f22d9d
remove certainty from correctness and timeliness for now again
jb3rndt Dec 15, 2025
5bd62cb
rename metrics and configs and add some documentation
jb3rndt Dec 17, 2025
9e0c2ff
add consistency_ruleBasedPipino metric
jb3rndt Dec 22, 2025
4131dad
Add completeness metrics based on null values and dmvs detected by FAHES
jb3rndt Jan 20, 2026
557e400
Add certainty to timeliness metric
jb3rndt Jan 20, 2026
77f18cd
Add consistency rules certainty
jb3rndt Jan 20, 2026
aa34891
Update consistency certainty calculation
jb3rndt Jan 22, 2026
40701f9
Add simulated precision to timeliness metric
jb3rndt Feb 2, 2026
e6dff29
Merge remote-tracking branch 'origin/main' into feat/correctness-metric
jb3rndt Feb 2, 2026
ad591fc
Update DQResult usages to newer interface
jb3rndt Feb 2, 2026
742f8fa
Rename and merge completeness metrics
jb3rndt Feb 10, 2026
7205bd1
Store values as floats instead of numpy floats
jb3rndt Feb 22, 2026
68e9684
Allow completeness assessment on cell granularity
jb3rndt Mar 1, 2026
2152441
Merge remote-tracking branch 'origin/main' into feat/correctness-metric
jb3rndt Mar 6, 2026
4030b20
Create central Database class to use in different modules
jb3rndt Mar 6, 2026
6da26eb
Add enum for data quality assessment granularity
jb3rndt Mar 6, 2026
15cbb0b
Remove unused config parameter from demo
jb3rndt Mar 6, 2026
ebb932b
Allow column-specific timeliness configuration
jb3rndt Mar 9, 2026
d3798d8
Fix result creation index handling
jb3rndt Mar 22, 2026
49 changes: 30 additions & 19 deletions README.md
@@ -11,11 +11,11 @@ python -m demo.getting_started

## How to implement new metrics

To extend the Metis framework and add new data quality metrics, please check our interface for easy integration.
````python
def assess(self,
           data: pd.DataFrame,
           reference: Union[pd.DataFrame, None] = None,
           metric_config: Union[str, None] = None) -> List[DQResult]:
````
Each metric should be a subclass of ```metis.metric.metric.Metric``` and implement the assess method. This method takes three arguments:
@@ -29,14 +29,17 @@ The metric should return a list of ```metis.utils.result.DQResult```. This can b
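A new metric implementing this interface might look like the following self-contained sketch. Note that `Metric` and `DQResult` below are minimal stand-ins for `metis.metric.metric.Metric` and `metis.utils.result.DQResult` (whose real definitions are not reproduced here), and the null-ratio computation is an illustrative assumption, not the framework's actual implementation:

```python
from typing import List, Union

import pandas as pd


class Metric:  # stand-in for metis.metric.metric.Metric
    pass


class DQResult:  # minimal stand-in carrying only the fields used below
    def __init__(self, DQvalue: float, DQmetric: str):
        self.DQvalue = DQvalue
        self.DQmetric = DQmetric


class completeness_nullRatio(Metric):
    def assess(self,
               data: pd.DataFrame,
               reference: Union[pd.DataFrame, None] = None,
               metric_config: Union[str, None] = None) -> List[DQResult]:
        # Illustrative: share of non-null cells over the whole table.
        ratio = 1.0 - data.isna().to_numpy().mean()
        return [DQResult(DQvalue=float(ratio), DQmetric="completeness_nullRatio")]


df = pd.DataFrame({"a": [1, None, 3], "b": [4, 5, None]})
results = completeness_nullRatio().assess(df)
```

The `reference` and `metric_config` arguments are unused in this sketch; a real metric would consult them as described above.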

### Metric naming convention

Metrics are organized by dimension (e.g., `completeness`, `minimality`), where one folder exists for each.
New metrics should follow the naming format: `{DimensionName}_{Technique}`

- **DimensionName**: The quality dimension being measured (e.g., `Completeness`, `Minimality`)
- **Technique**: The calculation or method used (e.g., `NullRatio`, `DuplicateCount`)

Examples: `completeness_nullRatio`, `minimality_duplicateCount`

The file name and class name of each metric should be equal. If a metric has a specific config class, the name of the config class should be `{MetricName}_config` (e.g., `completeness_missingRatio_config`).

- **Granularity**: The level of analysis (e.g., `cell`, `row`, `column`, `table`) should be passed as a parameter through the metric config file if the metric can be applied at different granularity levels.

## Output: creating a DQResult

@@ -45,27 +48,35 @@ class DQResult:
    def __init__(
        self,
        timestamp: pd.Timestamp,
        DQdimension: DQDimension,
        DQmetric: str,
        DQgranularity: str,
        DQvalue: float,
        DQexplanation: Union[dict, None] = None,
        runtime: Union[float, None] = None,
        columnNames: Union[List[str], None] = None,
        rowIndex: Union[int, None] = None,
        experimentTag: Union[str, None] = None,
        dataset: Union[str, None] = None,
        tableName: Union[str, None] = None,
        configJson: Union[dict, None] = None,
    ):
````

To create a new instance of DQResult, one needs to provide at least the following arguments:
- **timestamp: pd.Timestamp**: The time at which a result was assessed.
- **DQdimension: DQDimension**: Data quality dimension assessed (e.g. `DQDimension.COMPLETENESS`, `DQDimension.ACCURACY`).
- **DQmetric: str**: Name of the specific metric within the dimension.
- **DQgranularity: str**: Granularity of the metric (e.g. 'column', 'table', 'cell', 'row').
- **DQvalue: float**: Numeric outcome of the assessment. This currently only supports quantitative assessments.

In addition, there are optional arguments that may need to be set depending on the nature of the metric. ```dataset``` and ```tableName``` are automatically set by the ```metis.dq_orchestrator.DQOrchestrator``` class, which controls the data quality assessment and takes care of calling the individual metrics and storing the results.
- **DQexplanation: Optional[dict]**: Arbitrary additional information produced by the metric (no fixed schema required).
- **runtime: Optional[float]**: Time taken to compute the metric, in seconds.
- **columnNames: Optional[List[str]]**: Columns that this result pertains to. For a column-level metric this is typically a single-item list; for a table-level metric this may be `None` or an empty list.
- **rowIndex: Optional[int]**: Row index associated with the result. Use together with `columnNames` to represent a cell-level result, or for row-based metrics.
- **experimentTag: Optional[str]**: Tag to identify a specific run.
- **configJson: Optional[dict]**: Configuration used for the metric as a JSON object.
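Putting the required and optional arguments together, creating a result might look like the following sketch. `DQResult` and `DQDimension` here are simplified stand-ins mirroring only the fields discussed above; the real classes live in `metis.utils.result`:

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional

import pandas as pd


class DQDimension(Enum):  # stand-in for the metis enum
    COMPLETENESS = "completeness"


@dataclass
class DQResult:  # stand-in mirroring the constructor shown above
    timestamp: pd.Timestamp
    DQdimension: DQDimension
    DQmetric: str
    DQgranularity: str
    DQvalue: float
    columnNames: Optional[List[str]] = None
    rowIndex: Optional[int] = None


# A column-level completeness result for a hypothetical "age" column.
result = DQResult(
    timestamp=pd.Timestamp.now(),
    DQdimension=DQDimension.COMPLETENESS,
    DQmetric="completeness_nullRatio",
    DQgranularity="column",
    DQvalue=0.95,
    columnNames=["age"],
)
```

Setting `rowIndex` as well would turn this into a cell-level result, as described for the `rowIndex` argument.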

## Data Profiling

7 changes: 5 additions & 2 deletions demo/getting_started.py
@@ -5,6 +5,9 @@

orchestrator.load(data_loader_configs=["data/adult.json"])

orchestrator.assess(metrics=["completeness_nullRatio"], metric_configs=[""])
orchestrator.assess(metrics=["minimality_duplicateCount"], metric_configs=[None])
orchestrator.assess(
    metrics=["validity_outOfVocabulary"],
    metric_configs=['{"use_nltk": true, "lowercase": true}'],
)
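The demo passes `metric_configs` entries as JSON strings, empty strings, or `None`. A metric receiving such a value might parse it as sketched below; this mirrors the demo's calling convention but is an assumption about, not a copy of, metis's actual config handling:

```python
import json
from typing import Optional


def parse_metric_config(metric_config: Optional[str]) -> dict:
    # Treat None and "" as "no configuration", mirroring the demo calls above.
    if not metric_config:
        return {}
    return json.loads(metric_config)


no_cfg = parse_metric_config(None)
empty_cfg = parse_metric_config("")
cfg = parse_metric_config('{"use_nltk": true, "lowercase": true}')
```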
14 changes: 14 additions & 0 deletions docker_compose.yaml
@@ -0,0 +1,14 @@
services:
  db:
    image: postgres:18
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: metis_db
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql

volumes:
  pgdata:
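The compose file exposes Postgres on localhost:5432. A `db_config` matching those values might look like the following; the key names are taken from the `required_keys` check in `metis/database.py`, and the URL layout mirrors the connection string built there:

```python
# Connection settings matching the docker_compose.yaml service above.
db_config = {
    "db_user": "postgres",
    "db_pass": "postgres",
    "db_name": "metis_db",
    "db_host": "localhost",
    "db_port": 5432,
}

# The resulting SQLAlchemy URL, assembled the same way as in create_postgres_engine.
url = (
    f"postgresql://{db_config['db_user']}:{db_config['db_pass']}"
    f"@{db_config['db_host']}:{db_config['db_port']}/{db_config['db_name']}"
)
```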
Empty file added metis/__init__.py
Empty file.
71 changes: 71 additions & 0 deletions metis/database.py
@@ -0,0 +1,71 @@
from __future__ import annotations

from typing import Dict, Literal

from sqlalchemy import create_engine

from metis.database_models import register_models


class Database:
    """Provides a singleton reference for the database connection and models. Can be used by different modules to access the database without risking conflicts caused by multiple bases or engines."""

    _instance: Database | None = None

    def __init__(self, db_type: Literal["sqlite", "postgres"], db_config: Dict):
        if Database._instance is not None:
            raise RuntimeError(
                "Database has already been initialized. Use Database.get_instance() to access the singleton."
            )

        self.engine = self.create_engine(db_type, db_config)

        Base, self.DQResultModel, self.DataProfile = register_models(
            db_config.get("table_name", "dq_results")
        )
        Base.metadata.create_all(self.engine)

        Database._instance = self

    @classmethod
    def get_instance(cls) -> Database:
        """Return the current singleton. Raises if not initialized."""
        if cls._instance is None:
            raise RuntimeError(
                "Database has not been initialized. "
                "Instantiate it once with Database(db_type, db_config) first."
            )
        return cls._instance

    @classmethod
    def is_initialized(cls) -> bool:
        return cls._instance is not None

    def create_engine(self, db_type: Literal["sqlite", "postgres"], db_config: Dict):
        if db_type == "sqlite":
            return self.create_sqlite_engine(db_config)
        elif db_type == "postgres":
            return self.create_postgres_engine(db_config)
        raise ValueError(f"Unsupported database type: {db_type}")

    def create_sqlite_engine(self, db_config: Dict):
        required_keys = ("db_name",)
        if not all(k in db_config for k in required_keys):
            raise ValueError(
                f"SQLite database config must include the following fields: {required_keys}."
            )

        return create_engine(
            f"sqlite:///{db_config['db_name']}",
            echo=db_config.get("echo", False),
        )

    def create_postgres_engine(self, db_config: Dict):
        required_keys = ("db_user", "db_pass", "db_name", "db_host", "db_port")
        if not all(k in db_config for k in required_keys):
            raise ValueError(
                f"Postgres database config must include the following fields: {required_keys}."
            )

        return create_engine(
            f"postgresql://{db_config['db_user']}:{db_config['db_pass']}@{db_config['db_host']}:{db_config['db_port']}/{db_config['db_name']}",
            echo=db_config.get("echo", False),
        )
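The singleton mechanics of `Database` can be exercised in isolation. This stripped-down sketch keeps only the instance management and drops the engine and model setup, so it runs without SQLAlchemy; `SingletonDemo` is a hypothetical stand-in, not part of metis:

```python
class SingletonDemo:
    """Minimal sketch of the instance management used by metis' Database class."""

    _instance = None

    def __init__(self, name: str):
        # Refuse a second construction, just like Database.__init__ above.
        if SingletonDemo._instance is not None:
            raise RuntimeError("Already initialized; use get_instance().")
        self.name = name
        SingletonDemo._instance = self

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            raise RuntimeError("Not initialized yet.")
        return cls._instance


first = SingletonDemo("db")
try:
    SingletonDemo("other")
    reinit_failed = False
except RuntimeError:
    reinit_failed = True
```

All later callers then share the same connection and model classes via `get_instance()`, which is exactly the conflict-avoidance the docstring describes.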
74 changes: 40 additions & 34 deletions metis/database_models.py
@@ -5,16 +5,20 @@
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


def register_models(results_table_name: str):
    """Register the SQLAlchemy models for the database tables based on initial configuration. Every call creates a new SQLAlchemy base, which is not bound to any engine yet. Use the Database singleton for stable references to the models and engine."""

    class Base(DeclarativeBase):
        pass

    class DQResultModel(Base):
        __tablename__ = results_table_name
        __table_args__ = {"extend_existing": True}

        id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
        timestamp: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now()
        )
        dq_dimension: Mapped[str]
        dq_metric: Mapped[str]
        dq_granularity: Mapped[str]
@@ -28,41 +32,43 @@ class DQResultModel(Base):
        dataset: Mapped[str | None]
        config_json: Mapped[dict | None] = mapped_column(JSON)

    class DataProfile(Base):
        """Stores data profiling results for caching and manual imports.

        Covers single-column statistics (null_count, distinct_count, histograms, ...),
        multi-column dependencies (FDs, UCCs, INDs, ...), and any other profiling
        result type. The result payload is stored as JSON so the schema stays
        flexible across different task types.
        """

        __tablename__ = "data_profiles"
        __table_args__ = {"extend_existing": True}

        id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
        timestamp: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now()
        )

        # --- identifiers ---
        dataset: Mapped[str]
        table_name: Mapped[str]
        column_names: Mapped[List[str]] = mapped_column(JSON)
        dp_task_name: Mapped[str]  # e.g. "null_count", "fd", "ucc"
        task_config: Mapped[dict | None] = mapped_column(JSON)  # extra params used

        # --- category ---
        profile_type: Mapped[str] = mapped_column(default="single_column")
        # "single_column" | "multi_column" | "dependency" | "custom"

        # --- result ---
        dp_result_value: Mapped[dict | None] = mapped_column(
            JSON
        )  # {"v": <actual_value>}
        result_type: Mapped[str] = mapped_column(default="scalar")
        # "scalar" | "list" | "dict" | "series" — for deserialization hint

        # --- provenance ---
        source: Mapped[str] = mapped_column(default="computed")
        # "computed" | "imported:hyfd" | "imported:manual" | …

    return Base, DQResultModel, DataProfile
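The point of the factory is that every `register_models` call builds a brand-new base and fresh model classes, so repeated registration cannot clash on a shared declarative registry. The effect can be sketched without SQLAlchemy; `register_models_demo` below is a hypothetical analogue, not the real function:

```python
def register_models_demo(table_name: str):
    # Each call creates a brand-new base class, mirroring how register_models
    # creates a fresh DeclarativeBase that is not bound to any engine yet.
    class Base:
        pass

    class ResultModel(Base):
        tablename = table_name  # stands in for __tablename__

    return Base, ResultModel


# Two calls yield distinct class objects with independent configuration.
base_a, model_a = register_models_demo("dq_results")
base_b, model_b = register_models_demo("other_results")
```

Because the classes are recreated per call, the `Database` singleton is what provides stable references to one chosen set of models.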