mfontanini / cppkafka

Modern C++ Apache Kafka client library (wrapper for librdkafka)
BSD 2-Clause "Simplified" License

Running a producer and a consumer in multithreaded application #211

Closed: steinio closed this issue 5 years ago.

steinio commented 5 years ago

Hi, this is not really an issue, just a question on how to use the library in a multithreaded application.

I am trying to do this by using the buffered producer example and the consumer example.

I have an application whose main() starts several data acquisition threads, plus threads that communicate with external hardware over TCP and serial. I want some of these threads to use the producer to send data directly, and others to read consumer data and forward it to the various hardware. As I understand it, since the producer and consumer are thread-safe, I can simply pass the producer or consumer object by reference to these threaded objects and call something like this inside the threads for the producer:

void KafkaProducer::writeData(std::string payload)
{
    builder.payload(payload);
    producer.produce(builder);
}
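If `builder` in the snippet above is a single member shared by every thread, concurrent calls to `writeData` would race on it, so a per-call `MessageBuilder` is safer. Another common layout is to hand payloads to one thread that owns the producer. The sketch below uses only the standard library; `PayloadQueue` is a hypothetical name, and the Kafka thread's loop body is only described, not implemented.

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical hand-off queue: any number of acquisition threads call push();
// the single thread that owns the Kafka producer calls pop() in a loop.
class PayloadQueue {
public:
    void push(std::string payload) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(payload));
        }
        cv_.notify_one();
    }

    // Blocks until a payload is available or close() has been called.
    // Returns std::nullopt only once the queue is closed and drained.
    std::optional<std::string> pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) {
            return std::nullopt;
        }
        std::string payload = std::move(items_.front());
        items_.pop();
        return payload;
    }

    // Wakes the consumer so it can finish draining and leave its loop.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cv_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::string> items_;
    bool closed_ = false;
};
```

The producer thread would then run `while (auto payload = queue.pop()) { ... }`, building a `MessageBuilder` from `*payload` and calling `produce()`. Because no builder is shared between threads, there is nothing to lock around it.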

and something like this for the consumer:

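std::string KafkaConsumer::readData()
{
    Message msg = consumer.poll();
    // poll() returns an empty Message on timeout; errors are reported through get_error()
    if (!msg || msg.get_error()) {
        return std::string();
    }
    std::string payload = msg.get_payload();
    return payload;
}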
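For forwarding consumed data to several devices, one option is a single thread that owns the consumer and routes each payload to the code driving the matching device. The standard-C++ sketch below shows only the routing. `TopicRouter` is a hypothetical name. In the real loop, the topic and payload would come from `msg.get_topic()` and `msg.get_payload()`.

```cpp
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical router: maps a topic name to the handler that forwards the
// payload to one piece of hardware (e.g. by pushing into that device's queue).
class TopicRouter {
public:
    using Handler = std::function<void(const std::string&)>;

    void on(const std::string& topic, Handler handler) {
        handlers_[topic] = std::move(handler);
    }

    // Returns false when no handler is registered for the topic,
    // so the caller can log or drop the message.
    bool dispatch(const std::string& topic, const std::string& payload) const {
        auto it = handlers_.find(topic);
        if (it == handlers_.end()) {
            return false;
        }
        it->second(payload);
        return true;
    }

private:
    std::unordered_map<std::string, Handler> handlers_;
};
```

Handlers run on the consumer thread, so they should only hand the data off (for example, push it into a per-device queue) rather than perform slow TCP or serial I/O themselves.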

My question now is: what do I do with the producer or consumer object after creating it and passing it by reference? By the way, I only initialize the connection and define the callback functions in the object's constructor.

Do I just create the objects in main() before the threads are started, pass them by reference to the objects running in the threads, and do nothing else? Or do I need something more, like a separate thread for the producer and consumer that calls poll() or does some other background work? The goal in the end is to produce and consume data asynchronously.
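On the polling question: librdkafka runs delivery-report callbacks only from its poll call (a flush also polls). With cppkafka's plain `Producer`, this means someone has to call `poll()` or `flush()` regularly. `BufferedProducer` manages its own queue and flushing, so check its documentation before adding a loop around it. The sketch below is a minimal background loop using only the standard library. It assumes the real `poll_once` callable wraps a bounded-timeout producer poll. `BackgroundPoller` is a hypothetical name.

```cpp
#include <atomic>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical helper: calls poll_once() in a loop on its own thread until
// stop() is called or the object is destroyed.
class BackgroundPoller {
public:
    explicit BackgroundPoller(std::function<void()> poll_once)
        : poll_once_(std::move(poll_once)),
          worker_([this] {
              while (running_.load()) {
                  // In the real application this call should block for a
                  // bounded time (e.g. a poll with a timeout) to avoid spinning.
                  poll_once_();
              }
          }) {}

    BackgroundPoller(const BackgroundPoller&) = delete;
    BackgroundPoller& operator=(const BackgroundPoller&) = delete;

    ~BackgroundPoller() { stop(); }

    // Call from the owning thread only; calling it more than once is harmless.
    void stop() {
        running_.store(false);
        if (worker_.joinable()) {
            worker_.join();
        }
    }

private:
    // Declared before worker_ so both are initialized before the thread starts.
    std::function<void()> poll_once_;
    std::atomic<bool> running_{true};
    std::thread worker_;
};
```

In the application, this could be constructed right after the producer, assuming a call like `BackgroundPoller poller([&producer] { producer.poll(std::chrono::milliseconds(100)); });`. Declaring it after the producer means it is destroyed, and its thread joined, before the producer goes away.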

accelerated commented 5 years ago

Hi @steinio, first please read this post. Essentially, any message type other than a plain std::string or std::vector needs to be serialized by the application on the producer side and deserialized on the consumer side.

steinio commented 5 years ago

Hi, thanks for the quick response. I am actually going to use Avro for serialization, but I was waiting to add it until I got this sorted out. How would I add serialized data to the buffer? Do I just pass it as a string after encoding/serializing it? Something like this:

std::string serializeMyData(Data data)
{
    avro::Writer writer;
    avro::serialize(writer, data);
    InputBuffer buffer = writer.buffer();
    std::ofstream out;
    avro::istream is(buffer );
    out << is.rdbuf();
    return out.str();
}

I want it to be non-blocking, so if I understood your response correctly, I have to use a buffered type in the builder?

Also, back to my original question: suppose I use the producer as described above, passed by reference into objects running in separate threads that call the snippet shown earlier. Given that this is a buffered type, do I need to run anything else after initializing the connection in the producer class constructor, like a poll or some other background work?

EDIT

I got this snippet from a forum, but I now see that it is not what I was looking for, and it does not work. I guess I have to figure out how to convert the serialized data into something that can be read into a string, a vector, or something similar.

steinio commented 5 years ago

After looking at the Avro C++ examples and trying to compile something similar, I am only more confused about how to use this with the buffered producer.

I am using this example as a starting point:

std::unique_ptr<avro::OutputStream> out = avro::memoryOutputStream();
avro::EncoderPtr e = avro::binaryEncoder();
e->init(*out);
avro::encode(*e, procMsg); // procMsg is an avro datastructure which is initialized earlier with some random data
uint8_t** buf;
size_t* len;
out->next(buf, len);

My question now is: how can I send the serialized data in buf with the producer? This question might be too Avro-specific, but I can't find a forum where I can ask the Avro developers. Do I have to make a Buffer, like this?

Buffer buffer(*buf, *len);

I can open a new issue for this question specifically, since it is unrelated to this issue.
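A few details may help here, though they depend on the Avro C++ and cppkafka versions in use. In the snippet above, `buf` and `len` are uninitialized pointers, so `out->next(buf, len)` writes through garbage addresses. `OutputStream::next()` also hands out free space to write into, not the bytes already written. After `e->flush()`, `avro::snapshot(*out)` should return a `std::shared_ptr<std::vector<uint8_t>>` holding the encoded bytes.

cppkafka's `Buffer` can wrap a pointer plus a length (as in `Buffer buffer(*buf, *len)`) or a `std::vector` directly. A `Buffer` does not own its bytes, so the vector must stay alive while it is in use. If a `std::string` payload is more convenient, as asked in the EDIT above, the copy is short. `to_payload` is a hypothetical helper name.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper: copies raw encoded bytes into a std::string payload.
// Iterator-range construction copies every byte, including embedded '\0'
// bytes that a C-string conversion would silently cut off.
std::string to_payload(const std::vector<std::uint8_t>& bytes) {
    return std::string(bytes.begin(), bytes.end());
}
```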