-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathanalyze.py
More file actions
665 lines (568 loc) · 26.2 KB
/
analyze.py
File metadata and controls
665 lines (568 loc) · 26.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
#!/usr/bin/env python3
"""
x-engage: Engagement Analyzer + Discord Reporter
Reads x-monitor tweets_window.json (24h sliding window), scores posts,
runs GPT-4o-mini analysis on top-3 performers, generates content ideas
for @desearch_ai, and posts a digest to Discord #x-alerts.
Usage:
python3 analyze.py # Full run: analyze + post to Discord
python3 analyze.py --dry-run # Print JSON to stdout, no Discord post
"""
import argparse
import fcntl
import json
import os
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import requests
from dotenv import load_dotenv
# ─────────────────────────────────────────────
# Config & Env
# ─────────────────────────────────────────────
load_dotenv()  # pull DISCORD_BOT_TOKEN / OPENAI_API_KEY etc. from a local .env file
# Path to config.json; overridable via X_ENGAGE_CONFIG for tests / alternate deployments.
# NOTE(review): the default is a Path while the env override is a str — both work with open().
CONFIG_PATH = os.environ.get("X_ENGAGE_CONFIG", Path(__file__).parent / "config.json")
# Lock-file name placed next to pending_actions.json to serialize queue writers.
PENDING_ACTIONS_LOCK_NAME = ".pending_actions.lock"
def atomic_write_json(path: Path, data) -> None:
    """
    Atomically write *data* as pretty-printed JSON to *path*.

    Writes to a temporary file in the same directory, then renames it over the
    target so readers never observe a half-written file. The parent directory
    is created if missing.

    Fix: the temporary file is now removed when serialization fails, instead
    of being leaked on disk.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            "w", dir=path.parent, prefix=f".{path.name}.", suffix=".tmp",
            delete=False, encoding="utf-8",
        ) as tmp:
            tmp_path = Path(tmp.name)
            json.dump(data, tmp, indent=2, ensure_ascii=False)
            tmp.write("\n")  # trailing newline keeps diffs / cat output tidy
        # Same-directory rename is atomic on POSIX filesystems.
        tmp_path.replace(path)
    except BaseException:
        # Don't leave an orphaned temp file behind on failure.
        if tmp_path is not None:
            tmp_path.unlink(missing_ok=True)
        raise
def acquire_queue_lock(queue_path: Path):
    """
    Take an exclusive, non-blocking advisory lock next to *queue_path*.

    Returns the open lock-file handle; the caller must close() it to release
    the lock. Raises RuntimeError when another process already holds it.
    NOTE(review): fcntl.flock is POSIX-only — this module assumes a Unix host.
    """
    lock_path = queue_path.parent / PENDING_ACTIONS_LOCK_NAME
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    fh = lock_path.open("w")
    try:
        fcntl.flock(fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        fh.close()
        raise RuntimeError(f"pending_actions queue busy, lock held at {lock_path}")
    # Record the owner PID for debugging stuck locks.
    fh.write(str(os.getpid()))
    fh.flush()
    return fh
def _get_discord_token() -> str:
"""
Load Discord bot token with fallback chain:
1. DISCORD_BOT_TOKEN env var (or .env file via dotenv)
2. ~/.openclaw/openclaw.json (same source as post-to-discord.cjs)
"""
token = os.environ.get("DISCORD_BOT_TOKEN", "")
if token:
return token
try:
openclaw_cfg_path = Path.home() / ".openclaw" / "openclaw.json"
cfg = json.loads(openclaw_cfg_path.read_text())
token = cfg.get("channels", {}).get("discord", {}).get("token", "")
if token:
print("[discord] Token loaded from ~/.openclaw/openclaw.json", file=sys.stderr)
except Exception as e:
print(f"[discord] Could not read openclaw.json: {e}", file=sys.stderr)
return token
def load_config() -> dict:
    """Parse and return the JSON configuration at CONFIG_PATH."""
    return json.loads(Path(CONFIG_PATH).read_text())
def get_accounts(cfg: dict) -> list[dict]:
    """Return all configured X accounts. Replaces _get_active_account()."""
    if "x_accounts" in cfg:
        return cfg["x_accounts"]
    return []
# ─────────────────────────────────────────────
# Scoring
# ─────────────────────────────────────────────
def score_tweet(tweet: dict, weights: dict) -> float:
    """
    Weighted engagement score for one tweet.

    Default weighting: likes*3 + retweets*5 + replies*2 + views*0.01
    + quotes*4 + bookmarks*2; each metric falls back to 0.0 when the
    field is missing or None.
    """
    def metric(field: str) -> float:
        raw = tweet.get(field)
        return 0.0 if raw is None else float(raw)

    # (tweet field, weights key, default weight)
    weighting = (
        ("like_count", "likes", 3),
        ("retweet_count", "retweets", 5),
        ("reply_count", "replies", 2),
        ("view_count", "views", 0.01),
        ("quote_count", "quotes", 4),
        ("bookmark_count", "bookmarks", 2),
    )
    return sum(metric(field) * weights.get(key, default) for field, key, default in weighting)
def get_top_tweets(tweets: list[dict], weights: dict, top_n: int) -> list[dict]:
    """
    Score every tweet, rank by score descending, drop duplicate ids
    (keeping the highest-scoring copy), and return the best *top_n*.
    Each returned dict carries an added "_score" key rounded to 2 places.
    """
    ranked = sorted(
        ({**tweet, "_score": round(score_tweet(tweet, weights), 2)} for tweet in tweets),
        key=lambda entry: entry["_score"],
        reverse=True,
    )
    # First occurrence wins: ranked is sorted, so that's the best-scoring copy.
    seen_ids: set[str] = set()
    unique: list[dict] = []
    for entry in ranked:
        entry_id = entry.get("id")
        if entry_id in seen_ids:
            continue
        seen_ids.add(entry_id)
        unique.append(entry)
    return unique[:top_n]
# ─────────────────────────────────────────────
# Queue Generation
# ─────────────────────────────────────────────
def build_queue_items(tweets: list[dict], accounts: list[dict]) -> list[dict]:
    """
    Build richer queue items for each tweet × account pair.
    Returns one item per (tweet, account) combination, all sharing a single
    generation timestamp.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    return [
        {
            "tweet_id": tweet.get("id", ""),
            "tweet_url": tweet.get("url", ""),
            "tweet_text": tweet.get("text", "")[:280],
            "author": _username(tweet),
            "score": tweet.get("_score", 0),
            "action": "pending",
            "account_id": acct["id"],
            "account_label": acct.get("label", f"@{acct['id']}"),
            "lane": acct.get("lane", "unknown"),
            "action_types": acct.get("action_types", ["retweet", "quote"]),
            "source": "x-engage-analyzer",
            "category": tweet.get("_monitor_category", ""),
            "timestamp": stamp,
        }
        for tweet in tweets
        for acct in accounts
    ]
# ─────────────────────────────────────────────
# LLM Analysis
# ─────────────────────────────────────────────
# System prompt for the per-tweet deep-dive. Forces bare-JSON output so the
# reply can be fed straight into json.loads (after fence stripping).
ANALYSIS_SYSTEM = """\
You are an expert social media analyst specialising in X/Twitter performance and tech/AI content.
Analyse the given tweet and respond ONLY with a valid JSON object — no markdown, no prose.
"""
# Per-tweet user prompt, filled via str.format in analyse_tweet_with_llm().
# Doubled braces {{ }} escape the literal JSON braces from str.format.
ANALYSIS_USER_TEMPLATE = """\
Analyse this tweet for engagement patterns:
Author: @{username}
Text: {text}
Engagement: {likes} likes | {rts} retweets | {replies} replies | {views} views | {quotes} quotes | {bookmarks} bookmarks
Category: {category}
Engagement Score: {score}
Return exactly this JSON shape (all fields required):
{{
"hook_type": "<question|data|story|controversy|announcement|list|other>",
"format": "<single_tweet|thread|media|quote_tweet|other>",
"emotional_trigger": "<FOMO|curiosity|identity|social_proof|humor|inspiration|fear|other>",
"why_it_performed": "<1-2 sentence explanation of why this post did well>",
"audience_fit_score": <1-10 integer>,
"key_elements": ["<element1>", "<element2>"]
}}
"""
# System prompt for the ideation step — asks for a bare JSON array of 3 ideas.
CONTENT_IDEAS_SYSTEM = """\
You are a content strategist for @desearch_ai, an AI-powered search & scraping API on the Bittensor SN22 subnet.
Based on the patterns in the top-performing tweets provided, generate 3 concrete content ideas.
Respond ONLY with a valid JSON array of 3 objects — no markdown, no prose.
"""
# User prompt for ideation, filled via str.format in generate_content_ideas().
CONTENT_IDEAS_USER_TEMPLATE = """\
Here are the top-performing tweet patterns observed:
{patterns_summary}
Generate 3 content ideas for @desearch_ai that leverage these patterns.
Each idea should be specific, actionable, and tailored to the Desearch brand (AI search API, Bittensor SN22, developer audience).
Return exactly this JSON shape:
[
{{
"title": "<catchy tweet opener / hook>",
"format": "<single_tweet|thread|media|announcement>",
"hook_type": "<question|data|story|controversy|announcement|list>",
"angle": "<1-2 sentence description of the content angle and why it will perform>",
"example_opener": "<first 1-2 sentences of the tweet>"
}},
...
]
"""
def build_fallback_analyses(top_tweets: list[dict]) -> list[dict]:
    """
    Produce placeholder deep-dive analyses (no LLM) for up to 3 tweets.

    Hook type is guessed heuristically: "question" when the text contains a
    "?", "announcement" when it mentions a launch-style keyword, else "other".
    """
    launch_words = ("launch", "release", "ship", "new", "out")
    results: list[dict] = []
    for entry in top_tweets[:3]:
        body = entry.get("text", "")
        if "?" in body:
            hook = "question"
        elif any(word in body.lower() for word in launch_words):
            hook = "announcement"
        else:
            hook = "other"
        results.append({
            "hook_type": hook,
            "format": "single_tweet",
            "emotional_trigger": "curiosity",
            "why_it_performed": "Dry-run fallback analysis: the post likely performed because it matched audience interest and current conversation timing.",
            "audience_fit_score": 7,
            "key_elements": [entry.get("_monitor_category", "unknown") or "unknown", "dry-run fallback"],
        })
    return results
def build_fallback_content_ideas(top_tweets: list[dict]) -> list[dict]:
    """
    Produce 3 placeholder content ideas (no LLM), themed on the monitor
    categories of up to the first 3 tweets; defaults to "AI" when no
    tweets or categories are available. Categories cycle if fewer than 3.
    """
    categories = [t.get("_monitor_category", "AI") or "AI" for t in top_tweets[:3]] or ["AI"]
    return [
        {
            "title": f"What builders keep missing about {category}",
            "format": "single_tweet",
            "hook_type": "data",
            "angle": f"Use the current {category} conversation as a dry-run placeholder idea for Desearch positioning.",
            "example_opener": f"Watching the latest {category} chatter, one pattern is obvious: teams still waste time stitching together search and crawl workflows.",
        }
        for category in (categories[i % len(categories)] for i in range(3))
    ]
def _username(tweet: dict) -> str:
u = tweet.get("user")
if isinstance(u, dict):
return u.get("username", "?")
return "?"
def _strip_markdown_fence(raw: str) -> str:
raw = raw.strip()
if raw.startswith("```"):
raw = raw.split("```")[1]
if raw.startswith("json"):
raw = raw[4:]
return raw.strip()
def analyse_tweet_with_llm(client, tweet: dict, model: str) -> dict:
    """
    Run the per-tweet engagement-analysis prompt through the chat API.

    client: OpenAI-compatible client exposing chat.completions.create.
    tweet: tweet dict; text is clipped to 500 chars, metrics default to 0.
    model: chat model name (e.g. "gpt-4o-mini").
    Returns the parsed JSON analysis object.
    Raises json.JSONDecodeError if the model reply is not valid JSON.
    """
    username = _username(tweet)
    # `or 0` guards against explicit None values in the tweet payload.
    prompt = ANALYSIS_USER_TEMPLATE.format(
        username=username,
        text=tweet.get("text", "")[:500],
        likes=tweet.get("like_count", 0) or 0,
        rts=tweet.get("retweet_count", 0) or 0,
        replies=tweet.get("reply_count", 0) or 0,
        views=tweet.get("view_count", 0) or 0,
        quotes=tweet.get("quote_count", 0) or 0,
        bookmarks=tweet.get("bookmark_count", 0) or 0,
        category=tweet.get("_monitor_category", "unknown"),
        score=tweet.get("_score", 0),
    )
    # Low temperature: analysis should be consistent, not creative.
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": ANALYSIS_SYSTEM},
            {"role": "user", "content": prompt},
        ],
        temperature=0.3,
        max_tokens=400,
    )
    raw = response.choices[0].message.content or ""
    # Model is instructed to emit bare JSON, but strip a stray ``` fence first.
    return json.loads(_strip_markdown_fence(raw))
def generate_content_ideas(client, top_tweets: list[dict], analyses: list[dict], model: str) -> list[dict]:
    """
    Ask the LLM for 3 @desearch_ai content ideas based on top-performer patterns.

    top_tweets and analyses are zipped pairwise, so only the first
    min(3, len(analyses)) pairs feed the prompt.
    Returns the parsed JSON array (expected: 3 idea objects).
    Raises json.JSONDecodeError if the model reply is not valid JSON.
    """
    patterns = []
    # Summarise each (tweet, analysis) pair into one bullet line for the prompt.
    for tweet, analysis in zip(top_tweets[:3], analyses):
        username = _username(tweet)
        patterns.append(
            f"- @{username}: hook={analysis.get('hook_type', '?')}, "
            f"trigger={analysis.get('emotional_trigger', '?')}, "
            f"format={analysis.get('format', '?')}, "
            f"score={tweet.get('_score', 0)}, "
            f"text_snippet=\"{tweet.get('text', '')[:120]}...\""
        )
    patterns_summary = "\n".join(patterns) if patterns else "No pattern data available."
    # Higher temperature than the analysis call: ideation benefits from variety.
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": CONTENT_IDEAS_SYSTEM},
            {"role": "user", "content": CONTENT_IDEAS_USER_TEMPLATE.format(patterns_summary=patterns_summary)},
        ],
        temperature=0.7,
        max_tokens=700,
    )
    raw = response.choices[0].message.content or ""
    return json.loads(_strip_markdown_fence(raw))
# ─────────────────────────────────────────────
# Discord Formatting
# ─────────────────────────────────────────────
def _fmt_num(n: int | float | None) -> str:
"""Format large numbers compactly: 4200 → 4.2K"""
if n is None:
return "0"
n = int(n)
if n >= 1_000_000:
return f"{n / 1_000_000:.1f}M"
if n >= 1_000:
return f"{n / 1_000:.1f}K"
return str(n)
def _truncate(text: str, max_len: int = 120) -> str:
text = text.replace("\n", " ").strip()
return text if len(text) <= max_len else text[: max_len - 1] + "…"
def build_discord_messages(
    top_10: list[dict],
    analyses: list[dict],
    content_ideas: list[dict],
    now_str: str,
    accounts: list[dict] | None = None,
) -> list[dict]:
    """
    Returns list of Discord API message payloads (content strings).
    Split into multiple messages to stay under Discord's 2000-char limit.
    accounts: list of account dicts from config (used to show per-account action labels).

    Message layout: [header + top-10 list, deep-dive header, one card per
    top-3 tweet (zipped with analyses), content-ideas digest].
    """
    messages = []
    # ── Message 1: Header + Top 10 list ──────────────────────────────────
    header_lines = [
        f"📊 **Engagement Report** | Top 10 posts • {now_str}",
        "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
    ]
    for i, t in enumerate(top_10, 1):
        uname = _username(t)
        score = t.get("_score", 0)
        likes = _fmt_num(t.get("like_count", 0))
        rts = _fmt_num(t.get("retweet_count", 0))
        views = _fmt_num(t.get("view_count", 0))
        text_snip = _truncate(t.get("text", ""), 80)
        header_lines.append(
            f"**#{i}** @{uname} · score **{score:.0f}** · ❤️{likes} 🔄{rts} 👁️{views}"
        )
        header_lines.append(f"> {text_snip}")
    header_lines.append("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
    messages.append({"content": "\n".join(header_lines)})
    # ── Message 2: Deep Dive header ───────────────────────────────────────
    messages.append({"content": "🔍 **Top 3 Deep Dive**"})
    # ── Messages 3-5: Top 3 cards ─────────────────────────────────────────
    # Emoji lookups for the analysis enums; unknown values fall back to 📌/💡.
    hook_emoji = {
        "question": "❓", "data": "📊", "story": "📖",
        "controversy": "🔥", "announcement": "📣", "list": "📋",
    }
    trigger_emoji = {
        "FOMO": "😰", "curiosity": "🤔", "identity": "🪞",
        "social_proof": "👥", "humor": "😂", "inspiration": "✨", "fear": "😱",
    }
    # zip truncates to the shorter of (top-3, analyses), so missing analyses
    # simply drop cards rather than raising.
    for i, (tweet, analysis) in enumerate(zip(top_10[:3], analyses), 1):
        uname = _username(tweet)
        url = tweet.get("url", "")
        score = tweet.get("_score", 0)
        likes = tweet.get("like_count", 0) or 0
        rts = tweet.get("retweet_count", 0) or 0
        replies = tweet.get("reply_count", 0) or 0
        views = tweet.get("view_count", 0) or 0
        quotes = tweet.get("quote_count", 0) or 0
        bookmarks = tweet.get("bookmark_count", 0) or 0
        hook = analysis.get("hook_type", "?")
        trigger = analysis.get("emotional_trigger", "?")
        fmt = analysis.get("format", "?")
        why = analysis.get("why_it_performed", "")
        fit = analysis.get("audience_fit_score", "?")
        elements = analysis.get("key_elements", [])
        elements_str = " · ".join(elements[:3]) if elements else ""
        card = [
            f"**#{i} @{uname}** · Score **{score:.0f}**",
            f"```{_truncate(tweet.get('text', ''), 200)}```",
            f"❤️ {_fmt_num(likes)} 🔄 {_fmt_num(rts)} 💬 {replies} "
            f"👁️ {_fmt_num(views)} 📝 {quotes} 🔖 {bookmarks}",
            f"{hook_emoji.get(hook, '📌')} Hook: **{hook}** "
            f"{trigger_emoji.get(trigger, '💡')} Trigger: **{trigger}** 📐 Format: **{fmt}**",
            f"🎯 Audience Fit: **{fit}/10**",
            f"💡 _{why}_",
        ]
        if elements_str:
            card.append(f"🔑 Key elements: {elements_str}")
        if url:
            card.append(f"🔗 {url}")
        # Per-account action labels
        if accounts:
            action_parts = []
            for acct in accounts:
                label = acct.get("label", acct["id"])
                action_parts.append(f"as **{label}**")
            card.append(
                f"\n**Actions:** 🔄 RT / 💬 Quote — {' | '.join(action_parts)}\n"
                f"_(set action in `pending_actions.json`)_"
            )
        else:
            card.append(
                f"\n**Actions:** 🔄 RT | 💬 Quote | ⏭️ Skip\n"
                f"_(set action in `pending_actions.json`)_"
            )
        messages.append({"content": "\n".join(card)})
    # ── Message 6: Content Ideas ──────────────────────────────────────────
    ideas_lines = ["━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━", "💡 **Content Ideas for @desearch_ai**"]
    for j, idea in enumerate(content_ideas, 1):
        title = idea.get("title", "")
        angle = idea.get("angle", "")
        opener = idea.get("example_opener", "")
        fmt = idea.get("format", "")
        ideas_lines.append(
            f"\n**{j}. {title}** _{fmt}_\n"
            f"> {angle}\n"
            f"_Opener:_ \"{opener}\""
        )
    ideas_lines.append("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
    messages.append({"content": "\n".join(ideas_lines)})
    return messages
# ─────────────────────────────────────────────
# Discord API
# ─────────────────────────────────────────────
def post_to_discord(channel_id: str, bot_token: str, messages: list[dict]) -> None:
    """
    POST each digest message to the Discord channel via the v10 REST API.
    Failures are logged to stderr and do not abort the remaining messages.
    """
    endpoint = f"https://discord.com/api/v10/channels/{channel_id}/messages"
    auth_headers = {
        "Authorization": f"Bot {bot_token}",
        "Content-Type": "application/json",
    }
    for payload in messages:
        body = payload["content"]
        # Discord hard-caps message content at 2000 characters.
        if len(body) > 2000:
            body = body[:1997] + "…"
        resp = requests.post(endpoint, headers=auth_headers, json={"content": body})
        if resp.ok:
            print(f"[Discord] Posted message (len={len(body)})", file=sys.stderr)
        else:
            print(f"[Discord] Failed: {resp.status_code} {resp.text[:200]}", file=sys.stderr)
# ─────────────────────────────────────────────
# Pending Actions
# ─────────────────────────────────────────────
def write_pending_actions(items: list[dict], output_path: str) -> None:
    """
    Write queue items to pending_actions.json for the X Action Executor.

    Merges with existing entries, deduplicating by (tweet_id, account_id)
    while preserving review state (action/status/notes/execution timestamps)
    already set by a human or the executor. Entries absent from the new batch
    are kept so review history survives.

    output_path: absolute, ~-prefixed, or relative to this file's directory.
    Raises RuntimeError (via acquire_queue_lock) if another writer holds the lock.
    """
    path = Path(output_path).expanduser()
    if not path.is_absolute():
        path = Path(__file__).parent / path
    lock_handle = acquire_queue_lock(path)
    try:
        existing: list[dict] = []
        if path.exists():
            try:
                existing = json.loads(path.read_text())
            except Exception:
                existing = []
            # Robustness fix: valid JSON of the wrong shape (an object or a
            # scalar, or non-dict list entries) previously crashed the merge
            # below — treat anything that isn't a list of dicts as empty.
            if not isinstance(existing, list):
                existing = []
            existing = [entry for entry in existing if isinstance(entry, dict)]
        existing_map = {(a.get("tweet_id", ""), a.get("account_id", "")): a for a in existing}
        # Fields set by the reviewer/executor that a refresh must not clobber.
        preserved_fields = (
            "action", "status", "quote_text", "approved_at", "reviewed_at", "review_notes",
            "execution_started_at", "executed_at", "failed_at", "error",
        )
        merged: list[dict] = []
        refreshed = 0
        new_count = 0
        for item in items:
            key = (item.get("tweet_id", ""), item.get("account_id", ""))
            prior = existing_map.pop(key, None)
            if prior is None:
                merged.append(item)
                new_count += 1
                continue
            preserved = {name: prior[name] for name in preserved_fields if name in prior}
            # Fresh scores/metadata win, but review state carries over.
            merged.append({**item, **preserved})
            refreshed += 1
        merged.extend(existing_map.values())
        atomic_write_json(path, merged)
        print(f"[pending_actions] Written {new_count} new entries, refreshed {refreshed}, total {len(merged)} → {path}", file=sys.stderr)
    finally:
        lock_handle.close()
# ─────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────
def run(dry_run: bool = False, skip_llm: bool = False) -> dict[str, Any]:
    """
    End-to-end pipeline: load config and the tweet window, score and rank
    tweets, deep-dive the top performers (via LLM or local fallback), then
    either print the result JSON (dry_run) or write the pending-actions
    queue and post the digest to Discord.

    Returns the full result payload in both modes.
    Raises RuntimeError when OPENAI_API_KEY (live LLM mode) or the Discord
    token (non-dry-run) is missing; KeyError when required config keys
    ("x_monitor_window_path", "discord_channel_id") are absent.
    """
    cfg = load_config()
    # Load all configured X accounts
    accounts = get_accounts(cfg)
    if not accounts:
        print("[warn] No x_accounts configured in config.json", file=sys.stderr)
    else:
        labels = ", ".join(a.get("label", a["id"]) for a in accounts)
        print(f"[analyze] Accounts: {labels}", file=sys.stderr)
    # Load tweets window (24h sliding window maintained by x-monitor)
    window_path = Path(cfg["x_monitor_window_path"])
    if not window_path.exists():
        print(f"[warn] tweets_window.json not found at {window_path}, using empty list", file=sys.stderr)
        tweets: list[dict] = []
    else:
        tweets = json.loads(window_path.read_text())
    print(f"[analyze] Loaded {len(tweets)} tweets from window", file=sys.stderr)
    # Defaults mirror the weights documented in score_tweet().
    weights = cfg.get("score_weights", {
        "likes": 3, "retweets": 5, "replies": 2,
        "views": 0.01, "quotes": 4, "bookmarks": 2,
    })
    top_n = cfg.get("top_n", 10)
    top_deep = cfg.get("top_deep_dive", 3)
    model = cfg.get("openai_model", "gpt-4o-mini")
    # Score & rank
    top_10 = get_top_tweets(tweets, weights, top_n)
    print(f"[analyze] Top {len(top_10)} tweets selected", file=sys.stderr)
    # LLM: only top-3 get deep-dive (cost-efficient — not all 10)
    top_for_deep = top_10[:top_deep]
    # Env override lets cron/test runs skip remote calls without a CLI flag.
    skip_llm = skip_llm or os.environ.get("X_ENGAGE_SKIP_LLM", "").lower() in {"1", "true", "yes"}
    if skip_llm:
        print("[llm] Skipping remote LLM calls, using dry-run fallback analysis", file=sys.stderr)
        analyses = build_fallback_analyses(top_for_deep)
        content_ideas = build_fallback_content_ideas(top_10)
    else:
        openai_key = os.environ.get("OPENAI_API_KEY", "")
        if not openai_key:
            raise RuntimeError("OPENAI_API_KEY not set in environment")
        # Imported lazily so fallback/dry runs don't require the openai package.
        from openai import OpenAI
        client = OpenAI(api_key=openai_key)
        analyses: list[dict] = []
        for i, tweet in enumerate(top_for_deep, 1):
            uname = _username(tweet)
            print(f"[llm] Analysing tweet #{i} by @{uname} (score={tweet.get('_score', 0):.1f})", file=sys.stderr)
            analysis = analyse_tweet_with_llm(client, tweet, model)
            analyses.append(analysis)
        # Generate 3 content ideas from detected top-performer patterns
        print("[llm] Generating content ideas…", file=sys.stderr)
        content_ideas = generate_content_ideas(client, top_10, analyses, model)
    # Build result payload
    result: dict[str, Any] = {
        "top_10": [
            {
                "rank": i + 1,
                "id": t.get("id"),
                "url": t.get("url"),
                "author": _username(t),
                "text": t.get("text", "")[:280],
                "score": t.get("_score"),
                "like_count": t.get("like_count", 0) or 0,
                "retweet_count": t.get("retweet_count", 0) or 0,
                "reply_count": t.get("reply_count", 0) or 0,
                "view_count": t.get("view_count", 0) or 0,
                "quote_count": t.get("quote_count", 0) or 0,
                "bookmark_count": t.get("bookmark_count", 0) or 0,
                "category": t.get("_monitor_category", ""),
            }
            for i, t in enumerate(top_10)
        ],
        "analyses": analyses,
        "content_ideas": content_ideas,
        "accounts": [{"id": a["id"], "label": a.get("label", a["id"])} for a in accounts],
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "tweet_count_in_window": len(tweets),
        "llm_mode": "fallback" if skip_llm else "live",
    }
    if dry_run:
        # Stdout = pure JSON; all logs were sent to stderr
        print(json.dumps(result, indent=2, ensure_ascii=False))
        return result
    # Write pending actions for human review — one item per tweet × account
    pending_path = cfg.get("pending_actions_path", "pending_actions.json")
    queue_items = build_queue_items(top_for_deep, accounts)
    write_pending_actions(queue_items, pending_path)
    # Post digest to Discord
    bot_token = _get_discord_token()
    channel_id = str(cfg["discord_channel_id"])
    if not bot_token:
        raise RuntimeError(
            "DISCORD_BOT_TOKEN not set. Set it in .env, environment, or ~/.openclaw/openclaw.json"
        )
    now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    discord_msgs = build_discord_messages(
        top_10, analyses, content_ideas, now_str, accounts=accounts
    )
    post_to_discord(channel_id, bot_token, discord_msgs)
    print(f"[done] Engagement report posted to Discord #{channel_id}", file=sys.stderr)
    return result
if __name__ == "__main__":
    # CLI entry point: build the parser, run the pipeline, exit 0/1.
    cli = argparse.ArgumentParser(description="x-engage: Engagement Analyzer + Discord Reporter")
    for flag, help_text in (
        ("--dry-run", "Print analysis JSON to stdout, skip Discord post and pending_actions write"),
        ("--skip-llm", "Use local fallback analysis/content ideas instead of remote LLM calls"),
    ):
        cli.add_argument(flag, action="store_true", help=help_text)
    opts = cli.parse_args()
    try:
        run(dry_run=opts.dry_run, skip_llm=opts.skip_llm)
        sys.exit(0)
    except Exception as e:
        print(f"[error] {e}", file=sys.stderr)
        sys.exit(1)