-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
317 lines (289 loc) · 12.6 KB
/
app.py
File metadata and controls
317 lines (289 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# app.py
import sqlite3
import fitz # PyMuPDF
import os
from flask import Flask, render_template, request, flash, redirect, url_for, g, abort
import pytesseract
from PIL import Image
import io # Per gestire i dati delle immagini in memoria
# Ora importiamo GOOGLE_CALENDAR_COLORS, non WASTE_TYPES_CONFIG
from config import GOOGLE_CALENDAR_COLORS
import google_calendar_manager
### PER UTENTI WINDOWS ###
pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'
# --- FUNZIONE HELPER PER IL PARSING OCR (IBRIDA) ---
def parse_pdf_with_ocr(pdf_bytes):
"""
Analizza un PDF basato su immagine usando OCR e un parser ibrido.
"""
matched_pairs = []
try:
pdf_document = fitz.open(stream=pdf_bytes, filetype="pdf")
full_text = ""
for page in pdf_document:
pix = page.get_pixmap(dpi=300)
img_bytes = pix.tobytes("png")
image = Image.open(io.BytesIO(img_bytes))
full_text += pytesseract.image_to_string(image, lang='ita')
if full_text:
known_exceptions = ["ditta Specializzata", "RAEE / Rivenditore"]
lines = full_text.splitlines()
for line in lines:
line = line.strip()
if not line:
continue
found_match = False
for exception in known_exceptions:
if line.endswith(exception):
name = line[:-len(exception)].strip()
destination = exception
matched_pairs.append((name, destination))
found_match = True
break
if found_match:
continue
if ' ' in line:
parts = line.rsplit(' ', 1)
if len(parts) == 2:
name = parts[0].strip()
destination = parts[1].strip()
if len(name) > 1 and len(destination) > 1:
matched_pairs.append((name, destination))
except Exception as e:
print(f"Errore durante il parsing OCR del PDF: {e}")
raise e
return matched_pairs
# --- Logica Principale dell'App ---
DATABASE = 'schedule.db'
app = Flask(__name__)
app.secret_key = 'una-chiave-segreta-a-caso-cambiami'
def get_db():
db = getattr(g, '_database', None)
if db is None:
db = g._database = sqlite3.connect(DATABASE)
db.row_factory = sqlite3.Row
return db
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, '_database', None)
if db is not None:
db.close()
# --- Rotte Calendario ---
@app.route('/', methods=['GET', 'POST'])
def index():
db = get_db()
if request.method == 'POST':
if 'add_item' in request.form:
waste_type = request.form.get('waste_type')
day = request.form.get('day')
time_str = request.form.get('time')
if waste_type and day and time_str:
db.execute('INSERT INTO schedules (waste_type, day_of_week, time_of_day) VALUES (?, ?, ?)',
(waste_type, day, time_str))
db.commit()
flash(f"Aggiunto promemoria per {waste_type}", "success")
else:
flash("Tutti i campi sono obbligatori!", "danger")
elif 'remove_item' in request.form:
id_to_remove = request.form.get('remove_item')
db.execute('DELETE FROM schedules WHERE id = ?', (id_to_remove,))
db.commit()
flash(f"Promemoria rimosso.", "info")
return redirect(url_for('index'))
query = """
SELECT * FROM schedules
ORDER BY
CASE day_of_week
WHEN 'Lunedì' THEN 1 WHEN 'Martedì' THEN 2 WHEN 'Mercoledì' THEN 3 WHEN 'Giovedì' THEN 4
WHEN 'Venerdì' THEN 5 WHEN 'Sabato' THEN 6 WHEN 'Domenica' THEN 7
END, time_of_day """
schedule_from_db = db.execute(query).fetchall()
waste_types_from_db = db.execute('SELECT * FROM waste_types ORDER BY name').fetchall()
return render_template(
'index.html',
waste_types=waste_types_from_db,
days_of_week=["Lunedì", "Martedì", "Mercoledì", "Giovedì", "Venerdì", "Sabato", "Domenica"],
schedule=schedule_from_db
)
@app.route('/edit/<int:schedule_id>', methods=['GET', 'POST'])
def edit_schedule(schedule_id):
db = get_db()
schedule_item = db.execute('SELECT * FROM schedules WHERE id = ?', (schedule_id,)).fetchone()
if schedule_item is None:
abort(404)
if request.method == 'POST':
waste_type = request.form.get('waste_type')
day = request.form.get('day')
time_str = request.form.get('time')
if not waste_type or not day or not time_str:
flash('Tutti i campi sono obbligatori.', 'danger')
else:
db.execute('UPDATE schedules SET waste_type = ?, day_of_week = ?, time_of_day = ? WHERE id = ?',
(waste_type, day, time_str, schedule_id))
db.commit()
flash('Promemoria aggiornato con successo!', 'success')
return redirect(url_for('index'))
waste_types_from_db = db.execute('SELECT * FROM waste_types ORDER BY name').fetchall()
return render_template(
'edit_schedule.html',
schedule_item=schedule_item,
waste_types=waste_types_from_db,
days_of_week=["Lunedì", "Martedì", "Mercoledì", "Giovedì", "Venerdì", "Sabato", "Domenica"]
)
@app.route('/sync', methods=['POST'])
def sync():
db = get_db()
schedule_list = db.execute("""
SELECT s.*, wt.color_id
FROM schedules s
JOIN waste_types wt ON s.waste_type = wt.name
""").fetchall()
if not schedule_list:
flash("Nessun promemoria da sincronizzare.", "warning")
return redirect(url_for('index'))
try:
flash("Sincronizzazione in corso...", "info")
service = google_calendar_manager.authenticate_google_calendar()
deleted_count = google_calendar_manager.delete_all_waste_events(service)
flash(f"Cancellati {deleted_count} eventi precedenti dal calendario.", "info")
for item in schedule_list:
google_calendar_manager.create_recurring_event(
service, item['waste_type'], item['day_of_week'], item['time_of_day'], item['color_id']
)
flash("Sincronizzazione con Google Calendar completata!", "success")
except Exception as e:
print(f"Errore durante la sincronizzazione: {e}")
flash(f"Si è verificato un errore: {e}", "danger")
return redirect(url_for('index'))
# --- Rotte Tipi di Rifiuto ---
@app.route('/types')
def types():
db = get_db()
waste_types = db.execute('SELECT * FROM waste_types ORDER BY name').fetchall()
return render_template('types.html', waste_types=waste_types, colors=GOOGLE_CALENDAR_COLORS)
@app.route('/types/add', methods=['POST'])
def types_add():
name = request.form.get('name')
color_id = request.form.get('color_id')
db = get_db()
try:
db.execute('INSERT INTO waste_types (name, color_id) VALUES (?, ?)', (name, color_id))
db.commit()
flash('Nuovo tipo di rifiuto aggiunto.', 'success')
except sqlite3.IntegrityError:
flash(f"Il tipo di rifiuto '{name}' esiste già.", 'danger')
return redirect(url_for('types'))
@app.route('/types/edit/<int:type_id>', methods=['GET', 'POST'])
def types_edit(type_id):
db = get_db()
type_item = db.execute('SELECT * FROM waste_types WHERE id = ?', (type_id,)).fetchone()
if not type_item:
abort(404)
if request.method == 'POST':
name = request.form.get('name')
color_id = request.form.get('color_id')
try:
old_name = type_item['name']
if old_name != name:
db.execute('UPDATE schedules SET waste_type = ? WHERE waste_type = ?', (name, old_name))
db.execute('UPDATE waste_types SET name = ?, color_id = ? WHERE id = ?', (name, color_id, type_id))
db.commit()
flash('Tipo di rifiuto aggiornato.', 'success')
return redirect(url_for('types'))
except sqlite3.IntegrityError:
flash(f"Il tipo di rifiuto '{name}' esiste già.", 'danger')
return render_template('edit_type.html', type_item=type_item, colors=GOOGLE_CALENDAR_COLORS)
@app.route('/types/delete', methods=['POST'])
def types_delete():
type_id = request.form.get('id')
db = get_db()
type_item = db.execute('SELECT name FROM waste_types WHERE id = ?', (type_id,)).fetchone()
if type_item:
db.execute('DELETE FROM schedules WHERE waste_type = ?', (type_item['name'],))
db.execute('DELETE FROM waste_types WHERE id = ?', (type_id,))
db.commit()
flash(f"Tipo '{type_item['name']}' e i suoi promemoria sono stati eliminati.", 'success')
return redirect(url_for('types'))
# --- Rotte Dizionario Rifiuti ---
@app.route('/dictionary')
def dictionary():
db = get_db()
materials_list = db.execute('SELECT * FROM materials ORDER BY name COLLATE NOCASE ASC').fetchall()
return render_template('dictionary.html', materials=materials_list)
@app.route('/dictionary/add', methods=['POST'])
def dictionary_add():
name = request.form.get('name')
destination = request.form.get('destination')
if not name or not destination:
flash("Entrambi i campi sono obbligatori!", "danger")
return redirect(url_for('dictionary'))
db = get_db()
try:
db.execute('INSERT INTO materials (name, destination) VALUES (?, ?)', (name, destination))
db.commit()
flash(f"'{name}' aggiunto al dizionario.", "success")
except sqlite3.IntegrityError:
flash(f"Il materiale '{name}' esiste già.", "warning")
return redirect(url_for('dictionary'))
@app.route('/dictionary/delete', methods=['POST'])
def dictionary_delete():
material_id = request.form.get('id')
db = get_db()
db.execute('DELETE FROM materials WHERE id = ?', (material_id,))
db.commit()
flash("Voce eliminata.", "info")
return redirect(url_for('dictionary'))
@app.route('/dictionary/edit/<int:material_id>', methods=['GET', 'POST'])
def dictionary_edit(material_id):
db = get_db()
material_item = db.execute('SELECT * FROM materials WHERE id = ?', (material_id,)).fetchone()
if material_item is None:
abort(404)
if request.method == 'POST':
name = request.form.get('name')
destination = request.form.get('destination')
if not name or not destination:
flash('Tutti i campi sono obbligatori.', 'danger')
else:
try:
db.execute('UPDATE materials SET name = ?, destination = ? WHERE id = ?',
(name, destination, material_id))
db.commit()
flash('Voce del dizionario aggiornata.', 'success')
return redirect(url_for('dictionary'))
except sqlite3.IntegrityError:
flash(f"Il nome '{name}' esiste già.", 'danger')
return render_template('edit_material.html', material_item=material_item)
@app.route('/dictionary/upload_pdf', methods=['POST'])
def dictionary_upload_pdf():
if 'pdf_file' not in request.files:
flash('Nessun file selezionato.', 'danger')
return redirect(url_for('dictionary'))
file = request.files['pdf_file']
if file.filename == '':
flash('Nessun file selezionato.', 'danger')
return redirect(url_for('dictionary'))
if file and file.filename.lower().endswith('.pdf'):
try:
pdf_bytes = file.read()
parsed_data = parse_pdf_with_ocr(pdf_bytes)
if not parsed_data:
flash("L'OCR non ha trovato dati strutturati nel PDF.", "warning")
return redirect(url_for('dictionary'))
db = get_db()
new_items_count = 0
for name, destination in parsed_data:
cursor = db.execute('INSERT OR IGNORE INTO materials (name, destination) VALUES (?, ?)',
(name, destination))
if cursor.rowcount > 0:
new_items_count += 1
db.commit()
flash(f"Importazione OCR completata! Aggiunte {new_items_count} nuove voci.", "success")
except Exception as e:
flash(f"Errore durante l'elaborazione del PDF: {e}", "danger")
else:
flash('Formato file non valido. Caricare un .pdf', 'warning')
return redirect(url_for('dictionary'))
if __name__ == '__main__':
print("AVVIO IN MODALITA' TEST MANUALE. Per la produzione, usare run.py")
app.run(host='0.0.0.0', port=5000, debug=True)