-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
77 lines (68 loc) · 2.7 KB
/
database.py
File metadata and controls
77 lines (68 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# =============================================================================
# database.py — PhantomEye v1.2.1
# Red Parrot Accounting Ltd
#
# Database initialisation.
#
# FIX v1.2.1:
# - init_database: connection now closed in a finally block — was leaked
# on any exception between sqlite3.connect() and conn.close().
# =============================================================================
import sqlite3
from config import DB_PATH
from logger import log
def init_database() -> None:
    """Ensure the SQLite schema exists in the database at ``DB_PATH``.

    Runs idempotent ``CREATE TABLE IF NOT EXISTS`` and
    ``CREATE INDEX IF NOT EXISTS`` statements for the ``iocs``,
    ``feed_status`` and ``alerts`` tables, then commits.

    The connection is closed on every exit path.  If any statement raises,
    the exception propagates to the caller (after the connection has been
    closed) and the "Database ready" message is not logged.
    """
    # Executed in order; every statement is safe to re-run on an existing DB.
    schema_statements = (
        # IOC storage — one row per malicious IP/domain, unique on (type, value).
        """
        CREATE TABLE IF NOT EXISTS iocs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        type TEXT,
        value TEXT,
        threat_type TEXT,
        source TEXT,
        first_added TEXT,
        last_updated TEXT,
        UNIQUE(type, value)
        )
        """,
        # Feed status — when each feed was last downloaded and how it went.
        """
        CREATE TABLE IF NOT EXISTS feed_status (
        feed_name TEXT PRIMARY KEY,
        label TEXT,
        last_updated TEXT,
        ioc_count INTEGER,
        status TEXT
        )
        """,
        # Alert history — every threat hit that has been raised.
        """
        CREATE TABLE IF NOT EXISTS alerts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT,
        severity TEXT,
        alert_type TEXT,
        ioc_value TEXT,
        ioc_type TEXT,
        source_feed TEXT,
        context TEXT,
        details TEXT
        )
        """,
        # Indexes on the columns used for lookups.
        "CREATE INDEX IF NOT EXISTS idx_iocs_value ON iocs(value)",
        # NOTE(review): UNIQUE(type, value) above should already give SQLite
        # an automatic index on the same columns, so this one is probably
        # redundant — kept to leave the existing schema unchanged; confirm
        # before removing.
        "CREATE INDEX IF NOT EXISTS idx_iocs_type_value ON iocs(type, value)",
        "CREATE INDEX IF NOT EXISTS idx_alerts_ioc_value ON alerts(ioc_value)",
        "CREATE INDEX IF NOT EXISTS idx_alerts_timestamp ON alerts(timestamp)",
    )

    db = sqlite3.connect(DB_PATH)
    try:
        cursor = db.cursor()
        for statement in schema_statements:
            cursor.execute(statement)
        db.commit()
    finally:
        # Always release the file handle, even when a statement above failed.
        db.close()

    # Only reached when every statement succeeded and the commit went through.
    log.info("Database ready: %s", DB_PATH)