tomasAlabes / graphql-kafkajs-subscriptions

Apollo graphql subscriptions using kafkajs
MIT License
24 stars 6 forks source link

2.1.9 doesn't send expected message to client when adapting Apollo example #16

Open phampton24 opened 2 years ago

phampton24 commented 2 years ago

I adapted the Apollo Group working sample, https://github.com/apollographql/docs-examples/blob/main/apollo-server/v3/subscriptions-graphql-ws/src/index.ts, to work with grapql-kafkajs-subscription and when I do that I receive an unexpected message on the subscribing client.

When working with the original sample the message payload is { numberIncremented: 0 }, { numberIncremented: 1 }, { numberIncremented: 2 }, and so on. After adapting the sample the message payload received by the client is , { numberIncremented: null }

Here is the code that sets up the pubsub object

pubsub = await KafkaPubSub.create({ topic: 'NUMBER_INCREMENTED', kafka: new Kafka({ clientId, brokers }), groupIdPrefix: clientId, // used for kafka pub/sub, producerConfig: {}, // optional kafkajs producer configuration consumerConfig: {} // optional kafkajs consumer configuration });

and here's the adapted incrementNumber method

function incrementNumber() { currentNumber++; const obj = { numberIncremented: currentNumber }; const msg = JSON.stringify(obj); pubsub.publish("NUMBER_INCREMENTED", Buffer.from(msg)); setTimeout(incrementNumber, 1000); }

In incrementNumber I've tried sending obj, msg and the buffer as currently shown. Sending message yields the same result as sending the buffer, and sending obj results in a type exception from kafkajs due to the producer method expecting a string as it's input.

dinkleberg119 commented 2 weeks ago

Try implementing the resolve method in your resolver like this:

subscribe: async () => {
    return pubsub.asyncIterator(subscription)
},
resolve: (payload) => {
    const bufferValue = Buffer.from(payload.value);
    const json = bufferValue.toString('utf-8');
    return JSON.parse(json);
}