-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathde_github_perp.py
More file actions
150 lines (126 loc) · 4.52 KB
/
de_github_perp.py
File metadata and controls
150 lines (126 loc) · 4.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# -*- coding: utf-8 -*-
"""DE GitHub Perp.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/18yIrn1p-97ZXaSuMMxYv3f0cgHf1IT9j
"""
# =====================================
# 1. Setup: Mount Google Drive in Colab
# =====================================
from google.colab import drive

# The input CSV and every output path below live under /content/drive,
# so Drive has to be mounted before any of them is touched.
drive.mount('/content/drive')

# =====================================
# 2. Install Prefect and dependencies
# =====================================
# `!pip install ...` is IPython shell magic: it only works inside a notebook
# cell and is a SyntaxError once the notebook is exported to a .py file.
# Invoking pip through the running interpreter works in both settings and
# installs into the same environment the imports below will use.
import subprocess
import sys

# check=False keeps the behaviour of the shell magic: a failed upgrade
# (e.g. no network) is not fatal by itself; a package that is really
# missing surfaces at the import statements that follow.
subprocess.run(
    [sys.executable, "-m", "pip", "install", "-U", "prefect", "nltk"],
    check=False,
)
# =====================================
# 3. Imports
# =====================================
import os, re, sqlite3, datetime as dt
import pandas as pd
import nltk
from nltk.tokenize import sent_tokenize
from prefect import flow, task
# Fetch the Punkt resources used by NLTK's sentence tokenizer.
for _nltk_resource in ("punkt", "punkt_tab"):
    nltk.download(_nltk_resource)
# =====================================
# 4. Prefect Cloud setup - API URL and API key
# =====================================
os.environ["PREFECT_API_URL"] = "https://api.prefect.cloud/api/accounts/652a1f7c-ce2c-418c-a520-04803b0a2af5/workspaces/be9da84e-7dcb-4ec7-99e7-50d4788bf957"

# SECURITY: the API key is a credential and must never be written into the
# source file. It is taken from the environment when already set; otherwise
# it is read from Colab's Secrets panel (a secret named PREFECT_API_KEY with
# notebook access enabled). The lookup raises if the secret is absent, which
# is preferable to running with a missing or stale key.
# Any key that was previously committed in this file should be revoked in
# Prefect Cloud and replaced with a new one.
if not os.environ.get("PREFECT_API_KEY"):
    from google.colab import userdata

    os.environ["PREFECT_API_KEY"] = userdata.get("PREFECT_API_KEY")
# =====================================
# 5. Paths and config
# =====================================
# Source CSV and the name of the column in it that holds the rule text.
INPUT_PATH = "/content/drive/My Drive/Uni Project/rules.csv"
TEXT_COL = "rule_text"

# Output folder; created up front so the save step can write into it.
OUTPUT_DIR = "/content/drive/My Drive/Uni Project"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Files produced by the save step: the filtered CSV and the SQLite database.
CSV_OUT = os.path.join(OUTPUT_DIR, "filtered_rules.csv")
DB_PATH = os.path.join(OUTPUT_DIR, "rules.db")
# Obligation pattern: a modal phrase ("must", "shall", "is required to")
# followed later on the same line by an action verb ("submit", "report",
# "comply", "provide"). Both parts must be whole words; case is ignored.
OBLIGATION_PATTERN = re.compile(r'\b(must|shall|is required to)\b.*\b(submit|report|comply|provide)\b', flags=re.IGNORECASE)
# =====================================
# 6. ETL Tasks
# =====================================
@task
def extract(path):
    """Load the rules CSV and verify that it contains the text column.

    Args:
        path: Location of the CSV file, passed straight to ``pd.read_csv``.

    Returns:
        The loaded ``pandas.DataFrame``.

    Raises:
        ValueError: If the column named by ``TEXT_COL`` is not present.
    """
    df = pd.read_csv(path)
    # Explicit raise instead of `assert`: assertions are removed when Python
    # runs with -O, which would let a malformed file through to later steps.
    if TEXT_COL not in df.columns:
        raise ValueError(
            f"Column '{TEXT_COL}' not found. Found: {df.columns.tolist()}"
        )
    return df
@task
def clean(df):
    """Return *df* without the rows whose ``TEXT_COL`` value is missing.

    The surviving rows are renumbered from zero; the input frame itself is
    left untouched.
    """
    rows_with_text = df.dropna(subset=[TEXT_COL])
    return rows_with_text.reset_index(drop=True)
@task
def tokenize_and_filter(df):
    """Split each rule into sentences and keep those that state an obligation.

    Every value in the ``TEXT_COL`` column is sentence-tokenized; sentences
    matching ``OBLIGATION_PATTERN`` are kept (stripped of surrounding
    whitespace). The result has one row per matching sentence in a new
    ``obligations`` column, alongside the original columns of the rule it
    came from.

    Args:
        df: DataFrame containing the ``TEXT_COL`` column. It is not modified.

    Returns:
        A new DataFrame with rules that produced no match removed, exact
        duplicate rows dropped, and the index renumbered from zero.
    """
    def extract_obligations(text):
        sentences = sent_tokenize(str(text))
        return [s.strip() for s in sentences if OBLIGATION_PATTERN.search(s)]

    # assign() builds a new frame, so the caller's DataFrame does not
    # silently gain an 'obligations' column as a side effect of this task.
    with_obligations = df.assign(
        obligations=df[TEXT_COL].apply(extract_obligations)
    )

    # explode() turns each list element into its own row; a rule with an
    # empty list becomes a single NaN row, which dropna() then removes.
    exploded = with_obligations.explode('obligations').reset_index(drop=True)
    return (
        exploded.dropna(subset=['obligations'])
        .drop_duplicates()
        .reset_index(drop=True)
    )
@task
def save_outputs(filtered_df):
    """Write the filtered obligations to a CSV file and a SQLite database.

    The CSV at ``CSV_OUT`` receives *filtered_df* as given. In the database
    at ``DB_PATH`` one row describing this run is upserted into ``run_info``
    and the ``obligations`` table is replaced by *filtered_df* with a leading
    ``run_id`` column.

    Args:
        filtered_df: DataFrame with an ``obligations`` column. It is not
            modified; the ``run_id`` column is added to a copy.

    Returns:
        The tuple ``(CSV_OUT, DB_PATH)``.
    """
    from contextlib import closing

    filtered_df.to_csv(CSV_OUT, index=False)

    # One timestamp for both fields so run_id and created_at always agree.
    now = dt.datetime.now()
    run_id = now.strftime("%Y-%m-%d_%H%M%S")

    # NOTE(review): only the filtered frame reaches this function, so
    # total_rules records its row count, not the size of the original input.
    # int() converts the pandas/NumPy results to plain Python integers,
    # the type sqlite3 binds as INTEGER.
    total_rules = int(filtered_df.shape[0])
    matched_rows = int(filtered_df['obligations'].notnull().sum())

    # Work on a copy: inserting into the caller's frame would change it
    # behind the caller's back and would raise on a second call with the
    # same frame (e.g. a task retry), because 'run_id' already exists.
    db_frame = filtered_df.copy()
    db_frame.insert(0, 'run_id', run_id)

    # closing() guarantees the connection is released even if a statement
    # fails; the inner `with con` only commits or rolls back the transaction.
    with closing(sqlite3.connect(DB_PATH)) as con:
        with con:
            # Persistent table holding one row per run.
            con.execute("""
                CREATE TABLE IF NOT EXISTS run_info (
                    run_id TEXT PRIMARY KEY,
                    input_path TEXT,
                    text_col TEXT,
                    total_rules INTEGER,
                    matched_rows INTEGER,
                    created_at TEXT
                );
            """)
            con.execute("""
                INSERT OR REPLACE INTO run_info
                (run_id, input_path, text_col, total_rules, matched_rows, created_at)
                VALUES (?, ?, ?, ?, ?, ?);
            """, (run_id, INPUT_PATH, TEXT_COL, total_rules, matched_rows,
                  now.isoformat(timespec='seconds')))
            # Always replace the obligations table so its schema matches
            # the columns of the current frame.
            db_frame.to_sql('obligations', con, if_exists='replace', index=False)

    print(f"Saved: {CSV_OUT} and {DB_PATH}")
    return CSV_OUT, DB_PATH
# =====================================
# 7. Prefect Flow
# =====================================
@flow(name="GoogleDrive-Rules-ETL")
def etl_flow():
    """Run the pipeline end to end: extract, clean, filter, then save.

    Prints a short summary with the output locations and the first rows of
    the filtered data.

    Returns:
        The ``(csv_path, db_path)`` tuple returned by ``save_outputs``.
    """
    raw_rules = extract(INPUT_PATH)
    filtered = tokenize_and_filter(clean(raw_rules))
    csv_out, db_out = save_outputs(filtered)

    # The summary is built only after saving, exactly when the original
    # print evaluated it.
    summary = f"""
ETL complete! Outputs saved to:
CSV: {csv_out}
DB: {db_out}
Filtered example:
{filtered.head()}
"""
    print(summary)
    return csv_out, db_out
# =====================================
# 8. Run the flow
# =====================================
# Guarded so the pipeline runs when this file is executed directly (as a
# script or in a notebook cell, where __name__ is "__main__") but not as a
# side effect of the module merely being imported, e.g. when another tool
# loads the file just to get hold of `etl_flow`.
if __name__ == "__main__":
    etl_flow()

# OPTIONAL – to schedule daily in Prefect Cloud:
# etl_flow.serve(
#     name="daily-google-drive-etl",
#     cron="0 0 * * *",  # Run daily at midnight
# )