-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpostgresql_decorators_example.py
More file actions
141 lines (115 loc) · 3.86 KB
/
postgresql_decorators_example.py
File metadata and controls
141 lines (115 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
Example: PostgreSQL database validation with decorators
"""
from db_expectations import DatabaseValidator, validate_before, validate_after
from db_expectations.suites import ExpectationSuites
import os
from dotenv import load_dotenv
# Load environment variables before the connection settings are read
load_dotenv()

# PostgreSQL connection URL, used when POSTGRES_CONNECTION is not set.
# Layout: postgresql://user:password@host:port/database
_DEFAULT_CONNECTION = (
    "postgresql://postgres:password@localhost:5432/testdb"
)
connection_string = os.environ.get("POSTGRES_CONNECTION", _DEFAULT_CONNECTION)

# Create the validator shared by every example below
validator = DatabaseValidator(connection_string)
print("✓ Connected to PostgreSQL")
# Example function with pre-validation decorator
@validate_before(
    validator,
    table_name="orders",
    expectations=ExpectationSuites.row_count_check(min_rows=0, max_rows=10000),
)
def insert_order(order_id, customer_id, amount):
    """Insert an order, guarded by a row-count expectation on ``orders``.

    The function is wrapped with ``validate_before`` configured with a
    row-count expectation of 0 to 10000 rows on the ``orders`` table
    (presumably evaluated before the body runs -- confirm against the
    decorator's implementation).

    The SQL statement uses bound-parameter placeholders instead of
    interpolating the arguments into the SQL text, so caller-supplied values
    cannot be interpreted as SQL.

    Args:
        order_id: Identifier of the order to insert.
        customer_id: Identifier of the customer the order belongs to.
        amount: Order amount.

    Returns:
        The ``order_id`` that was passed in.
    """
    # NOTE(review): ``%s`` is the DB-API "format" placeholder style used by
    # psycopg; adjust it if the project's driver uses another paramstyle.
    query = """
        INSERT INTO orders (order_id, customer_id, amount, status)
        VALUES (%s, %s, %s, 'pending')
    """
    params = (order_id, customer_id, amount)
    # Execute insert (simplified for example). A real implementation would
    # pass the statement and its values separately to the driver, e.g.
    # cursor.execute(query, params).
    print(f"Inserting order {order_id}")
    return order_id
# Example function with post-validation decorator
@validate_after(
    validator,
    table_name="customers",
    expectations=ExpectationSuites.unique_checks(["email"]),
)
def update_customer_email(customer_id, new_email):
    """Update a customer's email, guarded by a uniqueness expectation.

    The function is wrapped with ``validate_after`` configured with a
    uniqueness expectation on the ``email`` column of ``customers``
    (presumably evaluated after the body runs -- confirm against the
    decorator's implementation).

    The SQL statement uses bound-parameter placeholders instead of splicing
    ``new_email`` into a quoted SQL literal, so an address containing quotes
    or SQL fragments cannot alter the statement.

    Args:
        customer_id: Identifier of the customer to update.
        new_email: New email address for the customer.

    Returns:
        Always ``True``.
    """
    # NOTE(review): ``%s`` is the DB-API "format" placeholder style used by
    # psycopg; adjust it if the project's driver uses another paramstyle.
    query = """
        UPDATE customers
        SET email = %s
        WHERE customer_id = %s
    """
    params = (new_email, customer_id)
    # Execute update (simplified for example). A real implementation would
    # pass the statement and its values separately to the driver, e.g.
    # cursor.execute(query, params).
    print(f"Updated customer {customer_id} email to {new_email}")
    return True
# Run the example validations. The ``finally`` clause closes the validator
# even when one of the validation calls raises, so the connection is not
# leaked on failure.
try:
    # Example: Complex validation suite
    print("\n=== Complex Validation Suite ===")

    # Combine multiple validation types into a single expectation list
    expectations = ExpectationSuites.combine(
        # Null checks for required fields
        ExpectationSuites.null_checks([
            "order_id",
            "customer_id",
            "amount",
            "status",
            "created_at",
        ]),
        # Type checks
        ExpectationSuites.type_checks({
            "order_id": "int",
            "customer_id": "int",
            "amount": "float",
            "status": "str",
            "created_at": "datetime",
        }),
        # Range checks
        ExpectationSuites.range_checks({
            "amount": {"min": 0.01, "max": 1000000.00},
        }),
        # Set membership for status
        ExpectationSuites.set_membership_checks({
            "status": [
                "pending",
                "processing",
                "shipped",
                "delivered",
                "cancelled",
            ],
        }),
        # Row count
        ExpectationSuites.row_count_check(min_rows=0, max_rows=100000),
    )

    results = validator.validate_table(
        table_name="orders",
        suite_name="orders_comprehensive_suite",
        expectations=expectations,
    )
    print(f"Comprehensive validation success: {results.success}")

    # Example: Validate data freshness
    print("\n=== Data Freshness Validation ===")

    freshness_expectations = ExpectationSuites.data_freshness_check(
        timestamp_column="created_at",
        max_age_hours=24,
    )

    # The freshness suite is applied to a query result (orders created in
    # the last 24 hours) rather than to the whole table.
    results_freshness = validator.validate_query(
        query="SELECT * FROM orders WHERE created_at > NOW() - INTERVAL '24 HOURS'",
        asset_name="recent_orders",
        suite_name="freshness_suite",
        expectations=freshness_expectations,
    )
    print(f"Freshness validation success: {results_freshness.success}")

    # Example: Completeness check (allow some nulls)
    print("\n=== Completeness Validation ===")

    # Expect at least 95% of rows to have non-null values
    completeness_expectations = ExpectationSuites.completeness_check(
        columns=["email", "phone"],
        threshold=0.95,
    )

    results_completeness = validator.validate_table(
        table_name="customers",
        suite_name="completeness_suite",
        expectations=completeness_expectations,
    )
    print(f"Completeness validation success: {results_completeness.success}")
finally:
    # Cleanup
    validator.close()

print("\n✓ PostgreSQL validation complete")