-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
112 lines (90 loc) · 3.66 KB
/
main.py
File metadata and controls
112 lines (90 loc) · 3.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""Entry point for the daily StockBot workflow."""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime
from typing import Dict, List, Sequence, Tuple
from data_fetcher import RawFinancialData, fetch_raw
from db import (
analysis_exists,
clear_tables,
get_connection,
get_day_analyses,
get_portfolio,
insert_analysis,
insert_day_analysis,
)
from evaluator import evaluate
from notifier import send_telegram_message
LOGGER = logging.getLogger(__name__)
DAY_TABLES: Dict[int, str] = {
1: "tuesday_analysis",
2: "wednesday_analysis",
3: "thursday_analysis",
4: "friday_analysis",
5: "saturday_analysis",
6: "sunday_analysis",
}
async def run_daily() -> None:
"""Execute the daily ingestion and notification routine."""
async with await get_connection() as conn:
now = datetime.now()
weekday = now.weekday()
portfolio = await get_portfolio(conn)
if weekday == 0:
tables: Sequence[str] = list(DAY_TABLES.values())
analysis_rows: List[Tuple[str, float, str, float]] = []
buy: List[str] = []
sell: List[str] = []
for position in portfolio:
day_data = await get_day_analyses(conn, position["ticker"], tables)
if not day_data:
continue
scores = [entry["score"] for entry in day_data if entry.get("score") is not None]
prices = [
entry["price_at_analysis"]
for entry in day_data
if entry.get("price_at_analysis") is not None
]
if not scores:
continue
avg_score = sum(scores) / len(scores)
avg_price = sum(prices) / len(prices) if prices else 0.0
decision = "BUY" if avg_score >= 70 else "HOLD" if avg_score >= 40 else "SELL"
analysis_rows.append((position["ticker"], avg_score, decision, avg_price))
if decision == "BUY":
buy.append(position["ticker"])
elif decision == "SELL":
sell.append(position["ticker"])
await insert_analysis(conn, analysis_rows)
await clear_tables(conn, tables)
lines = []
if buy:
lines.append("BUY: " + ", ".join(buy))
if sell:
lines.append("SELL: " + ", ".join(sell))
message = "\n".join(lines) if lines else "No data available."
send_telegram_message(message)
return
table = DAY_TABLES.get(weekday)
if table is None:
LOGGER.warning("No table configured for weekday %s", weekday)
return
if await analysis_exists(conn, now.date(), table):
LOGGER.info("Data already stored for %s", table)
return
daily_rows: List[Tuple[str, float, str, float]] = []
for position in portfolio:
raw: RawFinancialData | None = fetch_raw(position["ticker"])
if raw is None or raw.get("price") in (None, 0.0):
LOGGER.warning("Skipping %s due to missing or zero price", position["ticker"])
continue
score, decision = evaluate(raw)
price = float(raw.get("price") or 0.0)
daily_rows.append((position["ticker"], score, decision, price))
await insert_day_analysis(conn, table, daily_rows)
LOGGER.info("Daily data saved for %s", table)
send_telegram_message("Daily data collected.")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(run_daily())