Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 167 additions & 69 deletions snscrape/modules/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

_logger = logging.getLogger(__name__)
# Matches links of the form https://t.me/<name>/<number>?single, i.e. links to an individual photo or video of a post
_SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$')

# Captures the URL inside a CSS url('...') expression; applied to inline style attributes to pull out image URLs
_STYLE_MEDIA_URL_PATTERN = re.compile(r'url\(\'(.*?)\'\)')

@dataclasses.dataclass
class LinkPreview:
Expand All @@ -24,29 +24,12 @@ class LinkPreview:
image: typing.Optional[str] = None


@dataclasses.dataclass
class TelegramPost(snscrape.base.Item):
    """A single post yielded by the Telegram channel scraper."""

    # Link to the post
    url: str
    # Timestamp of the post
    date: datetime.datetime
    # Text of the post
    content: str
    # Links found in the post
    outlinks: list
    # Image URLs found in the post
    images: list
    # Video source URLs found in the post
    videos: list
    # Username taken from the "forwarded from" link, if any
    forwarded: str
    linkPreview: typing.Optional[LinkPreview] = None

    # Space-joined view of `outlinks`, wrapped in snscrape.base._DeprecatedProperty
    # (presumably a deprecated alias that points users to `outlinks` -- confirm in snscrape.base)
    outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')

    def __str__(self):
        # A post is represented by its URL
        return self.url


@dataclasses.dataclass
class Channel(snscrape.base.Entity):
username: str
title: str
verified: bool
photo: str
title: typing.Optional[str] = None
verified: typing.Optional[bool] = None
photo: typing.Optional[str] = None
description: typing.Optional[str] = None
members: typing.Optional[int] = None
photos: typing.Optional[snscrape.base.IntWithGranularity] = None
Expand All @@ -63,6 +46,55 @@ def __str__(self):
return f'https://t.me/s/{self.username}'


@dataclasses.dataclass
class TelegramPost(snscrape.base.Item):
    """A single post yielded by the Telegram channel scraper."""

    # Link to the post
    url: str
    # Timestamp of the post
    date: datetime.datetime
    # Text of the post; the scraper passes None when the post has no text element
    content: typing.Optional[str]
    # Links found in the post
    outlinks: typing.Optional[typing.List[str]] = None
    # Texts of links starting with '@', with the '@' characters stripped
    mentions: typing.Optional[typing.List[str]] = None
    # Texts of links starting with '#', with the '#' characters stripped
    hashtags: typing.Optional[typing.List[str]] = None
    # Channel the post was forwarded from, if any
    forwarded: typing.Optional['Channel'] = None
    # Target of the "forwarded from" link, if any
    forwardedUrl: typing.Optional[str] = None
    # Attached media objects (Medium subclasses)
    media: typing.Optional[typing.List['Medium']] = None
    # View counter of the post
    # NOTE(review): the scraper assigns the result of parse_num(), which returns a
    # (value, granularity) tuple rather than a plain int -- confirm the intended type
    views: typing.Optional[int] = None
    linkPreview: typing.Optional[LinkPreview] = None

    # Space-joined view of `outlinks`, wrapped in snscrape.base._DeprecatedProperty
    # (presumably a deprecated alias that points users to `outlinks` -- confirm in snscrape.base)
    outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')

    def __str__(self):
        # A post is represented by its URL
        return self.url


class Medium:
    """Common base class for the media attachment types of a post."""


@dataclasses.dataclass
class Photo(Medium):
    """A photo attached to a post."""

    # Image URL; the scraper extracts it from the CSS url('...') in the photo link's style attribute
    url: str


@dataclasses.dataclass
class Video(Medium):
    """A video attached to a post (a video player element that carries a duration)."""

    # Thumbnail URL taken from the player's inline style; the scraper passes None when it finds no thumbnail element
    thumbnailUrl: typing.Optional[str]
    # Length in seconds, as computed by durationStrToSeconds()
    duration: float
    # Source URL of the video, or None when the player contains no <video> tag
    url: typing.Optional[str] = None


@dataclasses.dataclass
class VoiceMessage(Medium):
    """A voice message attached to a post."""

    # Source URL of the audio
    url: str
    # Length in seconds. The scraper fills this with the numeric result of
    # durationStrToSeconds(), so it is annotated like Video.duration rather than as str.
    duration: float
    # Bar heights of the voice player's waveform, parsed from the per-bar inline styles
    bars: typing.List[float]


@dataclasses.dataclass
class Gif(Medium):
    """A GIF attached to a post (a video player element without a duration)."""

    # Thumbnail URL taken from the player's inline style; the scraper passes None when it finds no thumbnail element
    thumbnailUrl: typing.Optional[str]
    # Source URL of the clip, or None when the player contains no <video> tag
    url: typing.Optional[str] = None


class TelegramChannelScraper(snscrape.base.Scraper):
name = 'telegram-channel'

Expand Down Expand Up @@ -93,48 +125,85 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False):
_logger.warning(f'Possibly incorrect URL: {rawUrl!r}')
url = rawUrl.replace('//t.me/', '//t.me/s/')
date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z')
images = []
videos = []
media = []
outlinks = []
mentions = []
hashtags = []
forwarded = None
forwardedUrl = None

if (forwardTag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
forwardedUrl = forwardTag['href']
forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0]
forwarded = Channel(username = forwardedName)

if (message := post.find('div', class_ = 'tgme_widget_message_text')):
content = message.get_text(separator="\n")
else:
content = None

for video_tag in post.find_all('video'):
videos.append(video_tag['src'])
for link in post.find_all('a'):
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
# Author links at the top (avatar and name)
continue
if link['href'] == rawUrl or link['href'] == url:
style = link.attrs.get('style', '')
# Generic filter of links to the post itself, catches videos, photos, and the date link
if style != '':
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
if len(imageUrls) == 1:
media.append(Photo(url = imageUrls[0]))
continue
if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
style = link.attrs.get('style', '')
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
if len(imageUrls) == 1:
media.append(Photo(url = imageUrls[0]))
# resp = self._get(image[0])
# encoded_string = base64.b64encode(resp.content)
# Individual photo or video link
continue
if link.text.startswith('@'):
mentions.append(link.text.strip('@'))
continue
if link.text.startswith('#'):
hashtags.append(link.text.strip('#'))
continue
href = urllib.parse.urljoin(pageUrl, link['href'])
if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl):
outlinks.append(href)

if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
forwarded = forward_tag['href'].split('t.me/')[1].split('/')[0]
for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
audioUrl = voicePlayer.find('audio')['src']
durationStr = voicePlayer.find('time').text
duration = durationStrToSeconds(durationStr)
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]

media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))

for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
iTag = videoPlayer.find('i')
if iTag is None:
videoUrl = None
videoThumbnailUrl = None
else:
style = iTag['style']
videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0]
videoTag = videoPlayer.find('video')
videoUrl = None if videoTag is None else videoTag['src']
mKwargs = {
'thumbnailUrl': videoThumbnailUrl,
'url': videoUrl,
}
timeTag = videoPlayer.find('time')
if timeTag is None:
cls = Gif
else:
cls = Video
durationStr = videoPlayer.find('time').text
mKwargs['duration'] = durationStrToSeconds(durationStr)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment on list vs. str as above, for `durationStrToSeconds`.

media.append(cls(**mKwargs))

outlinks = []
for link in post.find_all('a'):
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
# Author links at the top (avatar and name)
continue
if link['href'] == rawUrl or link['href'] == url:
style = link.attrs.get('style', '')
# Generic filter of links to the post itself, catches videos, photos, and the date link
if style != '':
image = re.findall('url\(\'(.*?)\'\)', style)
if len(image) == 1:
images.append(image[0])
continue
if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
style = link.attrs.get('style', '')
image = re.findall('url\(\'(.*?)\'\)', style)
if len(image) == 1:
images.append(image[0])
# resp = self._get(image[0])
# encoded_string = base64.b64encode(resp.content)
# Individual photo or video link
continue
href = urllib.parse.urljoin(pageUrl, link['href'])
if href not in outlinks:
outlinks.append(href)
else:
content = None
outlinks = []
images = []
videos = []
linkPreview = None
if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')):
kwargs = {}
Expand All @@ -151,20 +220,40 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False):
else:
_logger.warning(f'Could not process link preview image on {url}')
linkPreview = LinkPreview(**kwargs)
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview, images = images, videos = videos, forwarded = forwarded)
if kwargs['href'] in outlinks:
outlinks.remove(kwargs['href'])

viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
views = None if viewsSpan is None else parse_num(viewsSpan.text)

yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views)

def get_items(self):
r, soup = self._initial_page()
if '/s/' not in r.url:
_logger.warning('No public post list for this user')
return
nextPageUrl = ''
while True:
yield from self._soup_to_items(soup, r.url)
try:
if soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True)['href'].split('/')[-1] == '1':
# if message 1 is the first message in the page, terminate scraping
break
except:
pass
pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True})
if not pageLink:
break
# some pages are missing a "tme_messages_more" tag, causing early termination
if '=' not in nextPageUrl:
nextPageUrl = soup.find('link', attrs = {'rel': 'canonical'}, href = True)['href']
nextPostIndex = int(nextPageUrl.split('=')[-1]) - 20
if nextPostIndex > 20:
pageLink = {'href': nextPageUrl.split('=')[0] + f'={nextPostIndex}'}
else:
break
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
r = self._get(nextPageUrl, headers = self._headers)
r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = telegramResponseOkCallback)
if r.status_code != 200:
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
soup = bs4.BeautifulSoup(r.text, 'lxml')
Expand Down Expand Up @@ -204,15 +293,6 @@ def _get_entity(self):
if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')):
kwargs['description'] = descriptionDiv.text

def parse_num(s):
s = s.replace(' ', '')
if s.endswith('M'):
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
elif s.endswith('K'):
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
else:
return int(s), 1

for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'):
value, granularity = parse_num(div.find('span', class_ = 'counter_value').text)
type_ = div.find('span', class_ = 'counter_type').text
Expand All @@ -231,3 +311,21 @@ def _cli_setup_parser(cls, subparser):
@classmethod
def _cli_from_args(cls, args):
    """Create a scraper from parsed CLI arguments by handing ``args`` and ``args.channel`` to ``_cli_construct``."""
    construct = cls._cli_construct
    return construct(args, args.channel)

def parse_num(s):
    """Parse an abbreviated counter string such as ``'1.2K'``, ``'3M'`` or ``'12 345'``.

    Spaces are removed first. A trailing ``M`` or ``K`` scales the number by
    one million or one thousand respectively.

    Returns a ``(value, granularity)`` tuple: ``value`` is the integer count and
    ``granularity`` is the power of ten corresponding to the last digit that was
    actually present in the string (1 for plain integers).
    """
    s = s.replace(' ', '')
    # (suffix, power of ten of the suffix, multiplier)
    for suffix, exponent, factor in (('M', 6, 1e6), ('K', 3, 1000)):
        if s.endswith(suffix):
            mantissa = s[:-1]
            # Every digit after the decimal point refines the granularity by one power of ten
            decimals = len(mantissa.split('.')[1]) if '.' in s else 0
            return int(float(mantissa) * factor), 10 ** (exponent - decimals)
    return int(s), 1

def durationStrToSeconds(durationStr):
    """Convert a colon-separated duration string into a number of seconds.

    Accepts ``'SS'``, ``'MM:SS'`` and ``'HH:MM:SS'`` (e.g. ``'1:02:03'`` -> 3723).
    Any fields to the left of the hours field are ignored.

    Returns the duration as an ``int``.
    Raises ``ValueError`` if a field is not an integer.
    """
    # Walk the fields from the right so seconds/minutes/hours line up with their
    # multipliers no matter how many fields are present.
    fields = reversed(durationStr.split(':'))
    # An hour is 3600 seconds (the previous multiplier of 360 under-counted hours tenfold).
    return sum(int(field) * multiplier for field, multiplier in zip(fields, (1, 60, 3600)))

def telegramResponseOkCallback(r):
    """Response check that accepts only HTTP 200.

    Returns ``(True, None)`` for status code 200; otherwise ``(False, message)``
    where ``message`` names the offending status code.
    """
    if r.status_code != 200:
        return (False, f'{r.status_code=}')
    return (True, None)