spring-cloud / stream-applications

Functions and Spring Cloud Stream Applications for data driven microservices
https://spring.io/projects/spring-cloud-stream-applications

router sink not reading correct integer values from Avro Record #452

Closed gauravsingh96 closed 1 year ago

gauravsingh96 commented 1 year ago

Hi Team,

I am generating an Avro GenericData.Record and passing it to router-sink to route to a topic. The application producing the Avro record encodes the integer value correctly, but when the record reaches the router, the router reads the value incorrectly. For example, 1980 is written as -31961080. If, however, I remove the router from the definition, the correct value reaches the output topic. The stream is as follows:

:inputTopic > my-custom-app-to-convert-json-to-avro | router > :outputTopic

I have searched through a lot of documentation and could not find the reason or any configuration that might help.

artembilan commented 1 year ago

the router reads the value incorrectly

How do you read that value? Just | router is not enough for us to understand what is going on.

However, I guess that the problem is with a byte[] as the payload of the message received by this Router Sink. An additional problem might be that the content type is application/json rather than an Avro one. In that case the input payload is converted to a String, which is really wrong for Avro.

Need to investigate together with the team...

Meanwhile, consider transferring routing data via simple headers.

Thank you for the report!

gauravsingh96 commented 1 year ago

It's an OOTB router with only expression and destination-mapping supplied. I have not specified any other properties for it. I also think this is because of the byteArrayTextToString function, which converts byte[] to String. However, even if I remove the byteArrayTextToString function from the router definition, it still sends the wrong value to the output topic.

Also, I could not understand routing data via simple headers. How do we do that? FYI, I am currently routing data based on my header only. It works fine and the message is sent to the correct topic too. Just those integer values in the payload are not coming through correctly.

artembilan commented 1 year ago

OK. Sounds like you indeed use headers for routing, and not the payload, which might be a byte[], making it useless for routing logic.

I'm not sure how you can remove byteArrayTextToString, but according to your conclusion and my guess we are on the same page: even if you send serialized Avro, there is no respective contentType header provided, so Spring Cloud Stream assumes that it is JSON and ByteArrayTextToString does its "dirty" work:

        if (message.getPayload() instanceof byte[]) {
            final MessageHeaders headers = message.getHeaders();
            String contentType = headers.containsKey(MessageHeaders.CONTENT_TYPE)
                    ? headers.get(MessageHeaders.CONTENT_TYPE).toString()
                    : MimeTypeUtils.APPLICATION_JSON_VALUE;

            if (contentType.contains("text") || contentType.contains("json") || contentType.contains("x-spring-tuple")) {
                message = MessageBuilder.withPayload(new String(((byte[]) message.getPayload())))
                        .copyHeaders(message.getHeaders())
                        .build();
            }
        }

Any chance that you can produce data into that inputTopic with a correct contentType header for Avro: application/*+avro?
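
To see why the byte[] to String conversion above is harmful for Avro, here is a minimal sketch that needs no Avro dependency. It hand-rolls the zigzag variable-length encoding that the Avro specification defines for int values, then pushes the bytes through a String and back. The class and method names are hypothetical, and it assumes UTF-8 is the charset on both sides of the conversion (the quoted code uses the platform default).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class AvroIntStringRoundTrip {

    // Encode an int the way Avro's binary format does: zigzag, then base-128 varint.
    static byte[] encodeInt(int value) {
        int zigzag = (value << 1) ^ (value >> 31);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((zigzag & ~0x7F) != 0) {
            out.write((zigzag & 0x7F) | 0x80); // low 7 bits plus continuation bit
            zigzag >>>= 7;
        }
        out.write(zigzag);
        return out.toByteArray();
    }

    // Decode a zigzag varint from the start of the given bytes.
    static int decodeInt(byte[] bytes) {
        int raw = 0;
        int shift = 0;
        for (byte b : bytes) {
            raw |= (b & 0x7F) << shift;
            shift += 7;
            if ((b & 0x80) == 0) {
                break; // last byte of this varint
            }
        }
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Simulate a text-oriented hop: bytes -> String -> bytes (assumption: UTF-8 both ways).
    static int throughString(int value) {
        String asText = new String(encodeInt(value), StandardCharsets.UTF_8);
        return decodeInt(asText.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(decodeInt(encodeInt(1980))); // 1980
        System.out.println(throughString(1980));        // -31961080
    }
}
```

Under these assumptions, 1980 encodes to the bytes F8 1E. F8 is not valid UTF-8, so it becomes the replacement character, which re-encodes as EF BF BD, and the decoder then reads a different number: -31961080, the value reported in this issue. Small values such as 1 encode to a single ASCII-range byte and survive the round trip, which is why only some fields look corrupted.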

gauravsingh96 commented 1 year ago

I have set the following properties in my custom app that produces the AVRO record.

spring:
  cloud:
    stream:
      bindings:
        output:
          content-type: application/*+avro

Does this help ?

Also regarding removing byteArrayTextToString, I cloned the repo and removed the definition from the properties file.

artembilan commented 1 year ago

I think so. Well, since you can modify that app, would you mind debugging it (e.g. enabling DEBUG logs) to see what the contentType of the message received by the router really is, and what the content is? Also, which version of Router Sink do you use, please?

gauravsingh96 commented 1 year ago

I did in fact debug the router sink, but not the byteArrayTextToString function. The router sink gets application/json as contentType, and in the payload the integer is converted to a ? after the byte[] to String conversion. I will check byteArrayTextToString and let you know the result.

artembilan commented 1 year ago

So, then output.content-type doesn't help, or it is overridden along the way...

gauravsingh96 commented 1 year ago

Yes, and I have also tried sending the content-type header by producing the message manually through a separate Kafka producer. That didn't work either.

artembilan commented 1 year ago

Well, that header name must be contentType, which is the value of the MessageHeaders.CONTENT_TYPE constant.

gauravsingh96 commented 1 year ago

OK I will try with this too.

artembilan commented 1 year ago

So, we just tested some Spring Cloud Stream Avro producer based on a StreamBridge and content-type: application/*+avro together with SchemaRegistry. The Router Sink receives a correct contentType header.

Now it would be great to know what your custom application is doing. It feels like you receive JSON and convert it into Avro manually. I believe that you receive an input with a contentType=application/json header. But what do you do next? How do you actually serialize Avro and output the result to the binding?

gauravsingh96 commented 1 year ago

Yes, you are correct. The custom app receives a JSON record, creates an Avro GenericRecord from it and returns it; the record is then received by the router sink to route to a destination.

Also, when you tested that the correct contentType reaches the router sink, was the payload intact too? I mean, were any integer values corrupted? If the payload was intact, I will look into sending the correct header from the custom app.

artembilan commented 1 year ago

In our tests we just get a plain byte[] since the contentType header is something like content-type: application/*+avro against our test Avro record. Therefore that ByteArrayTextToString function does not modify the message.

So, it would also be great if you checked the record in the Kafka topic to see what the value of this contentType header is, even before it reaches the router.
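
The decision quoted earlier in the thread can be isolated into a small predicate to check which headers trigger the String conversion. This is a simplified sketch over a plain Map, not the actual function from the repository; the class and method names are hypothetical, and "contentType" is used as the header key because that is the MessageHeaders.CONTENT_TYPE value mentioned above.

```java
import java.util.Map;

class TextualPayloadCheck {

    // Header key and default mirror the snippet quoted in this thread.
    static final String CONTENT_TYPE = "contentType";
    static final String DEFAULT_CONTENT_TYPE = "application/json";

    // Returns true when a byte[] payload would be turned into a String.
    static boolean wouldConvertToString(Map<String, Object> headers) {
        String contentType = headers.containsKey(CONTENT_TYPE)
                ? headers.get(CONTENT_TYPE).toString()
                : DEFAULT_CONTENT_TYPE;
        return contentType.contains("text")
                || contentType.contains("json")
                || contentType.contains("x-spring-tuple");
    }

    public static void main(String[] args) {
        // No header at all: falls back to JSON, so the payload is converted.
        System.out.println(wouldConvertToString(Map.of()));                                     // true
        // Inbound JSON header copied onto the Avro reply: converted.
        System.out.println(wouldConvertToString(Map.of("contentType", "application/json")));   // true
        // Explicit Avro content type: the byte[] is left alone.
        System.out.println(wouldConvertToString(Map.of("contentType", "application/*+avro"))); // false
        // Wrong header name: not seen by this check, so the JSON default applies.
        System.out.println(wouldConvertToString(Map.of("content-type", "application/*+avro"))); // true
    }
}
```

The last case matches the earlier remark about the header name: in this model a header called content-type is invisible to the lookup, so the payload is still treated as JSON.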

gauravsingh96 commented 1 year ago

I tried hard-coding the contentType header in my custom app, and then the router gets the correct content type and is able to read the correct integer values too. But my concern now is: why isn't the configuration below working?

spring:
  cloud:
    stream:
      bindings:
        output:
          content-type: application/*+avro

To your question: if not hard-coded, the value in the header is application/json, and I was expecting the above config to apply here. If I understand correctly, when the above config works, should it not send the outbound message with the contentType I specify, application/*+avro? Could it be that if contentType is already set, the above config is rendered useless and nothing is set? The inbound payload to the custom app already has the application/json type.

artembilan commented 1 year ago

Glad to see that the hard-coding workaround does the trick. My feeling is that the inbound CT is not overridden 🤷

We will investigate further with the team.

Would you mind sharing with us, meanwhile, what exactly your converter app is doing? Do you just create an Avro record from that input JSON and return it as is from your function, in the hope that a proper Avro converter will serialize this record according to that output CT config?

gauravsingh96 commented 1 year ago

Yeah, so my converter basically consumes JSON as input and produces a GenericRecord. It simply sets the values from the JSON on the record, copies all the headers from the input message (we send some headers for our further logic, routing, etc.) and returns the final Message. I hoped the CT config would append/update the correct type on it before sending it to the output channel.

artembilan commented 1 year ago

Yeah... I think the logic is that if you produce a whole Message, then its CT header takes precedence over the configuration. Therefore your workaround of setting application/*+avro in the CT header is the correct way to go. Probably, if you removed the CT header from the reply message, the config would do its job.

gauravsingh96 commented 1 year ago

Hmm, got it. I think in that case both solutions more or less have to be in my code (either not copying CT from the previous headers or hard-coding it). In the latter case I no longer need the config.
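
The two options mentioned here can be sketched over a plain header Map. This is an illustration only: it does not use Spring's Message or MessageBuilder types, the class and method names are hypothetical, and the routeTo header is a made-up stand-in for the custom routing headers. With Spring's MessageBuilder the equivalent would presumably be copyHeaders followed by setHeader or removeHeader on the contentType key.

```java
import java.util.HashMap;
import java.util.Map;

class ReplyHeaders {

    static final String CONTENT_TYPE = "contentType";

    // Option 1: copy all inbound headers, then overwrite contentType explicitly.
    static Map<String, Object> copyAndOverride(Map<String, Object> inbound, String outboundContentType) {
        Map<String, Object> reply = new HashMap<>(inbound);
        reply.put(CONTENT_TYPE, outboundContentType);
        return reply;
    }

    // Option 2: copy all inbound headers except contentType, leaving the
    // outbound content type to the binding configuration.
    static Map<String, Object> copyWithoutContentType(Map<String, Object> inbound) {
        Map<String, Object> reply = new HashMap<>(inbound);
        reply.remove(CONTENT_TYPE);
        return reply;
    }

    public static void main(String[] args) {
        // Hypothetical inbound headers: a JSON content type plus a routing header.
        Map<String, Object> inbound = Map.of(
                CONTENT_TYPE, "application/json",
                "routeTo", "outputTopic");

        System.out.println(copyAndOverride(inbound, "application/*+avro").get(CONTENT_TYPE));
        System.out.println(copyWithoutContentType(inbound).containsKey(CONTENT_TYPE));
    }
}
```

Either way the routing header is preserved; the difference is only whether the reply carries an explicit content type or leaves it unset.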

artembilan commented 1 year ago

There is some documentation on the matter: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_mechanics. Yes, it does not yet say that the CT header from a reply message takes precedence, but it does assume that the returned message is not modified by the framework. Therefore an explicit CT header in your reply message is the way to go.

Closing this as Works as Designed.

We will see what we can do in that doc to be more precise.

gauravsingh96 commented 1 year ago

Thanks @artembilan. I am good with it.