Open K0nkere opened 11 months ago
from datetime import datetime, timedelta
import logging
import json
import boto3
config = dict()
config['api_id'] = <integer_api_id>
config['api_hash'] = '<api_hash>'
config['chats'] = ['chats_to_parse>',]
config['path'] = '<path_to_.session_file>'
config['limit'] = 100 # num of messages to parse
names = ['str', 'dict', 'data', 'reactions', 'hashtags', 'links', 'polls']
logging.basicConfig(format='[%(levelname) 5s/%(asctime)s] %(name)s: %(message)s',
level=logging.WARNING)
def _get_telegram_messages(config):
    """Fetch recent messages from Telegram channels and flatten them into records.

    Every import is function-local, so the function does not rely on
    module-level names.

    Args:
        config: Settings with the keys ``api_id``, ``api_hash``, ``chats``,
            ``path`` (directory holding the ``.session`` file) and ``limit``
            (messages fetched per chat). Accepted as a dict, a JSON string
            (what ``__main__`` passes) or a Python-literal string.

    Returns:
        A JSON string whose keys ``messages_str``, ``messages_dict``,
        ``messages_data``, ``messages_reactions``, ``messages_hashtags``,
        ``messages_links`` and ``messages_polls`` each map to a list of
        records. Values that are not JSON-serializable are converted with
        ``str``.

    Side effects:
        Changes the process working directory to ``config['path']``.
    """
    import ast
    import datetime
    import json
    import os

    from telethon import functions
    from telethon.sync import TelegramClient

    def _replace_chars(text):
        """Remove newlines, tabs and carriage returns; return "-" if nothing remains."""
        for char in ("\n", "\t", "\r"):
            text = text.replace(char, "")
        return text if text else "-"

    def _plain_text(value):
        """Return ``value.text`` when the attribute exists, otherwise ``value`` itself."""
        # NOTE(review): newer Telegram API layers appear to wrap poll questions
        # and answers in TextWithEntities objects; plain strings pass through.
        return getattr(value, "text", value)

    def _parse_message(message, char_stat, date_parsing):
        """Flatten one message into a summary record plus per-item detail rows.

        Args:
            message: A Telethon message yielded by ``client.iter_messages``.
            char_stat: Channel participant count, copied into the summary record.
            date_parsing: Timestamp string of the current parsing run.

        Returns:
            ``(message_data, reactions, hashtags, links, poll)``: ``message_data``
            is a flat dict; each other item is a list of row dicts, or ``0`` when
            the message has no such data.
        """
        m_id = message.id
        m_chat = message.chat.id
        chat_title = message.chat.title
        date = str(message.date)

        def _row(**fields):
            # Columns shared by every detail row, followed by the row-specific ones.
            return {"date_parsing": date_parsing, "source": "telegram", "message_id": m_id,
                    "date": date, "group_title": chat_title, **fields}

        try:
            text = _replace_chars(message.text)
        except Exception:  # e.g. message.text is None
            text = ''
        views = message.views if message.views is not None else 0
        forwards = message.forwards if message.forwards is not None else 0
        # message.replies is None when the message carries no reply info.
        replies = getattr(message.replies, "replies", 0)

        if message.reactions is not None:
            # Reactions without an `emoticon` (e.g. custom emoji) are recorded as
            # None instead of aborting the whole run with AttributeError.
            reactions = [_row(reaction=getattr(item.reaction, "emoticon", None), count=item.count)
                         for item in message.reactions.results]
        else:
            reactions = 0
        audio = 1 if message.audio is not None else 0

        poll = video = photo = 0
        media = message.media
        if media is not None:
            try:
                # Polls: one row per answer with its vote count.
                # NOTE(review): answers and results are paired by position; confirm
                # Telegram always returns results in answer order.
                poll_question = _replace_chars(_plain_text(media.poll.question))
                poll = [_row(poll_id=media.poll.id, question=poll_question,
                             answer=_replace_chars(_plain_text(answer.text)), result=result.voters)
                        for answer, result in zip(media.poll.answers, media.results.results)]
            except Exception:  # not a poll, or poll results unavailable
                poll = 0
            # NOTE(review): `video` is 1 for any media whose document has a
            # mime_type (files, stickers, voice notes, ...), not only videos.
            video = 1 if hasattr(getattr(media, "document", None), "mime_type") else 0
            photo = 1 if hasattr(getattr(media, "photo", None), "id") else 0

        hashtags = links = 0
        try:
            entities = message.get_entities_text()
        except Exception:
            entities = []
        if entities:
            # Compare class names so telethon.tl.types need not be imported.
            hashtags = [_row(hashtag=ent_text) for entity, ent_text in entities
                        if type(entity).__name__ == "MessageEntityHashtag"]
            links = [_row(url=ent_text) for entity, ent_text in entities
                     if type(entity).__name__ == "MessageEntityUrl"]

        message_data = {
            "date_parsing": date_parsing,
            "source": "telegram",
            "message_id": m_id,
            "source_id": m_chat,
            "source_name": chat_title,
            "text": text,
            "video": video,
            "photo": photo,
            "audio": audio,
            "date": date,
            "poll": 1 if poll else 0,
            "hashtags": len(hashtags) if hashtags else 0,
            "links": len(links) if links else 0,
            # 1 when the message has at least one reaction row.
            "reactions": 1 if reactions else 0,
            "replies": replies,
            "views": views,
            "forwards": forwards,
            "char_stat": char_stat
        }
        return message_data, reactions, hashtags, links, poll

    if isinstance(config, str):
        try:
            config = json.loads(config)
        except ValueError:
            # Python-literal syntax, e.g. str(dict) with single quotes.
            config = ast.literal_eval(config)
    api_id = config['api_id']
    api_hash = config['api_hash']
    chats = config['chats']
    path = config['path']
    limit = config['limit']

    os.chdir(path)  # moving to directory with .session file
    client = TelegramClient('<phone_number_to_session_login>', api_id, api_hash)

    messages_str = []
    messages_dict = []
    messages_data = []
    messages_reactions = []
    messages_hashtags = []
    messages_links = []
    messages_polls = []
    try:
        client.start()
        for chat in chats:
            full_channel = client(functions.channels.GetFullChannelRequest(channel=chat))
            char_stat = full_channel.full_chat.participants_count
            date = str(datetime.datetime.now())
            for message in client.iter_messages(chat, limit=limit):
                messages_str.append({"date": date, "str": message.stringify()})
                messages_dict.append({"date": date, "dict": message.__dict__})
                message_data, reactions, hashtags, links, poll = _parse_message(message, char_stat, date)
                messages_data.append(message_data)
                # Detail parts are 0 when absent; `or []` turns that into a no-op.
                messages_reactions.extend(reactions or [])
                messages_hashtags.extend(hashtags or [])
                messages_links.extend(links or [])
                messages_polls.extend(poll or [])
    finally:
        # Release the connection even if fetching or parsing fails.
        client.disconnect()

    return json.dumps({
        "messages_str": messages_str,
        "messages_dict": messages_dict,
        "messages_data": messages_data,
        "messages_reactions": messages_reactions,
        "messages_hashtags": messages_hashtags,
        "messages_links": messages_links,
        "messages_polls": messages_polls,
    }, default=str)
# upload to s3
def _s3_loader(data, name, creds):
    """Upload one record list from the extractor's JSON payload to S3 as CSV.

    Args:
        data: JSON string as returned by ``_get_telegram_messages``.
        name: Suffix selecting the ``messages_<name>`` list (e.g. ``'data'``).
        creds: Mapping with ``endpoint_url``, ``aws_access_key_id``,
            ``aws_secret_access_key``, ``region_name`` and ``bucket``.

    Prints a message and returns without contacting S3 when the list is
    missing or empty.
    """
    import boto3
    import csv
    import datetime
    import io
    import json

    def dict2csv(data):
        """Render a list of dicts as CSV text with a header row.

        The header is the union of all rows' keys in first-seen order, so rows
        with differing keys do not make DictWriter raise; missing cells are
        written empty.
        """
        headers = list(dict.fromkeys(key for row in data for key in row))
        stream = io.StringIO()
        writer = csv.DictWriter(stream, fieldnames=headers)
        writer.writeheader()
        writer.writerows(data)
        return stream.getvalue()

    # A missing key yields None; treat it the same as an empty list.
    records = json.loads(data).get(f'messages_{name}')
    if not records:
        print(f'Have no new records on messages_{name}')
        return

    session = boto3.session.Session()
    s3 = session.client(
        service_name='s3',
        endpoint_url=creds['endpoint_url'],
        aws_access_key_id=creds['aws_access_key_id'],
        aws_secret_access_key=creds['aws_secret_access_key'],
        region_name=creds['region_name']
    )
    s3.put_object(Bucket=creds['bucket'], Key=f'telegram_{name}_{datetime.datetime.now()}.csv',
                  Body=dict2csv(records))
    print(f'Files on messages_{name} loaded successfully')
    return
# upload to Clickhouse
def upload_to_clickhouse(data, name):
    """Insert one record list from the extractor's JSON payload into ClickHouse.

    Args:
        data: JSON string as returned by ``_get_telegram_messages``.
        name: Suffix selecting the ``messages_<name>`` list. ``'data'`` goes to
            table ``media.telegram``; any other suffix goes to
            ``media.telegram_<name>``.

    Does nothing (and opens no connection) when the list is missing or empty.
    """
    # Imported locally, like the other loaders, so the function is self-contained.
    import json

    import pandas as pd
    from clickhouse_connect import create_client

    selected_data = json.loads(data).get(f'messages_{name}') or []
    data_df = pd.DataFrame.from_records(selected_data)
    if len(data_df) == 0:
        return

    connect_insert = create_client(
        host='<clickhouse_host>',
        port='<clickhouse_port>',
        username='<clickhose_user>',
        password='<clickhouse_ow>',
    )
    try:
        table = 'telegram' if name == 'data' else f'telegram_{name}'
        insert_context = connect_insert.create_insert_context(table=table, column_names=data_df.columns, database='media')
        connect_insert.insert_df(df=data_df, context=insert_context)
    finally:
        # Release the connection even if the insert fails.
        connect_insert.close()
    return
if __name__ == '__main__':
    # Pull messages once, then walk every record type listed in `names`.
    print('getting messages from telegram')
    payload = json.dumps(config)
    data = _get_telegram_messages(payload)
    for name in names:
        print('processing', name)
        # Sinks are currently disabled. `creds` (S3 settings) is not defined in
        # this script and must be provided before re-enabling the S3 upload.
        # _s3_loader(data, name=name, creds=creds)
        # upload_to_clickhouse(data, name=name)
Аутентификация, создание session-файла