spring-cloud / stream-applications

Functions and Spring Cloud Stream Applications for data driven microservices
https://spring.io/projects/spring-cloud-stream-applications

Serializing POJO in JSON format with S3 sink #200

Closed CEDDM closed 2 years ago

CEDDM commented 2 years ago

Problem description: I created a stream in SCDF like this: s3-source | my-processor | s3-sink. The idea is to read an XML source file from an S3 bucket, transform and enrich the data in my processor, and write a JSON file to another S3 bucket.

It worked fine in SCDF 2.7.1 with the s3 starter apps in version 3.0.0. But it doesn't work anymore in SCDF 2.9.1 with the 3.1.1 s3 apps.

Generally speaking: no matter what the source is, serializing a POJO to a JSON String out of my processor no longer works with the s3 sink, because it no longer supports String payloads, as the error message says: Unsupported payload type: [class java.lang.String]. The only supported payloads for the upload request are java.io.File, java.io.InputStream, byte[] and PutObjectRequest

Why are String payloads no longer supported in the s3 sink? Maybe to avoid encoding problems? What is the correct way to handle this? Adding an extra transform app just to change the String into a byte[] seems a bit painful and an unnecessary consumption of resources.

I saw this issue #74 but I think it's not just a media type problem

artembilan commented 2 years ago

The S3MessageHandler has never supported String as a payload. I think the logic in between has been tightened at the Spring Cloud Stream level with regard to the contentType value. Since you produce a String from the processor, it is converted back to a String on the sink side. That probably was not done before, and the data from the wire was left as a byte[] for that sink. But since S3MessageHandler does not support String, it is wrong to assume some automatic conversion back to byte[]. It is better to write the result of your processor as a byte[], not a String.

The problem with String in the S3MessageHandler is that it would not be clear whether you are providing a file name or the content of the file. So we left the String type out of the S3MessageHandler logic.

Makes sense?
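To make the type restriction concrete, here is a minimal Java sketch of the kind of check that produces this error. It is a simplified stand-in and not the actual S3MessageHandler code: the class and method names are hypothetical, and PutObjectRequest is left out so the example needs only the JDK. It shows that a JSON String is rejected while the same content encoded as a byte[] is accepted.

```java
import java.io.File;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class UploadPayloadCheck {

    // Hypothetical model of the payload check; NOT the real S3MessageHandler logic.
    // PutObjectRequest (AWS SDK) is omitted to keep the sketch JDK-only.
    static boolean isSupported(Object payload) {
        return payload instanceof File
                || payload instanceof InputStream
                || payload instanceof byte[];
    }

    // Rejects unsupported (non-null) payloads with a message shaped like the one in the report.
    static Object requireSupported(Object payload) {
        if (!isSupported(payload)) {
            throw new IllegalArgumentException(
                    "Unsupported payload type: [" + payload.getClass() + "]");
        }
        return payload;
    }

    public static void main(String[] args) {
        String json = "{\"id\":1}";
        System.out.println(isSupported(json)); // false
        System.out.println(isSupported(json.getBytes(StandardCharsets.UTF_8))); // true
    }
}
```

With a String the content-versus-file-name ambiguity remains, which is why only the byte[] form passes in this model.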

CEDDM commented 2 years ago

Thanks @artembilan for your explanation. Since the filename is given by key-expression, we could assume that the payload is always the content of the file, no matter what the type is. But in my opinion, the main problem is code readability and maintenance for company business logic: Function<InDTO, OutDTO> process() is far more understandable than something like Function<InDTO, InputStream> process() or Function<InDTO, byte[]> process()

Unless there is a simple way of doing a "double serialization" POJO -> JSON String -> byte[]?
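One possible way to keep the business function typed as Function<InDTO, OutDTO> and still produce a byte[] is to compose the serialization steps with Function.andThen. The sketch below is an illustration under assumptions: the InDTO and OutDTO classes and the hand-written toJson helper are hypothetical, and a real processor would more likely rely on a JSON library.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class PojoToBytes {

    // Hypothetical DTOs standing in for the InDTO / OutDTO types from the discussion.
    static final class InDTO {
        final String name;
        InDTO(String name) { this.name = name; }
    }

    static final class OutDTO {
        final String name;
        final int length;
        OutDTO(String name, int length) { this.name = name; this.length = length; }
    }

    // Business logic stays expressed in domain types.
    static Function<InDTO, OutDTO> process() {
        return in -> new OutDTO(in.name, in.name.length());
    }

    // Hand-rolled JSON for illustration only; it does not escape special characters.
    static String toJson(OutDTO out) {
        return "{\"name\":\"" + out.name + "\",\"length\":" + out.length + "}";
    }

    // POJO -> JSON String -> byte[], composed on top of the business function.
    static Function<InDTO, byte[]> processToBytes() {
        return process()
                .andThen(PojoToBytes::toJson)
                .andThen(json -> json.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] bytes = processToBytes().apply(new InDTO("abc"));
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // {"name":"abc","length":3}
    }
}
```

The explicit UTF-8 charset addresses the encoding concern raised earlier, and process() itself remains readable as business code.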

artembilan commented 2 years ago

Yep! Fully agree: code like Function<InDTO, OutDTO> process() speaks the business language more than anything else, and that is what makes programming powerful.

In this case it is probably a bit misleading to say that your OutDTO is serialized to a JSON String. Well, it is JSON, but definitely not a String. So it is probably something on the input side of the sink that converts the byte[] to a String, just because it sees the content as JSON and doesn't see any expected type to convert to.

Need some investigation and discussion with the team...

artembilan commented 2 years ago

Consider this as a workaround on the s3-sink: spring.cloud.stream.bindings.input.consumer.useNativeDecoding = true. This way no conversion from byte[] to String is going to happen.

CEDDM commented 2 years ago

Thank you again for your explanations. And sorry if I mixed things up. Many things happen between the apps and it's not always easy to understand where the problem is. I will try the workaround tomorrow.

artembilan commented 2 years ago

I've raised an issue in Spring Cloud Stream, where this conversion from byte[] to String happens: https://github.com/spring-cloud/spring-cloud-stream/issues/2246

CEDDM commented 2 years ago

I ran a test with the workaround but unfortunately I still get the same exception (screenshot).

artembilan commented 2 years ago

Maybe your binding name is not input. Docs are here: https://docs.spring.io/spring-cloud-stream/docs/3.1.5/reference/html/spring-cloud-stream.html#_consumer_properties

(for example, spring.cloud.stream.bindings.input.consumer.concurrency=3) And then useNativeDecoding:

useNativeDecoding

When set to true, the inbound message is deserialized directly by the client library, which must be configured correspondingly (for example, setting an appropriate Kafka producer value deserializer). When this configuration is being used, the inbound message unmarshalling is not based on the contentType of the binding. When native decoding is used, it is the responsibility of the producer to use an appropriate encoder (for example, the Kafka producer value serializer) to serialize the outbound message. Also, when native encoding and decoding is used, the headerMode=embeddedHeaders property is ignored and headers are not embedded in the message. See the producer property useNativeEncoding.

Default: false.

CEDDM commented 2 years ago

I tried with the default form to avoid errors with the binding name, but I still get the same Unsupported payload type: [class java.lang.String] error (screenshot).

artembilan commented 2 years ago

OK. Thanks. Looking into that in debug mode... Why do you post screenshots? Why not just paste plain code snippets, so I would be able to copy-paste them?

artembilan commented 2 years ago

Not correct: that works for me:

spring.cloud.stream.default.consumer.useNativeDecoding = true

and the code is like:

@Bean
public Consumer<Message<?>> consumeBytes() {
    return System.out::println;
}

console output:

GenericMessage [payload=byte[267], headers={skip-input-type-conversion=true, aws_shard=shardId-000000000000, aws_checkpointer=ShardCheckpointer{key='anonymous.01b563e1-a891-43a4-85a9-7e57c8ed12d6:test_stream:shardId-000000000000', lastCheckpointValue='49624079089036418142403452771959552389200412755114328066'}, id=6f654fca-c6d5-90b2-8ad7-f4db8172ceef, sourceData={SequenceNumber: 49624079089036418142403452771948672056823881092541972482,ApproximateArrivalTimestamp: Fri Nov 19 11:09:33 EST 2021,Data: java.nio.HeapByteBuffer[pos=0 lim=267 cap=267],PartitionKey: 1,}, aws_receivedPartitionKey=1, aws_receivedStream=test_stream, aws_receivedSequenceNumber=49624079089036418142403452771948672056823881092541972482, timestamp=1637338179204}]
GenericMessage [payload=byte[267], headers={skip-input-type-conversion=true, aws_shard=shardId-000000000000, aws_checkpointer=ShardCheckpointer{key='anonymous.01b563e1-a891-43a4-85a9-7e57c8ed12d6:test_stream:shardId-000000000000', lastCheckpointValue='49624079089036418142403452771959552389200412755114328066'}, id=882082e3-c5ae-21be-dd08-2352726d443a, sourceData={SequenceNumber: 49624079089036418142403452771949880982643495721716678658,ApproximateArrivalTimestamp: Fri Nov 19 11:09:33 EST 2021,Data: java.nio.HeapByteBuffer[pos=0 lim=267 cap=267],PartitionKey: 1,}, aws_receivedPartitionKey=1, aws_receivedStream=test_stream, aws_receivedSequenceNumber=49624079089036418142403452771949880982643495721716678658, timestamp=1637338179209}]

Without useNativeDecoding it is:

GenericMessage [payload={"@class":"org.springframework.messaging.support.GenericMessage","payload":"Message0","headers":{"@class":"java.util.HashMap","id":["java.util.UUID","c6c98933-7782-bdc8-d21c-4751ef31822c"],"event.eventType":"createEvent","timestamp":["java.lang.Long",1637338302769]}}, headers={aws_shard=shardId-000000000000, aws_checkpointer=ShardCheckpointer{key='anonymous.7ff2f3aa-5972-4e75-bd28-17a62fbcad4b:test_stream:shardId-000000000000', lastCheckpointValue='49624079089036418142403452771971641647396567568076505090'}, id=30de651c-9d54-88ce-6d46-2e1b704d4acf, sourceData={SequenceNumber: 49624079089036418142403452771960761315020035905504149506,ApproximateArrivalTimestamp: Fri Nov 19 11:11:37 EST 2021,Data: java.nio.HeapByteBuffer[pos=0 lim=267 cap=267],PartitionKey: 1,}, contentType=application/json, aws_receivedPartitionKey=1, aws_receivedStream=test_stream, aws_receivedSequenceNumber=49624079089036418142403452771960761315020035905504149506, timestamp=1637338303174}]
GenericMessage [payload={"@class":"org.springframework.messaging.support.GenericMessage","payload":"Message1","headers":{"@class":"java.util.HashMap","id":["java.util.UUID","ad916281-1fb4-ccdc-f937-894083aa90f0"],"event.eventType":"createEvent","timestamp":["java.lang.Long",1637338302786]}}, headers={aws_shard=shardId-000000000000, aws_checkpointer=ShardCheckpointer{key='anonymous.7ff2f3aa-5972-4e75-bd28-17a62fbcad4b:test_stream:shardId-000000000000', lastCheckpointValue='49624079089036418142403452771971641647396567568076505090'}, id=cbf746d3-d319-fc7f-8e7c-91a66cb2945a, sourceData={SequenceNumber: 49624079089036418142403452771961970240839650534678855682,ApproximateArrivalTimestamp: Fri Nov 19 11:11:37 EST 2021,Data: java.nio.HeapByteBuffer[pos=0 lim=267 cap=267],PartitionKey: 1,}, contentType=application/json, aws_receivedPartitionKey=1, aws_receivedStream=test_stream, aws_receivedSequenceNumber=49624079089036418142403452771961970240839650534678855682, timestamp=1637338303176}]

So, that confirms that useNativeDecoding does the trick for us.

Sorry, I'm not familiar with Data Flow and cannot say how to properly configure that sink with this property. All I can tell is that the s3-sink is fully based on:

@Bean
public Consumer<Message<?>> s3Consumer() {
    return amazonS3MessageHandler(null, null, null)::handleMessage;
}

which is very close to what I have tested against.

CEDDM commented 2 years ago

OK, thank you for testing. I will try to understand what I am doing wrong...

CEDDM commented 2 years ago

Hi again! It seems to me that there is a difference when using SCDF. I tried replacing the s3-sink with a log sink and setting both useNativeDecoding properties:

--spring.cloud.dataflow.stream.app.label=log
--spring.cloud.stream.default.consumer.useNativeDecoding=true
--spring.cloud.stream.bindings.input.consumer.useNativeDecoding=true
--spring.cloud.stream.bindings.input.destination=test-opex.import-opex
--log.expression=#root
--spring.cloud.stream.bindings.input.group=test-opex
--spring.cloud.dataflow.stream.app.type=sink

Looks good because we can see other properties with spring.cloud.stream.bindings.input added by SCDF.

But here is the result (truncated) : log-sink : GenericMessage [payload={"nomLot":"200312.69FA698X300032","matricule":"", (...), nativeHeaders={}, replyChannel=nullChannel, kafka_offset=11, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c3c4ebb, kafka_receivedPartitionId=0, kafka_groupId=test-opex}]

As you can see, I use Kafka. I don't know if it makes any difference

CEDDM commented 2 years ago

Well... I created a very simple producer like yours and this time I can see the difference you showed. So I guess the SCDF starter apps have something different that prevents useNativeDecoding from working as expected.

For my sink, I used :

<spring-boot.version>2.5.5</spring-boot.version>
<spring-cloud.version>2020.0.4</spring-cloud.version>

artembilan commented 2 years ago

@CEDDM ,

thank you for looking into this more!

You know, the log-sink is not a good sample to follow. It has this configuration:

<functionDefinition>byteArrayTextToString|logConsumer</functionDefinition>

So, independently of the config, that byteArrayTextToString function is going to do the trick of making those byte[] human-friendly in the final logs.
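As a rough illustration of why the log output above shows JSON text even with native decoding enabled, the following Java sketch models a function that turns byte[] payloads into text before they reach the logger. It is a simplified, hypothetical model and not the actual byteArrayTextToString implementation, which may apply additional conditions that are not shown in this thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class ByteArrayTextToStringSketch {

    // Hypothetical model: decode byte[] payloads as UTF-8 text, pass anything else through.
    static Function<Object, Object> byteArrayTextToString() {
        return payload -> payload instanceof byte[]
                ? new String((byte[]) payload, StandardCharsets.UTF_8)
                : payload;
    }

    public static void main(String[] args) {
        byte[] wire = "{\"nomLot\":\"x\"}".getBytes(StandardCharsets.UTF_8);
        // Even though the payload arrived as byte[], the logger would see a String.
        System.out.println(byteArrayTextToString().apply(wire)); // {"nomLot":"x"}
    }
}
```

Because such a step runs before the log consumer, a log sink cannot show whether the binding delivered a byte[] or a String.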

I think we don't need Data Flow for this test. It is enough to have the s3-sink bound to a Kafka topic. Then we just produce data to that topic and see what happens on the consumer side.

Right now I'm trying to find something that would be close to S3 so that I can test this in isolation...

AndrewZurn commented 2 years ago

Hey all,

I believe I'm seeing the same issue described above when using Kafka binders and attempting to write to an S3 sink.

I've attempted to use a variety of configurations on the Sink job of which have included:

Seems like, from what others have said, a possible regression between versions? Not quite sure where to take this, although I see an interesting artifact in Maven Central that seems to indicate there is a Kafka-specific S3 sink (although I can't quite find where the source code for this is... it doesn't seem to be hosted in this project, so I'm curious where it resides and who is publishing it: https://mvnrepository.com/artifact/org.springframework.cloud.stream.app/s3-sink-kafka/3.1.1).

I might try to figure out how to get that into my SCDF installation and see if anything fun comes of it.

Thanks, Andrew

Edit - maybe that code/image on docker hub as well is from this project and the source code is just bundled up and suffixed with kafka/rabbit, in which case I'm probably going to see the same issues. :)

Edit - looks like input.consumer.content-type is the winner. Odd seeing some camel case and some kebab case in the documentation. Couldn't end up getting it to work right with Kafka bindings (our cluster is beyond ancient) but Rabbit seems to have done the trick for the work I was trying to do.

artembilan commented 2 years ago

The s3-sink-kafka is a generated artifact. Essentially it is the result of bundling the s3-consumer artifact and the Kafka binder to make a full-blown Spring Cloud Stream application.

The mentioned upstream issue was fixed a while ago: https://github.com/spring-cloud/spring-cloud-function/issues/771.

And we have also released a new version of this sink: https://mvnrepository.com/artifact/org.springframework.cloud.stream.app/s3-sink-kafka/3.2.1