from kafka import KafkaConsumer
import json
import os
def get_movie_data(
    movie_name,
    json_file_path='/home/esthercho/code/mdata/data/movies/year=2015/data.json',
):
    """Look up a movie record by its exact name in a JSON data file.

    The file is parsed with ``json.load`` and scanned in order; the first
    dict entry whose ``'movieNm'`` value equals ``movie_name`` is returned.

    Args:
        movie_name: Name to match against each entry's ``'movieNm'`` value
            (exact, case-sensitive comparison).
        json_file_path: Path of the JSON file to search. Defaults to the
            2015 movie dump this script was written against.
            NOTE(review): the file is assumed to hold a top-level list of
            movie dicts -- confirm against the data producer.

    Returns:
        The matching movie dict, or ``None`` when the file does not exist,
        its top-level value is not a list, or no entry matches.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    # EAFP: open directly rather than os.path.exists() + open(), which can
    # race with the file being removed between the two calls.
    try:
        with open(json_file_path, 'r', encoding='utf-8') as file:
            movies = json.load(file)
    except FileNotFoundError:
        return None

    if not isinstance(movies, list):
        return None

    for movie in movies:
        # Skip malformed entries instead of raising KeyError/TypeError.
        if not isinstance(movie, dict) or 'movieNm' not in movie:
            continue
        if movie['movieNm'] == movie_name:
            return movie
    return None
def consume_messages(topic='chatbot_topic'):
    """Consume chat messages from Kafka and print data for requested movies.

    Each record value is decoded as UTF-8 JSON. When the decoded value is a
    dict whose ``'message'`` string starts with ``'@챗봇'``, the text after
    the first ``':'`` is treated as a movie name, looked up with
    ``get_movie_data`` and the result (or a not-found notice) is printed.

    Args:
        topic: Kafka topic to subscribe to.

    Returns:
        None. Iterates over the consumer until iteration ends or an
        exception (e.g. ``KeyboardInterrupt``) propagates; the consumer is
        closed in either case.
    """

    def _decode(raw):
        # Return the parsed JSON payload, or None for null/invalid payloads
        # so that one bad record does not terminate the consume loop.
        if raw is None:
            return None
        try:
            return json.loads(raw.decode('utf-8'))
        except ValueError:  # covers UnicodeDecodeError and JSONDecodeError
            return None

    # `value_serializer` is a producer-side option; KafkaConsumer does not
    # accept it, so only the deserializer is configured here.
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='chatbot-group',
        value_deserializer=_decode,
    )
    try:
        for message in consumer:
            data = message.value
            # Ignore payloads that are not JSON objects (including records
            # that failed to decode).
            if not isinstance(data, dict):
                continue
            user_input = data.get('message')
            if not isinstance(user_input, str):
                continue
            if not user_input.startswith('@챗봇'):
                continue
            # Everything after the first ':' is the requested movie name.
            movie_name = user_input.split(':', 1)[-1].strip()
            movie_data = get_movie_data(movie_name)
            if movie_data:
                print(f"영화 '{movie_name}'의 JSON 데이터:\n{json.dumps(movie_data, ensure_ascii=False, indent=4)}")
            else:
                print(f"영화 '{movie_name}'를 찾을 수 없습니다.")
    finally:
        # Release the network connections even when the loop is interrupted.
        consumer.close()
# Script entry point: start the consumer only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    consume_messages()
# By ChatGPT
# Producer.py
# consumer.py