This repository was archived by the owner on Apr 26, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
95 lines (71 loc) · 3.34 KB
/
utils.py
File metadata and controls
95 lines (71 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from pydantic import BaseModel
from datetime import datetime
import sqlite3
import cat.plugins.asklite.vars as vars
class DatabaseExecutionError(Exception):
    """Raised when a SQL statement cannot be executed against the database."""
def _quote_identifier(name):
    """Return ``name`` wrapped as a double-quoted SQLite identifier."""
    # Embedded double quotes are escaped by doubling them, so names with
    # spaces, keywords or quotes can be used safely inside PRAGMA statements.
    return '"' + name.replace('"', '""') + '"'


def _python_type_for(declared_type):
    """Map a declared SQLite column type to the Python type used in models."""
    normalized = declared_type.upper()
    # Checks follow the order of SQLite's type-affinity rules.
    if "INT" in normalized:
        return int
    if any(marker in normalized for marker in ("CHAR", "CLOB", "TEXT")):
        return str
    if any(marker in normalized for marker in ("REAL", "FLOA", "DOUB")):
        return float
    return str  # Default to string


def update_db_structure():
    """Refresh the cached schema description and per-table Pydantic models.

    Reads every table listed in ``sqlite_master`` (except ``sqlite_sequence``)
    through ``vars.conn`` and then updates, on ``vars``:

    * ``db_structure``: text listing each table's insertable columns, with
      ``-> referenced_table`` appended to foreign-key columns;
    * ``table_class_map``: ``{table_name: model}`` where each model is a
      ``BaseModel`` subclass with a ``table_name`` field followed by one field
      per insertable column;
    * ``table_names_str``: a sentence listing the table names;
    * ``db_structure_last_update_date``: the time of this refresh.

    A column is skipped as auto-generated when it is the table's only
    primary-key column and its declared type contains ``INT``.

    The shared state is only replaced after the whole schema has been read, so
    a failure during introspection leaves the previous values in place.
    """
    cursor = vars.conn.cursor()
    try:
        # Fetch all user tables (excluding sqlite_sequence)
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name != 'sqlite_sequence';")
        table_names = [row[0] for row in cursor.fetchall()]

        structure_parts = []
        models = {}
        for table_name in table_names:
            quoted_name = _quote_identifier(table_name)

            # Rows are (cid, name, type, notnull, dflt_value, pk)
            cursor.execute(f"PRAGMA table_info({quoted_name});")
            columns = cursor.fetchall()

            # Rows are (id, seq, table, from, to, ...) -> {column: referenced_table}
            cursor.execute(f"PRAGMA foreign_key_list({quoted_name});")
            fk_map = {fk[3]: fk[2] for fk in cursor.fetchall()}

            # In a composite primary key no column is auto-assigned, so only a
            # single-column key may be treated as auto-incrementing.
            single_column_pk = sum(1 for col in columns if col[5]) == 1

            structure_parts.append(f"{table_name} table:\n")
            annotations = {"table_name": str}  # Ensure table_name is the first field
            for col in columns:
                col_name, col_type, pk = col[1], col[2], col[5]

                # Exclude auto-incrementing primary key fields
                if single_column_pk and pk == 1 and "INT" in col_type.upper():
                    continue

                relation_info = f" -> {fk_map[col_name]}" if col_name in fk_map else ""
                structure_parts.append(f"- {col_name} ({col_type}){relation_info}\n")
                annotations[col_name] = _python_type_for(col_type)
            structure_parts.append("\n\n")

            # Create a Pydantic model dynamically
            models[table_name] = type(
                table_name.capitalize(),  # Class name
                (BaseModel,),  # Inheriting from Pydantic's BaseModel
                {"__annotations__": annotations},
            )
    finally:
        cursor.close()

    # Publish the new state; the mapping is updated in place so existing
    # references to vars.table_class_map keep seeing current models.
    vars.db_structure = "".join(structure_parts)
    vars.table_class_map.clear()
    vars.table_class_map.update(models)
    vars.table_names_str = f"Table names are: {', '.join(table_names)}"
    vars.db_structure_last_update_date = datetime.now()
def _split_sql_statements(script):
    """Split ``script`` into individual SQL statements without trailing ';'."""
    statements = []
    pending = []
    for piece in script.split(';'):
        pending.append(piece)
        candidate = ';'.join(pending)
        # A ';' inside a string literal, comment or trigger body does not end
        # the statement; keep accumulating until SQLite sees a complete one.
        if sqlite3.complete_statement(candidate + ';'):
            pending.clear()
            candidate = candidate.strip()
            if candidate:
                statements.append(candidate)
    # Whatever is left never became complete; still hand it to SQLite so the
    # caller gets a proper error instead of silently dropped input.
    leftover = ';'.join(pending).strip()
    if leftover:
        statements.append(leftover)
    return statements


def execute_multiple_statements(query, conn=None):
    """Execute every SQL statement in ``query``, committing after each one.

    Args:
        query: One or more SQL statements separated by ';'. Empty statements
            (for example from trailing semicolons) are ignored.
        conn: Optional sqlite3 connection to run against. Defaults to
            ``vars.conn``.

    Raises:
        DatabaseExecutionError: If executing or committing a statement raises
            ``sqlite3.Error``. Statements that ran before the failing one stay
            committed.
    """
    connection = vars.conn if conn is None else conn
    cursor = connection.cursor()
    try:
        # Execute each statement separately
        for statement in _split_sql_statements(query):
            try:
                cursor.execute(statement)
                connection.commit()
            except sqlite3.Error as e:
                # Raise a custom exception with a detailed error message
                raise DatabaseExecutionError(f"Error executing statement: {statement}\nError: {e}") from e
    finally:
        cursor.close()