-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
242 lines (197 loc) · 9.63 KB
/
main.py
File metadata and controls
242 lines (197 loc) · 9.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import os
import yaml
import logging
import json
import time
import threading
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import sys
# Global variable for default host IP
DEFAULT_HOST_IP = '1.1.1.1'
# ... rest of the imports and code ...
# Configure logging.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def process_labels(labels, process_traefik):
    """Extract DNS-related entries from one service's docker-compose labels.

    Args:
        labels: Mapping of label name -> value, or a list of 'key=value'
            strings (both forms occur in compose files).
        process_traefik: When true, also harvest domains from Traefik
            router rule labels (``traefik.http(s).routers.*.rule``).

    Returns:
        dict with keys 'pihole.dns' (list of domains), 'traefik.dns'
        (list of domains) and 'pihole.hostip' (IP the domains map to,
        falling back to the module-level DEFAULT_HOST_IP).
    """
    global DEFAULT_HOST_IP
    # BUG FIX: normalize the list form up front. The original only handled
    # lists inside the loop, so labels.get('pihole.hostip', ...) below
    # raised AttributeError whenever a list was passed directly.
    if not isinstance(labels, dict):
        labels = dict(item.split('=', 1) for item in labels if '=' in item)
    processed_labels = {'pihole.dns': [], 'traefik.dns': []}
    host_ip = labels.get('pihole.hostip', DEFAULT_HOST_IP)
    for key, value in labels.items():
        if key == 'pihole.dns':
            # Handle multiple comma-separated domain names, stripping quotes.
            value = value.strip('\'" ')
            domains = [domain.strip('` ,') for domain in value.split(',') if domain.strip()]
            processed_labels['pihole.dns'].extend(domains)
        elif process_traefik and (key.startswith('traefik.http.routers.') or key.startswith('traefik.https.routers.')) and key.endswith('.rule'):
            # A rule may contain several Host(...) directives joined by '||'.
            for directive in value.split('||'):
                if 'Host(' in directive:
                    directive = directive.split('Host(')[-1].rstrip(')\'" ')
                    domains = [domain.strip('` ,') for domain in directive.split(',') if domain.strip()]
                    processed_labels['traefik.dns'].extend(domains)
    processed_labels['pihole.hostip'] = host_ip
    # Log the processed labels for debugging
    logging.info(f"Processed labels: {processed_labels}")
    return processed_labels
def read_docker_compose_labels(file_path, process_traefik):
    """Load a docker-compose YAML file and process each service's labels.

    Returns:
        dict mapping service name -> processed label dict (see
        process_labels); an empty dict on any read/parse/processing error.
    """
    try:
        with open(file_path, 'r') as handle:
            compose = yaml.safe_load(handle)
        result = {}
        for name, service in compose.get('services', {}).items():
            raw = service.get('labels', {})
            if isinstance(raw, list):
                # Normalize the list form ('key=value') into a mapping.
                raw = dict(entry.split('=', 1) for entry in raw if '=' in entry)
            result[name] = process_labels(raw, process_traefik)
        return result
    except Exception as e:
        logging.error(f"Error reading Docker Compose file: {e}")
        return {}
def read_intermediary_file(intermediary_path):
    """Load the intermediary JSON state file.

    Returns {} when the file does not exist yet (first run) or when it
    cannot be read/parsed (the error is logged).
    """
    try:
        with open(intermediary_path, 'r') as handle:
            return json.load(handle)
    except FileNotFoundError:
        # No state recorded yet; start fresh.
        return {}
    except Exception as e:
        logging.error(f"Error reading intermediary file: {e}")
        return {}
def update_intermediary_file(intermediary_path, current_data, previous_data):
    """Merge freshly-read label data into the previous state and persist it.

    previous_data is mutated in place: any container whose "host-ip dns"
    pairs changed gets {'pairs': new, 'old_pairs': old}. The JSON file is
    rewritten only when at least one container actually changed.
    """
    changed = False
    for container, labels in current_data.items():
        # pihole.dns and traefik.dns domains all map to the same host IP.
        domains = labels.get('pihole.dns', []) + labels.get('traefik.dns', [])
        ip = labels.get('pihole.hostip', 'unknown')
        fresh = [f"{ip} {dns}" for dns in domains]
        stale = previous_data.get(container, {}).get('pairs', ['unknown unknown'])
        if set(fresh) != set(stale):
            previous_data[container] = {'pairs': fresh, 'old_pairs': stale}
            changed = True
    if not changed:
        return
    try:
        with open(intermediary_path, 'w') as handle:
            json.dump(previous_data, handle)
        logging.info(f"Updated intermediary file {intermediary_path}")
    except Exception as e:
        logging.error(f"Error updating intermediary file: {e}")
def update_output_file(output_path, intermediary_path):
    """Rebuild the custom DNS list file from the intermediary state.

    For each container: remove its old "ip dns" pairs, then add the new
    ones unless a different existing line already claims the same DNS
    name (a conflict, which is logged and skipped). The result is written
    back sorted, one pair per line.

    BUG FIX: the original caught FileNotFoundError at the end and re-used
    `existing_lines` there, but that variable is unbound when the open()
    that raised is the one that was supposed to populate it (e.g. output
    file missing on first run) -> NameError. A missing output file is now
    handled inline by starting from an empty set.
    """
    try:
        with open(intermediary_path, 'r') as file:
            intermediary_data = json.load(file)
        # A missing output file just means we start with no entries.
        try:
            with open(output_path, 'r') as file:
                existing_lines = set(file.read().splitlines())
        except FileNotFoundError:
            existing_lines = set()
        for container, data in intermediary_data.items():
            new_pairs = set(data.get('pairs', []))
            old_pairs = set(data.get('old_pairs', []))
            # Drop whatever this container previously contributed.
            existing_lines.difference_update(old_pairs)
            for new_pair in new_pairs:
                new_dns = new_pair.split(' ')[1]
                # Another line mapping the same DNS name is a conflict.
                conflicting_entry = any(
                    line.split(' ')[1] == new_dns and line not in new_pairs
                    for line in existing_lines
                )
                if conflicting_entry:
                    logging.warning(f"Conflicting entry found for DNS {new_dns}. Not updating.")
                else:
                    existing_lines.add(new_pair)
        with open(output_path, 'w') as file:
            for line in sorted(existing_lines):
                file.write(f"{line}\n")
        logging.info(f"Successfully updated {output_path}")
    except Exception as e:
        logging.error(f"Error updating file: {e}")
def watch_for_changes(filename, interval=5):
    """Generator that polls filename's mtime every `interval` seconds.

    Yields True when the file is new or modified since the last check,
    False otherwise (including while the file is missing).

    BUG FIX: the error message was the literal f-string
    "File (unknown) not found." with no placeholder; it now reports the
    actual path being watched.
    """
    last_modified = None
    while True:
        try:
            # Compare against the last observed modification time.
            current_modified = os.path.getmtime(filename)
            if last_modified is None or current_modified > last_modified:
                last_modified = current_modified
                yield True
            else:
                yield False
        except FileNotFoundError:
            logging.error(f"File {filename} not found.")
            yield False
        time.sleep(interval)
def process_files(compose_file, intermediary_file, output_file, process_traefik):
    """Run one full pipeline pass: compose labels -> intermediary state -> output list."""
    labels_now = read_docker_compose_labels(compose_file, process_traefik)
    labels_before = read_intermediary_file(intermediary_file)
    update_intermediary_file(intermediary_file, labels_now, labels_before)
    update_output_file(output_file, intermediary_file)
    logging.info("Processing completed.")
class DockerComposeFileEventHandler(FileSystemEventHandler):
    """Watchdog handler: reprocess labels when the compose file's directory changes."""

    def __init__(self, compose_file, intermediary_file, output_file, process_traefik):
        self.compose_file = compose_file
        self.intermediary_file = intermediary_file
        self.output_file = output_file
        self.process_traefik = process_traefik

    def on_any_event(self, event):
        """React only to file creation/modification inside the compose file's directory."""
        if event.is_directory:
            return
        if event.event_type not in ('created', 'modified'):
            return
        if os.path.dirname(event.src_path) == os.path.dirname(self.compose_file):
            logging.info(f"Change detected in the directory of {self.compose_file}, reprocessing...")
            process_files(self.compose_file, self.intermediary_file, self.output_file, self.process_traefik)
def timed_run(interval, compose_file, intermediary_file, output_file, process_traefik):
    """Run the processing pipeline forever on a fixed schedule.

    Errors are logged and the loop continues — intentional, so one bad
    cycle cannot kill the background thread this runs in.
    """
    while True:
        try:
            logging.info("Starting timed processing cycle.")
            process_files(compose_file, intermediary_file, output_file, process_traefik)
            logging.info(f"Sleeping for {interval} seconds.")
            time.sleep(interval)
        except Exception as e:
            logging.error(f"Error in timed run: {e}")
def manual_execution(compose_file, intermediary_file, output_file, process_traefik):
    """Run exactly one processing pass (MANUAL_MODE env var or 'manual' CLI argument)."""
    logging.info("Manual execution started.")
    process_files(compose_file, intermediary_file, output_file, process_traefik)
    logging.info("Manual execution completed.")
def main():
    """Entry point: read configuration from the environment and run the selected mode(s).

    Modes are independent and may be combined: manual (one pass now),
    watch (filesystem events via watchdog) and timed (background polling
    thread). The main thread then idles until Ctrl-C.
    """
    global DEFAULT_HOST_IP
    DEFAULT_HOST_IP = os.getenv('DEFAULT_HOST_IP', 'unknown')

    # Fixed in-container paths (provided via volume mounts).
    compose_file = '/compose/docker-compose.yml'
    output_file = '/output/custom.list'
    intermediary_file = '/data/tempdns.json'

    def env_flag(name):
        # A toggle is on only when the env var is the string 'true' (any case).
        return os.getenv(name, 'False').lower() == 'true'

    watch_mode = env_flag('WATCH_MODE')
    timed_mode = env_flag('TIMED_MODE')
    poll_interval = int(os.getenv('POLL_INTERVAL', 30))
    process_traefik = env_flag('PROCESS_TRAEFIK')
    manual_mode = env_flag('MANUAL_MODE')
    logging.info(f"Watch Mode: {watch_mode}, Timed Mode: {timed_mode}, Poll Interval: {poll_interval} seconds, Process Traefik: {process_traefik}")

    if manual_mode or 'manual' in sys.argv:
        manual_execution(compose_file, intermediary_file, output_file, process_traefik)

    if watch_mode:
        handler = DockerComposeFileEventHandler(compose_file, intermediary_file, output_file, process_traefik)
        observer = Observer()
        observer.schedule(handler, path=os.path.dirname(compose_file), recursive=False)
        observer.start()

    if timed_mode:
        worker = threading.Thread(
            target=timed_run,
            args=(poll_interval, compose_file, intermediary_file, output_file, process_traefik),
        )
        worker.daemon = True
        worker.start()

    # Keep the main thread alive so the observer / daemon thread can run.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        if watch_mode:
            observer.stop()
            observer.join()
# Script entry point: run main() only when executed directly, not on import.
if __name__ == "__main__":
    main()