-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipeline_manager.py
More file actions
346 lines (299 loc) · 13.5 KB
/
pipeline_manager.py
File metadata and controls
346 lines (299 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
#!/usr/bin/env python3
"""
DocumentProcessor - main class for document processing.
"""
from pathlib import Path
from typing import Callable, Iterable, List, Optional, Sequence, Tuple
import utils as U
import config as cfg
from pdf_processor import PDFProcessor
from instapaper_processor import InstapaperProcessor
from podcast_processor import PodcastProcessor
from image_processor import ImageProcessor
from markdown_processor import MarkdownProcessor
from path_utils import unique_path
PIPELINE_STEPS = (
("tweets", "process_tweets_pipeline"),
("podcasts", "process_podcasts"),
("posts", "process_instapaper_posts"),
("pdfs", "process_pdfs"),
("images", "process_images"),
("md", "process_markdown"),
)
TARGET_HANDLERS = {name: method for name, method in PIPELINE_STEPS}
PIPELINE_TARGETS = tuple(name for name, _ in PIPELINE_STEPS)
from utils.tweet_to_markdown import fetch_tweet_thread_markdown
from utils.x_likes_fetcher import LikeTweet, fetch_like_items_with_state
class DocumentProcessor:
"""Main document processor with modular, configurable logic."""
def __init__(self, base_dir: Path, year: int):
self.base_dir = Path(base_dir)
self.year = year
self.incoming = self.base_dir / "Incoming"
self.posts_dest = self._year_dir("Posts")
self.pdfs_dest = self._year_dir("Pdfs")
self.podcasts_dest = self._year_dir("Podcasts")
self.images_dest = self._year_dir("Images")
self.tweets_dest = self._year_dir("Tweets")
self.processed_history = self.incoming / "processed_history.txt"
self.tweets_processed = self.incoming / "tweets_processed.txt"
self.tweets_failed = self.incoming / "tweets_failed.txt"
self.pdf_processor = PDFProcessor(self.incoming, self.pdfs_dest)
self.instapaper_processor = InstapaperProcessor(self.incoming, self.posts_dest)
self.podcast_processor = PodcastProcessor(self.incoming, self.podcasts_dest)
self.image_processor = ImageProcessor(self.incoming, self.images_dest)
self.markdown_processor = MarkdownProcessor(self.incoming, self.posts_dest)
self.tweet_processor = MarkdownProcessor(self.incoming, self.tweets_dest)
self._history: List[Path] = []
def _year_dir(self, kind: str) -> Path:
"""Build the yearly path for the given kind."""
return self.base_dir / kind / f"{kind} {self.year}"
def _run_and_remember(self, fn: Callable[[], List[Path]]) -> List[Path]:
"""Run a processing function and record its results."""
paths = fn()
self._remember(paths)
return paths
def process_tweet_urls(self) -> List[Path]:
"""Fetch recent likes from X and generate Markdown in Incoming/."""
failed_urls = self._load_failed_urls()
processed_urls = self._load_processed_urls()
processed_set = set(processed_urls)
pending_failed = {url: None for url in failed_urls if url not in processed_set}
likes_error = False
stop_found = False
total_articles = 0
last_processed = self._last_processed_tweet_url()
try:
likes, stop_found, total_articles = self._fetch_like_items(last_processed=last_processed)
except Exception as exc:
print(f"🐦 Could not read X likes: {exc}")
likes = []
likes_error = True
if likes and last_processed and not stop_found:
anchor_url = self._first_processed_like_url(likes, processed_set)
if anchor_url:
processed_urls = self._promote_processed_url(processed_urls, anchor_url)
processed_set = set(processed_urls)
else:
print(
"⚠️ Last processed URL not found in likes; "
f"check the TWEET_LIKES_MAX limit (visible articles: {total_articles})."
)
if not likes and not pending_failed:
self._write_failed_urls(list(pending_failed.keys()))
if not likes_error:
print("🐦 No new tweets in your likes")
return []
fresh_likes = [like for like in likes if like.url not in processed_set]
fresh_url_set = {like.url for like in fresh_likes}
retry_urls = [url for url in pending_failed if url not in fresh_url_set]
if not fresh_likes and not retry_urls:
self._write_failed_urls(list(pending_failed.keys()))
print("🐦 No new likes pending (everything is already processed).")
return []
if retry_urls:
print(f"🐦 Retrying {len(retry_urls)} failed tweet(s).")
generated: List[Path] = []
written_fresh_urls: List[str] = []
written_retry_urls: List[str] = []
queue: List[LikeTweet] = list(fresh_likes) + [LikeTweet(url=url) for url in retry_urls]
for like in queue:
try:
markdown, filename = fetch_tweet_thread_markdown(
like.url,
# Use storage_state to avoid X's login wall.
storage_state=cfg.TWEET_LIKES_STATE,
like_author_handle=like.author_handle,
like_time_text=like.time_text,
like_time_datetime=like.time_datetime,
)
except Exception as exc:
print(f"❌ Error processing {like.url}: {exc}")
pending_failed.setdefault(like.url, None)
continue
destination = self._unique_destination(self.incoming / filename)
destination.write_text(markdown, encoding="utf-8")
generated.append(destination)
if like.url in fresh_url_set:
written_fresh_urls.append(like.url)
else:
written_retry_urls.append(like.url)
pending_failed.pop(like.url, None)
print(f"🐦 Tweet saved as {destination.name}")
if written_fresh_urls or written_retry_urls:
self._record_processed_urls(
fresh_urls=written_fresh_urls,
retry_urls=written_retry_urls,
)
self._write_failed_urls(list(pending_failed.keys()))
return generated
def _unique_destination(self, target: Path) -> Path:
"""Generate a unique name to avoid overwriting existing files."""
return unique_path(target)
def _fetch_like_items(
self,
*,
last_processed: str | None,
) -> Tuple[List[LikeTweet], bool, int]:
if not cfg.TWEET_LIKES_STATE:
raise RuntimeError("Configure TWEET_LIKES_STATE with the storage_state exported from X.")
items, stop_found, total_articles = fetch_like_items_with_state(
cfg.TWEET_LIKES_STATE,
likes_url=cfg.TWEET_LIKES_URL,
max_tweets=cfg.TWEET_LIKES_MAX,
stop_at_url=last_processed,
headless=True,
)
return items, stop_found, total_articles
def _last_processed_tweet_url(self) -> Optional[str]:
lines = self._load_processed_urls()
if not lines:
return None
return lines[0]
def _load_processed_urls(self) -> List[str]:
path = self.tweets_processed
if not path.exists():
return []
lines = [line.strip() for line in path.read_text(encoding="utf-8").splitlines()]
return [line for line in lines if line]
def _load_failed_urls(self) -> List[str]:
path = self.tweets_failed
if not path.exists():
return []
lines = [line.strip() for line in path.read_text(encoding="utf-8").splitlines()]
return [line for line in lines if line]
def _write_failed_urls(self, urls: List[str]) -> None:
path = self.tweets_failed
if not urls:
if path.exists():
path.unlink()
return
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("\n".join(urls) + "\n", encoding="utf-8")
def _write_processed_urls(self, urls: List[str]) -> None:
path = self.tweets_processed
if not urls:
if path.exists():
path.unlink()
return
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("\n".join(urls) + "\n", encoding="utf-8")
@staticmethod
def _dedupe_urls(urls: Sequence[str]) -> List[str]:
seen: set[str] = set()
ordered: List[str] = []
for url in urls:
if not url or url in seen:
continue
seen.add(url)
ordered.append(url)
return ordered
def _record_processed_urls(self, *, fresh_urls: List[str], retry_urls: List[str]) -> None:
fresh = self._dedupe_urls(fresh_urls)
retries = self._dedupe_urls(retry_urls)
if not fresh and not retries:
return
existing = self._load_processed_urls()
fresh_set = set(fresh)
retry_set = set(retries)
middle = [url for url in existing if url not in fresh_set and url not in retry_set]
ordered = [*fresh, *middle, *retries]
self._write_processed_urls(ordered)
def _promote_processed_url(self, processed_urls: Sequence[str], anchor_url: str) -> List[str]:
if not processed_urls:
return []
if processed_urls[0] == anchor_url:
return list(processed_urls)
if anchor_url not in processed_urls:
return list(processed_urls)
reordered = [anchor_url, *[url for url in processed_urls if url != anchor_url]]
self._write_processed_urls(reordered)
return reordered
@staticmethod
def _first_processed_like_url(likes: Sequence[LikeTweet], processed_set: set[str]) -> str | None:
for like in likes:
if like.url in processed_set:
return like.url
return None
def process_podcasts(self) -> List[Path]:
"""Process podcast files with the unified processor."""
# Use the unified processor for the whole podcasts pipeline.
return self._run_and_remember(self.podcast_processor.process_podcasts)
def process_instapaper_posts(self) -> List[Path]:
"""Process Instapaper web posts with the unified pipeline."""
# Use the unified processor for the whole Instapaper pipeline.
moved_posts = self.instapaper_processor.process_instapaper_posts()
self._remember(moved_posts)
return moved_posts
def process_pdfs(self) -> List[Path]:
"""Process PDFs using the specialized processor."""
return self._run_and_remember(self.pdf_processor.process_pdfs)
def process_images(self) -> List[Path]:
"""Process images by moving them and generating the yearly gallery."""
return self._run_and_remember(self.image_processor.process_images)
def process_markdown(self) -> List[Path]:
"""Process generic Markdown files."""
return self._run_and_remember(self.markdown_processor.process_markdown)
def process_tweets_pipeline(self, *, log_empty_conversion: bool = True) -> List[Path]:
"""Process the tweet queue and move results to the yearly Tweets folder."""
generated = self.process_tweet_urls()
tweet_markdown = self._merge_paths(self._list_tweet_markdown(), generated)
return self._process_tweet_markdown_subset(tweet_markdown, log_empty=log_empty_conversion)
def process_targets(self, targets: Iterable[str], *, log_empty_tweets: bool = True) -> bool:
"""Run a subset of the pipeline for the given targets."""
try:
for target in targets:
handler_name = TARGET_HANDLERS[target]
handler = getattr(self, handler_name)
if target == "tweets":
handler(log_empty_conversion=log_empty_tweets)
else:
handler()
self.register_all_files()
print("Pipeline completed ✅")
return True
except Exception as e:
print(f"❌ Pipeline error: {e}")
return False
def _process_tweet_markdown_subset(
self,
markdown_files: Iterable[Path],
*,
log_empty: bool = True,
) -> List[Path]:
files = [Path(path) for path in markdown_files if Path(path).exists()]
if not files:
if log_empty:
print("🐦 No new tweets to convert to HTML")
return []
return self._run_and_remember(lambda: self.tweet_processor.process_tweet_markdown_subset(files))
def register_all_files(self) -> None:
"""Register all processed files in history."""
if self._history:
U.register_paths(
self._history,
base_dir=self.base_dir,
historial_path=self.processed_history,
)
self._history = []
def process_all(self) -> bool:
"""Run the full pipeline."""
return self.process_targets(PIPELINE_TARGETS, log_empty_tweets=False)
def _remember(self, paths: List[Path]) -> None:
self._history.extend(paths)
@staticmethod
def _merge_paths(primary: Iterable[Path], secondary: Iterable[Path]) -> List[Path]:
seen: set[Path] = set()
merged: List[Path] = []
for path in list(primary) + list(secondary):
path = Path(path)
if path in seen:
continue
seen.add(path)
merged.append(path)
return merged
def _list_tweet_markdown(self) -> List[Path]:
return [
path for path in self.incoming.rglob("*.md")
if self.tweet_processor.is_tweet_markdown(path)
]