spring-attic / spring-integration-kafka

Apache License 2.0
324 stars 180 forks source link

Consider adding `payloadType` to KafkaMessageDrivenChannelAdapterSpec #272

Closed Cameronjmayfield closed 5 years ago

Cameronjmayfield commented 5 years ago

Unless I'm missing a simpler way to configure this, payloadType on the KafkaMessageDrivenChannelAdapterSpec would allow us to use a type aware message converter.

artembilan commented 5 years ago

I think this is just an omission during introduction of KafkaMessageDrivenChannelAdapter.setPayloadType() in version 2.1.1.

So, good catch! Feel free to contribute the fix!

I see you have some commit in your local copy already 😉

garyrussell commented 5 years ago

Perhaps a separate issue, but we should change payloadType to Type so generics are retained for ParameterizedType, which can be obtained by reflection or via a ParameterizedTypeReference.

Then we can decode things like Map<Foo, Bar>.

We might not be able to backport that though.

artembilan commented 5 years ago

Well, I think we need to keep it as a Class because we can't simply provide a Class then from the XML config: the string value isn't converted to class properly. In the latest Spring Integration components I have a pair of properties on the matter:

/**
     * Specify the type of payload to be generated when the inbound HTTP request
     * content is read by the converters/encoders.
     * By default this value is null which means at runtime any "text" Content-Type will
     * result in String while all others default to <code>byte[].class</code>.
     * @param requestPayloadType The payload type.
     */
    public void setRequestPayloadTypeClass(Class<?> requestPayloadType) {
        setRequestPayloadType(ResolvableType.forClass(requestPayloadType));
    }

    /**
     * Specify the type of payload to be generated when the inbound HTTP request
     * content is read by the converters/encoders.
     * By default this value is null which means at runtime any "text" Content-Type will
     * result in String while all others default to <code>byte[].class</code>.
     * @param requestPayloadType The payload type.
     */
    public void setRequestPayloadType(ResolvableType requestPayloadType) {
        this.requestPayloadType = requestPayloadType;
    }

The ResolvableType has this factory method:

/**
     * Return a {@link ResolvableType} for the specified {@link ParameterizedTypeReference}.
     * <p>Note: The resulting {@link ResolvableType} instance may not be {@link Serializable}.
     * @param typeReference the reference to obtain the source type from
     * @return a {@link ResolvableType} for the specified {@link ParameterizedTypeReference}
     * @since 4.3.12
     * @see #forType(Type)
     */
    public static ResolvableType forType(ParameterizedTypeReference<?> typeReference) {

So, we are fully covered with plain old XML and possible complexity and flexibility via Java configuration.