nodefluent / kafka-streams

equivalent to kafka-streams :octopus: for nodejs :sparkles::turtle::rocket::sparkles:
https://nodefluent.github.io/kafka-streams/
MIT License

Produce messages as Objects #66

Closed · SamiSammour closed this 6 years ago

SamiSammour commented 6 years ago

Hi there, I am trying to produce some messages so I can consume them later. When the message is a string everything is fine, but when I try to send an object the message doesn't get sent, and I don't even get an error of any kind... Here is my code (I am using the same config as the one in the tests folder):

'use strict';

const {KafkaStreams} = require('kafka-streams');
const {nativeConfig: config} = require('./config');

const kafkaStreams = new KafkaStreams(config);

//creating a stream without a topic is possible;
//no consumer will be created during stream.start()
const stream = kafkaStreams.getKStream(null);

//define a topic to stream messages to
stream.to('my-output-topic');

stream.start().then(() => {
  stream.writeToStream({
    x: 'x',
    y: 'y',
    z: 'z'
  });
});

Can you please help with this? Am I doing anything wrong?

SamiSammour commented 6 years ago

I have solved this by sending the object stringified as JSON. You should mention that in the documentation for the writeToStream function:

stream.writeToStream(JSON.stringify({
  x: 'x',
  y: 'y',
  z: 'z'
}));

krystianity commented 6 years ago

Hi @SamiSammour, there are different ways to produce messages, but I do have to admit that they are a bit confusing and could be documented better.

writeToStream is not the deciding call here, as it just pipes basically anything to the internal $stream. The settings for producing to Kafka are set in the to() call.

https://nodefluent.github.io/kafka-streams/jsdoc/dsl_StreamDSL.js.html#line976

The simple type "send" expects a string as the final stream value; the buffer types can also deal with objects.
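
For illustration, a minimal sketch of the difference (the partition count of 1 is just a placeholder for your topic's actual partition count):

//default produce type "send": the final stream value must already be a string
stream.to('my-output-topic');
//stream.writeToStream('already-a-string'); //works, plain objects do not

//produce type "buffer": object values are wrapped and serialized to JSON for you
stream.to('my-output-topic', 1, 'buffer');
//stream.writeToStream({x: 'x'}); //works as well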

SamiSammour commented 6 years ago

I'm still confused about how I can produce events to a stream. Or do I have to use a basic Kafka producer to produce messages to a topic when a desired event happens?

krystianity commented 6 years ago

Basically, node-kafka-streams opens consumers and producers based on the DSL you provide and connects them with most.js streams internally.

By providing a .to() call you define that the contents of a stream should be produced to a topic. As Kafka topics can contain any kind of message schema (apart from the initial split into key and value), there are multiple ways to deal with messages, and also to produce them back to a topic in the end.

Let's assume you have the consuming part covered and we are only stuck on producing messages back to a topic.

When calling .to("topic-name", 30 /* partition count of your topic */) you can define a few things. You can also pass a certain "style" parameter for messages as the third parameter: "send", "buffer" or "bufferFormat". These styles are inherited from the underlying Kafka client library "node-sinek".

"send" just sends as is, "buffer" will expect the messages value to be an object and will wrap in a certain style e.g. {payload, id, version}, "bufferFormat" will additionally add some more stuff to it e.g. a type. All of these also produce as JSON strings. Which is also why "send" is the default value.

In addition to these types there must also be a way for node-kafka-streams users to define the key, the partition or the topic on a per-message basis, which is why it will also constantly look for objects that contain key and value fields, which it will then remap (also based on the type).

The logic for this is in messageProduceHandle.js.
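
Putting it together, something along these lines should let you set the key on a per-message basis (a rough sketch based on the remapping described above; the topic name, partition count and key are placeholders):

stream.to('my-output-topic', 30, 'buffer');

stream.start().then(() => {
  //objects with key and value fields are detected and remapped,
  //so this message is produced with the key 'user-123'
  stream.writeToStream({
    key: 'user-123',
    value: {x: 'x', y: 'y', z: 'z'}
  });
});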

Real-world code examples are currently still sparse; here is a TypeScript example of advanced node-kafka-streams usage: https://github.com/nodefluent/anon-kafka-mirror/blob/master/lib/AnonKafkaMirror.ts#L178

SamiSammour commented 6 years ago

Let me tell you what I have in mind; it would be very helpful if you could suggest the best way of dealing with this.

My app will communicate with a 3rd party API through a Kafka cluster. This is new for both sides, so we don't actually know how to do it properly. Basically, when an event happens (a user logs in, for example) a message should be sent through the Kafka cluster to the 3rd party API, and the app should wait for the response through the cluster as well.

All the examples I found out there insert messages into topics through the CLI commands shipped with Kafka, but in a real-life scenario you have to do it programmatically. When looking inside the examples folder I found that produceToTopic.js uses stream.writeToStream(message).

So can you please point me to the best way to do this using your lib? Or should I use another lib, "node-sinek" for example? If so, do you have any pointers for that lib?

Thanks.

krystianity commented 6 years ago

Hi Sami.

First of all (though I cannot fully see the overall picture here), I am afraid talking to third-party Kafka APIs, especially if they are exposed externally in any way, is a very bad idea; however, I am sure you guys know what you are doing. (There should always be a gateway abstraction in front of Kafka in such scenarios.)

As you seem to have no experience with Kafka at all, I would suggest starting with a simple client first to understand the concepts better, before trying to solve your problem with kafka-streams right away, although that is possible.

Kafka has a steep learning curve which, in my opinion, should be tackled first, before doing anything in a production environment, as there are tons of pitfalls.

My suggestion is to go for the "simple" client first; for example, in this folder you can find a sinek native producer and consumer example (you can ignore the SASL and SSL stuff in case the Kafka cluster you are dealing with does not need these additional security and authentication measures).

Judging from what you have told me, you will need to create two Kafka topics, and your service will need to have an NConsumer and an NProducer instance at the same time. When a user logs in you will want to produce to the "login-event" topic, and the 3rd party can read it on the other side; when the 3rd party is done, they will produce to a "login-event-response" topic, which you will consume and then do whatever you need to do with the event.
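
To make that concrete, here is a rough sketch of the two-instance setup (the config shape and calls follow the sinek native examples mentioned above from memory; broker address, group id and topic names are placeholders and may need adjusting for your setup):

'use strict';

const {NProducer, NConsumer} = require('sinek');

const config = {noptions: {
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'my-service-group'
}};

const producer = new NProducer(config, ['login-event'], 1);
const consumer = new NConsumer(['login-event-response'], config);

//messages arrive via the "message" event in flow mode
consumer.on('message', (message) => {
  //handle the 3rd party's login-event response here
});

Promise.all([producer.connect(), consumer.connect()]).then(() => {
  consumer.consume();
  //produce whenever a user logs in
  return producer.send('login-event', JSON.stringify({userId: 123}));
});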

But to be honest, this sounds like something you could do much more easily with a simple REST call from your service to the 3rd party API, especially if you need any kind of direct response for the user-login call, because in the Kafka scenario above you would have to implement your own "call stack" to await the 3rd party events.

Good luck ;)

SamiSammour commented 6 years ago

You are absolutely right. Thank you; after your last comment I searched for a use case of using Kafka for this kind of integration and couldn't find one, so we agreed we will drop this solution, as it won't add any value, just headaches. Cheers.