-
Notifications
You must be signed in to change notification settings - Fork 5
Open
Description
import asyncio
import json
import os

from dotenv import load_dotenv
from telethon import TelegramClient, events, types
from telethon.errors import FloodWaitError, MessageIdInvalidError
load_dotenv()


def _require_env(name):
    """Return environment variable *name*, exiting with a clear message if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise SystemExit(f"❌ 缺少必需的环境变量: {name}")
    return value


api_id = int(_require_env('API_ID'))
api_hash = _require_env('API_HASH')
source_chat = int(_require_env('SOURCE_CHAT'))
target_chat = int(_require_env('TARGET_CHAT'))

# The proxy is optional: build the (type, host, port) tuple only when all three
# variables are present, so running without a proxy no longer crashes on int(None).
_proxy_type = os.getenv('PROXY_TYPE')
_proxy_host = os.getenv('PROXY_HOST')
_proxy_port = os.getenv('PROXY_PORT')
proxy = (
    (_proxy_type, _proxy_host, int(_proxy_port))
    if _proxy_type and _proxy_host and _proxy_port
    else None
)

session_file = os.getenv('SESSION_FILE', 'filter_bot')
progress_file = os.getenv('PROGRESS_FILE', 'progress.json')

BASE_DELAY = 2.5   # seconds to pause after each send
BATCH_SIZE = 100   # width (in message IDs) of one history window
MAX_RETRIES = 5    # attempts per send before giving up on a message

# Device details reported to Telegram (presents the client as an Android phone)
# -- https://t.me/kelidsb
client = TelegramClient(
    session_file,
    api_id,
    api_hash,
    device_model="Samsung Galaxy S22 Ultra",
    system_version="Android 13 (SDK 33)",
    app_version="Telegram Android 9.7.2",
    system_lang_code="en",
    lang_code="en",
    connection_retries=5,
    # Previously the proxy settings were read but never handed to the client.
    proxy=proxy,
)
class ProgressManager:
    """Keep the copier's progress counters in memory and mirror them to a JSON file.

    ``data`` always contains the keys ``min_id``, ``max_id``, ``current_id``,
    ``processed``, ``total`` and ``running``. Every setter writes the file
    immediately so that an interrupted run can be resumed.
    """

    def __init__(self, file_path):
        """Remember *file_path* and start from all-zero progress (nothing is read yet)."""
        self.file_path = file_path
        self.data = {
            'min_id': 0,       # ID of the earliest message
            'max_id': 0,       # ID of the latest message
            'current_id': 0,   # ID the history pass has reached
            'processed': 0,    # number of messages processed so far
            'total': 0,        # total number of messages
            'running': False,  # whether the history pass is running
        }

    def load(self):
        """Merge the saved progress (if any) into ``self.data`` and return that dict.

        A missing, unreadable, corrupt or non-object file leaves the current
        values in place. Keys absent from the file keep their defaults, so
        callers can always index every counter.
        """
        if os.path.exists(self.file_path):
            try:
                with open(self.file_path, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers both invalid JSON and undecodable bytes.
                print(f"⚠️ 读取进度文件失败,使用默认进度: {e}")
            else:
                if isinstance(loaded, dict):
                    # Update in place instead of replacing, so missing keys
                    # fall back to the defaults set in __init__.
                    self.data.update(loaded)
        return self.data

    def save(self):
        """Write ``self.data`` to the progress file; failures are reported, not raised."""
        tmp_path = f"{self.file_path}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.data, f)
            # Swap the finished file in with a single rename so an interruption
            # mid-write cannot leave a truncated progress file behind.
            os.replace(tmp_path, self.file_path)
        except (OSError, TypeError, ValueError) as e:
            # Saving is best effort: losing one checkpoint must not stop the copy.
            print(f"⚠️ 保存进度失败: {e}")

    def set_min_id(self, min_id):
        """Record *min_id* if it is positive and lower than the stored one (or none is stored)."""
        if min_id > 0 and (self.data['min_id'] == 0 or min_id < self.data['min_id']):
            self.data['min_id'] = min_id
            self.save()

    def set_max_id(self, max_id):
        """Record *max_id* if it is higher than the stored one."""
        if max_id > self.data['max_id']:
            self.data['max_id'] = max_id
            self.save()

    def set_current_id(self, current_id):
        """Advance the resume position to *current_id*; it never moves backwards."""
        if current_id > self.data['current_id']:
            self.data['current_id'] = current_id
            self.save()

    def increment_processed(self, count=1):
        """Add *count* to the processed-message counter and save."""
        self.data['processed'] += count
        self.save()

    def set_total(self, total):
        """Store the total message count and save."""
        self.data['total'] = total
        self.save()

    def set_running(self, running):
        """Store the running flag and save."""
        self.data['running'] = running
        self.save()
# Module-level progress tracker used by both the history pass and the realtime handler.
progress = ProgressManager(progress_file)
async def send_message_without_reply(message):
    """Re-send a single source message to ``target_chat`` as a fresh message.

    The raw text is sent together with the original formatting entities, so
    the formatting is reproduced exactly instead of being re-parsed from a
    rendered string.

    Args:
        message: the Telethon message to copy.

    Returns:
        True when the message was sent. False when it was skipped (it belongs
        to an album, has nothing to send, or its ID was rejected) or when
        every retry failed.
    """
    media = message.media
    is_web_preview = isinstance(media, types.MessageMediaWebPage)

    # Album items are sent together by send_album, never one by one.
    if media and not is_web_preview and isinstance(message, types.Message) and message.grouped_id:
        return False

    text = message.raw_text
    # An explicit (possibly empty) entity list stops Telethon from running the
    # raw text through a parser, which would mangle literal markup characters.
    entities = message.entities or []

    # Nothing to send (e.g. a service message): skip instead of burning retries.
    if not text and (not media or is_web_preview):
        return False

    retries = 0
    while retries < MAX_RETRIES:
        try:
            if is_web_preview:
                # A web-page "media" is only the link preview of a text message.
                await client.send_message(
                    target_chat,
                    text,
                    formatting_entities=entities,
                    link_preview=True,
                    background=True
                )
            elif media:
                await client.send_file(
                    target_chat,
                    media,
                    caption=text,
                    formatting_entities=entities,
                    background=True
                )
            else:
                await client.send_message(
                    target_chat,
                    text,
                    formatting_entities=entities,
                    background=True
                )
            return True
        except FloodWaitError as e:
            # Wait as long as Telegram asks; this does not count as a retry.
            print(f"⚠️ 速率限制,等待 {e.seconds} 秒...")
            await asyncio.sleep(e.seconds)
        except MessageIdInvalidError:
            print("⚠️ 消息ID无效,跳过...")
            return False
        except Exception as e:
            print(f"⚠️ 发送错误: {str(e)}")
            retries += 1
            await asyncio.sleep(1)
    return False
async def send_album(messages):
    """Re-send a group of album messages to ``target_chat`` as one album.

    Args:
        messages: the messages of one album, in the order they should appear.

    Returns:
        True when the album was sent. False when *messages* is empty, the
        message ID was rejected, or every retry failed.
    """
    if not messages:
        return False

    media = [msg.media for msg in messages]
    captions = [msg.text or '' for msg in messages]

    retries = 0
    while retries < MAX_RETRIES:
        try:
            # Per-item captions go through `caption` (a list for albums); the
            # old `captions=` keyword is not a send_file parameter. parse_mode
            # is left at the client default because msg.text was rendered with
            # that same mode, so the round trip stays consistent.
            await client.send_file(
                target_chat,
                media,
                caption=captions,
                background=True
            )
            return True
        except FloodWaitError as e:
            # Wait as long as Telegram asks; this does not count as a retry.
            print(f"⚠️ 速率限制,等待 {e.seconds} 秒...")
            await asyncio.sleep(e.seconds)
        except MessageIdInvalidError:
            print("⚠️ 消息ID无效,跳过...")
            return False
        except Exception as e:
            print(f"⚠️ 发送相册错误: {str(e)}")
            retries += 1
            await asyncio.sleep(1)
    return False
async def process_history():
    """Copy the source chat's existing messages to the target chat, oldest first.

    Progress is loaded from and written to the module-level ``progress``
    manager, so an interrupted run resumes at the stored ``current_id``.
    History is walked in windows of ``BATCH_SIZE`` message IDs. Albums are
    sent as a unit via ``send_album`` and everything else via
    ``send_message_without_reply``. The ``running`` flag is set for the
    duration of the pass and always cleared afterwards, even on error.
    """
    state = progress.load()
    print(f"📊 恢复进度: 当前ID={state['current_id']}, 已处理={state['processed']}/{state['total']}")

    # Find the earliest message ID (first run only).
    if state['min_id'] == 0:
        first_msg = await client.get_messages(source_chat, limit=1, reverse=True)
        if first_msg:
            progress.set_min_id(first_msg[0].id)
            print(f"📌 最早消息ID: {first_msg[0].id}")

    # Refresh the latest message ID.
    last_msg = await client.get_messages(source_chat, limit=1)
    if last_msg:
        progress.set_max_id(last_msg[0].id)
        print(f"📌 最新消息ID: {last_msg[0].id}")

    # Re-read from the manager so the values set just above are the ones used.
    state = progress.data

    # Estimate the total from the ID range when no total is stored yet.
    if state['total'] == 0 and state['max_id'] >= state['min_id'] > 0:
        total_estimate = state['max_id'] - state['min_id'] + 1
        progress.set_total(total_estimate)
        print(f"📌 估计总消息数: {total_estimate}")

    # Resume where the last run stopped, otherwise start at the earliest message.
    current_id = state['current_id'] if state['current_id'] > 0 else max(state['min_id'], 1)
    max_id = state['max_id']
    total_processed = state['processed']
    total_messages = state['total']

    def print_progress():
        """Print the processed/total counters and the current window start."""
        percent = (total_processed / total_messages) * 100 if total_messages > 0 else 0
        print(f"📦 进度: {total_processed}/{total_messages} ({percent:.2f}%) | 当前ID: {current_id}")

    async def flush_albums(albums):
        """Send every collected album in arrival order, then empty *albums*."""
        nonlocal total_processed
        for group_msgs in albums.values():
            group_msgs.sort(key=lambda m: m.id)
            if await send_album(group_msgs):
                total_processed += len(group_msgs)
                progress.increment_processed(len(group_msgs))
                print_progress()
            await asyncio.sleep(BASE_DELAY)
        albums.clear()

    progress.set_running(True)
    try:
        while current_id <= max_id:
            upper = min(current_id + BATCH_SIZE - 1, max_id)
            try:
                # min_id/max_id are exclusive bounds, so widen by one on each
                # side to fetch the inclusive window [current_id, upper].
                batch = await client.get_messages(
                    source_chat,
                    limit=None,
                    min_id=current_id - 1,
                    max_id=upper + 1,
                )
            except FloodWaitError as e:
                print(f"⚠️ 速率限制,等待 {e.seconds} 秒...")
                await asyncio.sleep(e.seconds)
                continue
            except Exception as e:
                print(f"⚠️ 获取消息错误: {str(e)}")
                await asyncio.sleep(5)
                continue

            # The API returns newest first; copy in chronological order.
            messages = sorted(batch, key=lambda m: m.id)
            next_id = upper + 1

            # If the window ends inside an album, leave that album for the next
            # window so its items are sent together. Skipped when the album
            # already starts this window, which guarantees forward progress.
            if messages and messages[-1].grouped_id and upper < max_id:
                tail_group = messages[-1].grouped_id
                tail_start = min(m.id for m in messages if m.grouped_id == tail_group)
                if tail_start > current_id:
                    messages = [m for m in messages if m.id < tail_start]
                    next_id = tail_start

            albums = {}
            for msg in messages:
                if msg.grouped_id:
                    albums.setdefault(msg.grouped_id, []).append(msg)
                    continue
                # Send pending albums first to keep the original ordering.
                if albums:
                    await flush_albums(albums)
                if await send_message_without_reply(msg):
                    total_processed += 1
                    progress.increment_processed()
                    print_progress()
                await asyncio.sleep(BASE_DELAY)

            # Albums that ran up to the end of the window.
            if albums:
                await flush_albums(albums)

            # The whole window is covered, whether or not it held any messages.
            current_id = next_id
            progress.set_current_id(current_id)
            # Throttle; empty windows only need a short pause.
            await asyncio.sleep(BASE_DELAY if messages else 0.1)
    finally:
        progress.set_running(False)
    print("✅ 历史消息处理完成")
@client.on(events.NewMessage(chats=source_chat))
async def realtime_handler(event):
    """Forward a newly posted, non-album message from the source chat.

    Messages that arrive while the history pass is running are ignored.
    Album items are left to the album handler below, because this event fires
    once per item.
    """
    # Skip new messages while the history pass is still running.
    # NOTE(review): such messages are not picked up by the running pass either,
    # since it samples the latest ID once at start -- confirm this is intended.
    if progress.data.get('running', False):
        return

    msg = event.message
    if msg.grouped_id:
        return

    if await send_message_without_reply(msg):
        # Record the newest copied ID and move the resume point past it, so a
        # restart does not send this message a second time.
        progress.set_max_id(msg.id)
        progress.set_current_id(msg.id + 1)


@client.on(events.Album(chats=source_chat))
async def _realtime_album_handler(event):
    """Forward a newly posted album from the source chat as a single album."""
    # Skip new albums while the history pass is still running.
    if progress.data.get('running', False):
        return

    group = sorted(event.messages, key=lambda m: m.id)
    if group and await send_album(group):
        # Record the newest copied ID and move the resume point past the album.
        progress.set_max_id(group[-1].id)
        progress.set_current_id(group[-1].id + 1)
async def main():
    """Start the client, replay the source chat's history, then listen for new messages."""
    await client.start()

    # Startup banner, including the group promotion line.
    separator = "=" * 50
    banner_lines = (
        "\n" + separator,
        "频道复制器已启动!",
        "加入群组 @kelidsb 查看更多实用项目",
        separator + "\n",
    )
    for line in banner_lines:
        print(line)

    # Phase 1: copy everything already in the source chat.
    print("🔄 开始处理历史消息...")
    await process_history()

    # Phase 2: stay connected so the registered event handlers can run.
    print("👂 开始监听新消息...")
    await client.run_until_disconnected()
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n🛑 用户中断,正在保存进度...")
        progress.save()
    finally:
        # Always clear the running flag on exit (this also saves the progress
        # file), so realtime forwarding is not blocked by a stale flag.
        progress.set_running(False)
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels