dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

kafka-python does not send messages from an Airflow DAG #2023

Closed Sabutobi closed 4 years ago

Sabutobi commented 4 years ago

Hi all. This is the Kafka docker-compose file:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
     - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka_container
    ports:
        - "9092:9092"                               # expose port
    environment:
        KAFKA_ADVERTISED_HOST_NAME: kafka           # specify the docker host IP at which other containers can reach the broker
        KAFKA_CREATE_TOPICS: 'SomeTopic:1:1,SomeTopic2:1:1'
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181     # specify where the broker can reach Zookeeper
        KAFKA_LISTENERS: PLAINTEXT://:9092          # the list of addresses on which the Kafka broker will listen on for incoming connections.
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092  # Kafka sends this value to clients during connection; clients then use it to produce/consume records to/from the broker
    volumes:
        - /var/run/docker.sock:/var/run/docker.sock
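For context on why `KAFKA_ADVERTISED_LISTENERS` matters in this setup: a client first connects to the bootstrap address, the broker replies with the advertised host/port in its metadata, and all subsequent traffic goes to that advertised address. If the advertised name is not resolvable from the client's network, connections stall. A toy illustration of that two-step handshake (pure stdlib, no real broker; the function names are purely illustrative):

```python
# The broker's metadata tells clients where to connect for actual traffic.
ADVERTISED = {"host": "kafka", "port": 9092}  # mirrors KAFKA_ADVERTISED_LISTENERS

def bootstrap_metadata():
    """What the broker returns on the initial bootstrap connection."""
    return {"brokers": [ADVERTISED]}

def resolve_broker(resolvable_hosts):
    """A client can only keep using the advertised address if it resolves it."""
    broker = bootstrap_metadata()["brokers"][0]
    return broker if broker["host"] in resolvable_hosts else None

# A container attached to the kafka_default network resolves the alias "kafka":
assert resolve_broker({"kafka"}) == {"host": "kafka", "port": 9092}
# A client outside that network cannot, so it gets nowhere after bootstrap:
assert resolve_broker({"localhost"}) is None
```

This is why the Airflow container below joins `kafka_default` and aliases `kafka_container` as `kafka`.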

And this is the Airflow container:

version: '3.0'
services:
    postgres:
        image: postgres:9.6
        environment:
            - POSTGRES_USER=airflow
            - POSTGRES_PASSWORD=airflow
            - POSTGRES_DB=airflow
        networks:
            - network-mongodb-service_some-nw

        logging:
            options:
                max-size: 10m
                max-file: "3"

    webserver:
        build:
            context: .
        image: puckel/docker-airflow:1.10.9
        restart: always
        depends_on:
            - postgres
        environment:
            - LOAD_EX=n
            - EXECUTOR=Local
            - whatever=${somevar}
        logging:
            options:
                max-size: 10m
                max-file: "3"
        volumes:
            - ./dags:/usr/local/airflow/dags
            - ./requirements.txt:/requirements.txt
        networks:
            - network-mongodb-service_some-nw
            - kafka_default
        external_links:
            - somelink:${somelink}
            - kafka_container:kafka
        ports:
            - "8080:8080"
        command: webserver
        healthcheck:
            test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
            interval: 30s
            timeout: 30s
            retries: 3

networks:
    network-mongodb-service_some-nw:
        external: true
    kafka_default:
        external: true

This is how it works if I just connect via `docker exec -it containerId bash`:

>>> from kafka import KafkaProducer
>>> kafka_host = 'kafka:9092'
>>> producer = KafkaProducer(bootstrap_servers=kafka_host,acks='all',retries=0)
>>> producer.send(where_to_send,
...               encoded_payload)

This works fine and I hit no problems. But if I add the identical code to the Airflow DAG:

Sending (key=None value=b'{"topic": "topic", "msg": "message", "domainName": "somedomain.com"}' headers=[]) to TopicPartition(topic='sometopic', partition=0)
Allocating a new 16384 byte message buffer for TopicPartition(topic='Webtraffic', partition=0)
Waking up the sender since TopicPartition(topic='Webtraffic', partition=0) is either full or getting a new batch
[2020-03-19 15:01:03,994] {{conn.py:378}} INFO - <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connecting> [IPv4 ('192.168.32.2', 9092)]>: connecting to kafka:9092 [('192.168.32.2', 9092) IPv4]
[2020-03-19 15:01:03,995] {{conn.py:1195}} INFO - Probing node bootstrap-0 broker version
[2020-03-19 15:01:03,995] {{conn.py:407}} INFO - <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connecting> [IPv4 ('192.168.32.2', 9092)]>: Connection complete.
[2020-03-19 15:01:04,102] {{conn.py:1257}} INFO - Broker version identified as 1.0.0
[2020-03-19 15:01:04,103] {{conn.py:1259}} INFO - Set configuration api_version=(1, 0, 0) to skip auto check_version requests on startup
Received correlation id: 3
[2020-03-19 15:01:04,113] {{parser.py:139}} DEBUG - Received correlation id: 3
Processing response MetadataResponse_v1
[2020-03-19 15:01:04,115] {{parser.py:166}} DEBUG - Processing response MetadataResponse_v1
<BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connected> [IPv4 ('192.168.32.2', 9092)]> Response 3 (10.233640670776367 ms): MetadataResponse_v1(brokers=[(node_id=1001, host='kafka', port=9092, rack=None)], controller_id=1001, topics=[(error_code=0, topic='Social', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='Webtraffic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='__consumer_offsets', is_internal=True, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=10, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=20, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=40, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=30, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=9, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=39, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=11, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=31, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=13, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=18, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=22, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=8, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=32, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=43, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=29, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=34, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=6, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=41, leader=1001, 
replicas=[1001], isr=[1001]), (error_code=0, partition=27, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=48, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=5, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=15, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=35, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=25, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=46, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=26, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=36, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=44, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=16, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=37, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=17, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=45, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=24, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=38, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=33, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=23, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=28, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=12, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=19, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=14, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=47, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=49, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=42, 
leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=7, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=21, leader=1001, replicas=[1001], isr=[1001])])])
Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 3, groups: 0)
[2020-03-19 15:01:04,132] {{cluster.py:325}} DEBUG - Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 3, groups: 0)

And nothing happens. The message is not sent. No errors. Nothing. Maybe you have some fresh ideas about what I've done wrong?

tvoinarovskyi commented 4 years ago

@Sabutobi Could you share your DAG code with us? I suspect it's related to the fact that LocalExecutor runs the code using multiprocessing, and the producer can't be shared between processes. You need to make sure you instantiate the producer inside your operator, not in the plain DAG definition file. I have not investigated further into how the executor actually loads those DAG files, so I may be wrong. Also, could you try SequentialExecutor with your code? It should not produce this behaviour if I am right.
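As background for why a module-level producer is fragile under process-based executors: `KafkaProducer` owns sockets, locks, and a background sender thread, none of which survive being handed to another process (background threads do not survive a fork, and lock-holding objects cannot be pickled to a spawned worker). A tiny stdlib illustration of the second point, unrelated to Kafka itself (`HoldsLock` is purely illustrative):

```python
import pickle
import threading

class HoldsLock:
    """Stand-in for an object that, like a Kafka producer, owns locks,
    sockets, and a background thread."""
    def __init__(self):
        self._lock = threading.Lock()

try:
    pickle.dumps(HoldsLock())  # what shipping it to a worker process requires
    shareable = True
except TypeError:              # "cannot pickle '_thread.lock' object"
    shareable = False

assert not shareable  # such objects cannot cross a process boundary
```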

Sabutobi commented 4 years ago

Hi @tvoinarovskyi From airflow.cfg:

executor = SequentialExecutor

The code:

import json
from datetime import date, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from kafka import KafkaProducer

kafka_host = 'kafka:9092'
producer = KafkaProducer(bootstrap_servers=kafka_host, acks='all', retries=0)

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'max_active_runs': 1,
    'start_date': datetime(date.today().year, date.today().month, date.today().day),
}
dag = DAG(
    dag_id='some_id',
    default_args=default_args,
    concurrency=1,
    catchup=False,
    schedule_interval="7 2-22/4 * * *")

def social_daily_load():
    print("Webtraffic upload started...")
    payload = {
        'topic': 'Webtraffic',
        'msg': 'load_webtraffic',
        'domainName': 'cleaf.it',
    }
    where_to_send = payload['topic']
    encoded_payload = json.dumps(payload).encode('utf-8')
    producer.send(where_to_send, encoded_payload)
    print('message sent')

main_load_op = PythonOperator(
    python_callable=social_daily_load, task_id="some_id", dag=dag)

The result:

webserver_1  | Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 3, groups: 0)
webserver_1  | [2020-03-20 09:25:08,810] {{cluster.py:325}} DEBUG - Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 3, groups: 0)
webserver_1  | Running %s on host %s <TaskInstance: hourly_webtraffic_dag.webtraffic_load 2020-03-20T02:07:00+00:00 [queued]> 09c5ff63d865
webserver_1  | Sending (key=None value=b'{"topic": "Webtraffic", "msg": "load_webtraffic", "domainName": "cleaf.it"}' headers=[]) to TopicPartition(topic='Webtraffic', partition=0)
webserver_1  | Allocating a new 16384 byte message buffer for TopicPartition(topic='Webtraffic', partition=0)
webserver_1  | Waking up the sender since TopicPartition(topic='Webtraffic', partition=0) is either full or getting a new batch
webserver_1  | Closing the Kafka producer with 0 secs timeout.
webserver_1  | [2020-03-20 09:25:18,881] {{kafka.py:471}} INFO - Closing the Kafka producer with 0 secs timeout.
webserver_1  | Proceeding to force close the producer since pending requests could not be completed within timeout 0.
webserver_1  | [2020-03-20 09:25:18,881] {{kafka.py:489}} INFO - Proceeding to force close the producer since pending requests could not be completed within timeout 0.
webserver_1  | The Kafka producer has closed.
webserver_1  | [2020-03-20 09:25:18,882] {{kafka.py:502}} DEBUG - The Kafka producer has closed.
webserver_1  | Kafka producer closed
webserver_1  | [2020-03-20 09:25:18,972] {{kafka.py:461}} INFO - Kafka producer closed
webserver_1  | [2020-03-20 09:25:19,035] {{scheduler_job.py:1311}} INFO - Executor reports execution of hourly_webtraffic_dag.webtraffic_load execution_date=2020-03-20 02:07:00+00:00 exited with status success for try_number 1
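The lines "Proceeding to force close the producer since pending requests could not be completed within timeout 0" are the key symptom: `KafkaProducer.send()` is asynchronous and only enqueues the record in an in-memory buffer; a background sender thread delivers it later. If the producer is torn down (or the process exits) before the batch is shipped, the record is silently dropped. A stdlib-only sketch of that buffer-plus-background-sender model (`ToyProducer` is purely illustrative, not kafka-python code):

```python
import queue
import threading
import time

class ToyProducer:
    """Mimics KafkaProducer's asynchronous send(): records go into an
    in-memory buffer and a daemon thread delivers them later."""

    def __init__(self):
        self._buffer = queue.Queue()
        self.delivered = []
        threading.Thread(target=self._drain, daemon=True).start()

    def _drain(self):
        while True:
            topic, value = self._buffer.get()
            time.sleep(0.05)  # simulate network latency
            self.delivered.append((topic, value))
            self._buffer.task_done()

    def send(self, topic, value):
        # Returns immediately -- nothing has gone over the wire yet.
        self._buffer.put((topic, value))

    def flush(self):
        # Block until every buffered record has actually been delivered.
        self._buffer.join()

producer = ToyProducer()
producer.send('Webtraffic', b'{"msg": "load_webtraffic"}')
# If the process exited here, the record would be lost without any error.
producer.flush()
assert producer.delivered == [('Webtraffic', b'{"msg": "load_webtraffic"}')]
```

This is why calling `producer.flush()` (or waiting on the future returned by `send()`) before the task ends is essential in short-lived processes like Airflow task runs.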

Sabutobi commented 4 years ago

@tvoinarovskyi thanks a lot for helping me. I've found a solution. I don't know whether it is perfect, but: in the DAG execution function I was only sending the message from a "global" producer. Instead, I now initialize the producer inside the DAG executor scope:

def social_daily_load():
    # line below was the solution for me
    producer = KafkaProducer(bootstrap_servers=kafka_host, acks='all', retries=0)
    payload = {
        'topic': 'Webtraffic',
        'msg': 'load_webtraffic',
        'domainName': 'cleaf.it',
    }
    where_to_send = payload['topic']
    encoded_payload = json.dumps(payload).encode('utf-8')
    producer.send(where_to_send, encoded_payload)
    producer.flush()  # block until the record is actually delivered
    print('message sent')

Problem: for every message I initialize a new KafkaProducer, and there can be performance issues with that. Thanks once more @tvoinarovskyi for the fresh ideas.

tvoinarovskyi commented 4 years ago

@Sabutobi Great that it worked for you. A few points:

- Sadly, Airflow is meant to be run that way: it recreates the DAG for each run, and depending on the executor model it is not possible to share state.
- My suggestion of a global producer may work for the Local/Sequential executors, but will not work for Celery or Kubernetes, as they spawn a process per run.
- I would recommend sending big chunks of data per DAGRun.

I hope you will find your best configuration!

Sabutobi commented 4 years ago

@tvoinarovskyi I have added flush; I just forgot to copy it to the GitHub post. Thanks once more for helping me.

tvoinarovskyi commented 4 years ago

Oh, and just a side note:

    'start_date': datetime(date.today().year, date.today().month, date.today().day),

Set the start date to a specific date; you will have a bunch of problems with the Admin interface if this is not static. You already use catchup=False, so it will not backfill missed DAG runs.
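A minimal sketch of that suggestion: pin `start_date` to a fixed timestamp so every parse of the DAG file sees the same value (the concrete date below is just an example):

```python
from datetime import datetime

default_args = {
    'owner': 'airflow',
    # Static start date: every DAG-file parse sees the same value,
    # unlike date.today(), which shifts each day the file is parsed.
    'start_date': datetime(2020, 3, 1),
}

assert default_args['start_date'] == datetime(2020, 3, 1)
```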