adimian / kirby

Event-driven task execution framework based on Kafka
MIT License

topic/consumer/producer separation #70

Open nicksjl-adimian opened 4 years ago

nicksjl-adimian commented 4 years ago

In Kafka, a topic, a consumer and a producer are three separate objects. In the current version of Kirby, the Topic object contains both a consumer and a producer. For the sake of code simplicity and of user understanding, I propose staying as close as possible to Kafka's model and separating these objects. In a Kirby script, the user would define a Kirby or Kafka topic as a source or destination, then create a consumer or a producer to read from or write to that topic. They could create either a plain KafkaConsumer or a Kirby consumer that offers more features (rollback, for instance). An example Kirby script that fits the proposed solution is given below.

import os
from kafka import KafkaConsumer
import kirby
from kirby import create_kafka_topic

def main():
    # Kirby app Initialisation
    kirby_app = kirby.Kirby()

    # Environment variables
    kirby_app.add_needed_environment_variables({
            "CASHREGISTER_TOPIC_NAME": {"type": str},
            "TOTAL_FILES_FOLDER_PATH": {"type": str},
            "SLEEPING_TIME": {"type": str},
        })

    # Register source/destination. WARNING : names added as source or destination need to exist
    # in Kirby Web API Database as externals
    topic_name = "cashregister"
    kirby_app.add_source_name(topic_name)
    kirby_app.add_destination_name("totalfiles")

    # Topic creation
    create_kafka_topic(name=topic_name, nb_partitions=5, replications=2)

    # Consumer
    # kafka-python's KafkaConsumer takes topic names as positional arguments
    cash_consumer = KafkaConsumer("cashregister", group_id="toto")
    # Or, if we want more features (rollback for example):
    # cash_consumer = KirbyKafkaConsumer(topic="cashregister", group_id="toto")

    for cash_register in cash_consumer:
        file_path = os.path.join("a_given_path", "total_cash.txt")
        with open(file_path, "w+") as file:
            # Write the message payload (bytes unless a value_deserializer is configured)
            file.write(str(cash_register.value))

    # If the script is not a daemon, the consumer should be closed: cash_consumer.close()

PierreSavatte commented 4 years ago

I have several questions:

The "for cash_register in cash_consumer" iteration is specific to the Consumer currently in the Kirby code. The KafkaConsumer (as defined in the kafka-python library) does not support this type of iteration. Do you propose to extend that object and add some features to the KafkaConsumer?

The idea behind the context manager is to address the need to call .close() on a consumer (and/or a producer). By writing "with Topic(...) as topic:", the user does not need to bother with closing the objects.
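
To make the context-manager idea concrete, here is a minimal sketch. ManagedConsumer is a hypothetical wrapper (it is neither Kirby's Topic nor a kafka-python class), and FakeConsumer is a stand-in so the example runs without a broker:

```python
class FakeConsumer:
    """Stand-in for a real consumer; only models iteration and close()."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.closed = False

    def __iter__(self):
        return iter(self._messages)

    def close(self):
        self.closed = True


class ManagedConsumer:
    """Hypothetical wrapper that closes the wrapped consumer on exit."""

    def __init__(self, consumer):
        self._consumer = consumer

    def __enter__(self):
        return self._consumer

    def __exit__(self, exc_type, exc_value, traceback):
        # Always release the consumer, even if the loop body raised.
        self._consumer.close()
        return False  # do not swallow exceptions


raw = FakeConsumer(["a", "b"])
with ManagedConsumer(raw) as consumer:
    received = [msg for msg in consumer]
```

Once the with block exits, raw.closed is True without the user ever calling close() explicitly.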

I agree, the current way of defining the needed variables may not be clear enough for the user. Instead of creating a new function (that needs to be called after the Kirby object is initialised), we could make the init itself clearer by adding a keyword argument (kirby.Kirby(needed_env_var=...)). What do you think?
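
A rough sketch of what such a keyword could look like. The App class below is a hypothetical stand-in, not the real kirby.Kirby, and the environ parameter only exists so the example runs without touching the real environment. SLEEPING_TIME is declared as int here purely to show the type conversion:

```python
import os


class App:
    """Hypothetical stand-in for kirby.Kirby; not the real class."""

    def __init__(self, needed_env_var=None, environ=None):
        environ = os.environ if environ is None else environ
        self.config = {}
        for name, spec in (needed_env_var or {}).items():
            if name not in environ:
                raise KeyError(f"Missing environment variable: {name}")
            # Convert the raw string with the declared type.
            self.config[name] = spec["type"](environ[name])


app = App(
    needed_env_var={
        "CASHREGISTER_TOPIC_NAME": {"type": str},
        "SLEEPING_TIME": {"type": int},
    },
    environ={"CASHREGISTER_TOPIC_NAME": "cashregister", "SLEEPING_TIME": "5"},
)
```

With this shape, a missing variable fails at construction time rather than later in the script.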

nicksjl-adimian commented 4 years ago

1) Actually, I think KafkaConsumer supports this type of iteration. The following code works well:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "mytopic",
    bootstrap_servers="127.0.0.1:9092",
)
# Iterating over the consumer yields one message at a time
for msg in consumer:
    print(msg)

2) I totally understand. I suggest that the user chooses between using KirbyKafkaConsumer with a context manager:

with KirbyKafkaConsumer(topic="cashregister", group_id="toto") as consumer:
    for msg in consumer:
        print(msg)

or doing everything on their own:

consumer = KafkaConsumer("mytopic")
for msg in consumer:
    print(msg)
consumer.close()

3) Perfect.
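
For the "do everything on their own" option in point 2, a user who still wants a guaranteed close() could rely on the standard library's contextlib.closing, which calls close() on exit for any object that has one. The sketch below uses a stand-in FakeConsumer (an assumption, so that it runs without a broker); a real KafkaConsumer would take its place:

```python
from contextlib import closing


class FakeConsumer:
    """Stand-in for KafkaConsumer; only models iteration and close()."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.closed = False

    def __iter__(self):
        return iter(self._messages)

    def close(self):
        self.closed = True


consumer = FakeConsumer(["x", "y"])
# closing() calls consumer.close() when the block exits, even on error.
with closing(consumer) as c:
    seen = list(c)
```

This keeps the plain consumer untouched while giving the same cleanup guarantee as a dedicated Kirby wrapper.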

PierreSavatte commented 4 years ago

  1. I didn't know that at all!

  2. Of course, using the context manager is not mandatory. The user will have the choice.