jms0522 / hadoop_system

βœ… hadoop eco system을 κ΅¬μ„±ν•˜κ³  νŒŒμ΄ν”„λΌμΈ μ œμž‘ν•©λ‹ˆλ‹€.
2 stars 0 forks source link

πŸ“Œ ν† ν”½ 생성 #5

Open jms0522 opened 2 months ago

jms0522 commented 2 months ago

logstash conf μ—μ„œ kafka topic을 μ„€μ •

μ»¨ν…Œμ΄λ„ˆ λ‚΄λΆ€ kafka-topic.sh μžˆλŠ” κ³³μ—μ„œ μ‹€ν–‰ν•˜κ±°λ‚˜ docker exec -itλ₯Ό μ‚¬μš©ν•΄μ„œ μ™ΈλΆ€μ—μ„œ μ‹€ν–‰

ν† ν”½ λͺ©λ‘ 확인

 kafka-topics.sh --list --bootstrap-server localhost:9092 

ν† ν”½ 생성

kafka-topics.sh --create \
  --topic <토픽이름> \
  --partitions <νŒŒν‹°μ…˜μˆ˜> \
  --replication-factor <볡제본수> \
  --bootstrap-server localhost:9092

정보 확인

kafka-topics.sh --describe \
  --topic <토픽이름> \
  --bootstrap-server localhost:9092

데이터 λ“€μ–΄μ˜€λŠ”μ§€ 확인

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic new_test_topic --from-beginning

Python μ½”λ“œλ‘œ ν† ν”½ 생성 및 λͺ©λ‘ 확인 (kakfa-python)

μ„€μΉ˜

 pip install kafka-python

 from kafka.admin import KafkaAdminClient, NewTopic
 from kafka import KafkaConsumer

  # Kafka ν΄λΌμ΄μ–ΈνŠΈ μ„€μ •
  admin_client = KafkaAdminClient(
      bootstrap_servers="localhost:9092",
      client_id='kafka-python-example'
  )

# ν† ν”½ 생성 ν•¨μˆ˜
  def create_topic(topic_name, num_partitions=3, replication_factor=1):
      topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
      admin_client.create_topics(new_topics=[topic], validate_only=False)
      print(f"Topic '{topic_name}' created successfully.")

# ν† ν”½ λͺ©λ‘ 확인 ν•¨μˆ˜
  def list_topics():
      consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
      topics = consumer.topics()
      print("Topics:", topics)

# ν† ν”½ 생성
  create_topic("logstash-topic")

# ν† ν”½ λͺ©λ‘ 확인
    list_topics()

docker-composeλ₯Ό 톡해 μ‹œμž‘μ‹œ ν† ν”½ 생성도 κ°€λŠ₯ν•˜λ‹€.

environment:
  KAFKA_CREATE_TOPICS: "logstash-topic:3:1"  # ν† ν”½ 생성: logstash-topic, νŒŒν‹°μ…˜ 3, 볡제본 1

μ΄λŸ°μ‹μœΌλ‘œ μ‹œμž‘μ‹œ 생성할 수 있음.

슀크립트λ₯Ό ν†΅ν•΄μ„œ 생성도 κ°€λŠ₯ν•˜λ‹€.

command: >
  sh -c "
  /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties &
  sleep 5;
  /opt/kafka/bin/kafka-topics.sh --create --topic logstash-topic --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server localhost:9092;
  /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092;
  wait
  "

μ΄λŸ°μ‹μœΌλ‘œ 정해두면 슀크립트둜 μ‹€ν–‰λœλ‹€. 단 shνŒŒμΌμ€ 미리 μ‘΄μž¬ν•΄μ•Όν•¨.

jms0522 commented 2 months ago

μ„€μ • 파일 μœ„μΉ˜

/opt/kafka_2.13-2.8.1/config/server.properties