-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathgrabber.py
More file actions
85 lines (71 loc) · 3.24 KB
/
grabber.py
File metadata and controls
85 lines (71 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# -*- coding: utf-8 -*-
import asyncio
import json
import logging
import re
from datetime import datetime
from os.path import abspath
from time import sleep
from urllib.parse import quote_plus

import requests
from telethon import TelegramClient, events
from telethon.sessions import StringSession
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Read config file
with open(abspath('config.json'), encoding='utf-8') as f:
    config = json.load(f)

# Refuse to start when ANY required setting is empty. The previous check
# combined the values with `or`, so it only fired when all four were empty
# at once and let partially filled configs through.
if not all((
    config['client']['api_id'],
    config['client']['api_hash'],
    config['settings']['my_channels'],
    config['settings']['chats'],
)):
    logging.error('Error! You missed something in the settings (config.json). Fill it out for proper operation.')
    # Non-zero status so shells and supervisors can see the startup failure.
    raise SystemExit(1)

# Read prompt from prompt.txt file
with open(abspath('prompt.txt'), encoding='utf-8') as f:
    edit_prompt = f.read().strip()
def has_ban_phrases(text, ban_phrases):
    """Return True when ``text`` contains at least one banned phrase.

    A phrase without a space must equal a whole token of ``text``, where
    tokens are the pieces between single spaces and newlines. A phrase that
    contains a space is looked up as a plain substring of ``text``. Both
    comparisons are case-sensitive.
    """
    # Treat newlines as spaces, then cut on single spaces to get the tokens.
    tokens = set(text.replace('\n', ' ').split(' '))
    for phrase in ban_phrases:
        if ' ' in phrase:
            if phrase in text:
                return True
        elif phrase in tokens:
            return True
    return False
def has_ban_symbols(text, ban_symbols):
    """Return True when ``text`` contains any of ``ban_symbols``.

    Each entry is matched literally and case-insensitively anywhere in
    ``text``. Entries are escaped before being combined into one pattern, so
    regex metacharacters such as ``$``, ``.``, ``*`` or ``(`` are treated as
    the characters themselves: they neither match every message nor raise
    ``re.error``. Empty entries are ignored because an empty alternative
    would match any text.

    Returns False when there is nothing to search for.
    """
    literals = [re.escape(symbol) for symbol in ban_symbols if symbol]
    if not literals:
        return False
    pattern = '(?:{})'.format('|'.join(literals))
    return bool(re.search(pattern, text, flags=re.I))
def send_message(message):
    """Send ``message`` through the Telegram Bot API to every configured user.

    Reads the bot token from ``config['bot']['token']`` and the recipient
    ids from ``config['bot']['all_ID']``; does nothing when either is empty.

    Delivery is best-effort: each request has a timeout, and a failed
    request is logged and skipped instead of raising, so a notification
    problem cannot take down the caller (this function is also called from
    the script's error handler).
    """
    token = config['bot']['token']
    recipients = config['bot']['all_ID']
    if not (token and recipients):
        return

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    for user_id in recipients:
        try:
            # `params` URL-encodes the values, replacing the manual quote_plus.
            requests.post(
                url,
                params={'chat_id': user_id, 'text': message},
                timeout=10,
            )
        except requests.RequestException as error:
            # Log only the exception type: the exception text can include the
            # request URL, which contains the bot token.
            logging.warning(
                'Could not deliver notification to %s (%s)',
                user_id, type(error).__name__,
            )
# Announce startup in the log and to the bot recipients.
logging.info(config['bot']['message'])
send_message(config['bot']['message'])

try:
    client = TelegramClient(
        StringSession(config['client']['string_session']),
        config['client']['api_id'],
        config['client']['api_hash'],
    )
    client.start()

    @client.on(events.Album(chats=config['settings']['chats']))
    async def album_handler(event: events.Album.Event):
        """Repost an album from a watched chat to every channel in my_channels."""
        # NOTE(review): albums are reposted without the ban-phrase/ban-symbol
        # filters that normal_handler applies - confirm this is intended.
        # Use asyncio.sleep, not time.sleep: a blocking sleep inside a
        # coroutine freezes the event loop and with it every other handler.
        await asyncio.sleep(config['settings']['timer'])
        for channel in config['settings']['my_channels']:
            await client.send_message(
                channel,
                file=event.messages,
                message=event.original_update.message.message,
            )
            # Short pause between consecutive sends.
            await asyncio.sleep(1)

    @client.on(events.NewMessage(chats=config['settings']['chats']))
    async def normal_handler(event: events.NewMessage.Event):
        """Repost a single message from a watched chat unless it is banned."""
        # Media carrying a grouped_id is skipped here; album_handler is
        # registered for albums from the same chats.
        if event.message.media and event.grouped_id is not None:
            return

        text = str(event.message.message)
        has_bw = has_ban_phrases(text, config['settings']['ban_phrases'])
        has_bs = has_ban_symbols(text, config['settings']['ban_symbols'])
        if has_bw or has_bs:
            return

        await asyncio.sleep(config['settings']['timer'])
        for channel in config['settings']['my_channels']:
            await client.send_message(channel, event.message)
            # Short pause between consecutive sends.
            await asyncio.sleep(1)

    client.run_until_disconnected()
except Exception as error:
    # Log first so the failure is recorded even if the notification below
    # cannot be delivered.
    logging.error(f'An error occurred: {error}')
    send_message(f'An error occurred...\n\n{error}')