apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0

CamelPostgresqlsourceSourceConnector publishes InputStreamCache object reference to topic #1543

Open camwardy opened 1 year ago

camwardy commented 1 year ago

Hi @oscerd,

I'm attempting to use the CamelPostgresqlsourceSourceConnector. I can create the connector but it publishes an InputStreamCache object reference to the topic, in the form:

org.apache.camel.converter.stream.InputStreamCache@6e37b6bd
org.apache.camel.converter.stream.InputStreamCache@6c833187
org.apache.camel.converter.stream.InputStreamCache@59bcea5a
org.apache.camel.converter.stream.InputStreamCache@4235ec9b
org.apache.camel.converter.stream.InputStreamCache@1fd4716e
org.apache.camel.converter.stream.InputStreamCache@3a08393d
org.apache.camel.converter.stream.InputStreamCache@2e2bb2c
org.apache.camel.converter.stream.InputStreamCache@c2ac46c
org.apache.camel.converter.stream.InputStreamCache@2a224dd6
org.apache.camel.converter.stream.InputStreamCache@1437780
...

Is there any additional config I need to set to get the connector to publish something that a consumer can easily utilise?

Here is my connector config:

{
    "name": "source",
    "config": {
        "connector.class": "org.apache.camel.kafkaconnector.postgresqlsource.CamelPostgresqlsourceSourceConnector",
        "tasks.max": "1",
        "topics": "test-topic",
        "camel.kamelet.postgresql-source.databaseName": "db",
        "camel.kamelet.postgresql-source.query": "SELECT (id,name) FROM source;",
        "camel.kamelet.postgresql-source.serverName": "postgres",
        "camel.kamelet.postgresql-source.username": "postgres",
        "camel.kamelet.postgresql-source.password": "postgres",
        "camel.kamelet.postgresql-source.delay": 2000
    }
}

Thanks
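The `InputStreamCache@6e37b6bd` entries above have the shape Java's default `Object.toString()` produces (class name, `@`, hex hash code), which suggests the stream object itself was stringified rather than its content. A minimal sketch of that difference, using only the JDK: `ByteArrayInputStream` stands in for Camel's `InputStreamCache`, and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of camel-kafka-connector.
final class StreamToStringDemo {

    // Stringifying a stream that does not override toString() yields
    // "<class name>@<hex hash code>", not the data inside the stream.
    static String naive(Object value) {
        return String.valueOf(value);
    }

    // Reading the stream yields the actual payload, decoded here as UTF-8 text.
    static String content(InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream("(1,alice)".getBytes(StandardCharsets.UTF_8));
        System.out.println(naive(body));   // java.io.ByteArrayInputStream@<hex hash code>
        System.out.println(content(body)); // (1,alice)
    }
}
```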

aozmen121 commented 1 year ago

@orpiske & @oscerd, we're also seeing the same InputStreamCache issue. The manual is very bare-bones and doesn't cover this particular fix/workaround. Can you please help?

oscerd commented 1 year ago

Can you try with

key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

camwardy commented 1 year ago

@oscerd We get the following exception: org.apache.kafka.connect.errors.DataException: ByteArrayConverter is not compatible with objects of type class org.apache.camel.converter.stream.InputStreamCache

oscerd commented 1 year ago

I need to reproduce this but I think we need to disable the stream caching here: https://github.com/apache/camel-kafka-connector/blob/main/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java#L143

with

context.setStreamCaching(false);

You can try to add this fix and rebuild the whole project, starting from the tag you're using, for the time being.

aozmen121 commented 1 year ago

@oscerd Doesn't look like setting converter to org.apache.kafka.connect.converters.ByteArrayConverter worked for us either.

oscerd commented 1 year ago

> @oscerd We get the following exception: org.apache.kafka.connect.errors.DataException: ByteArrayConverter is not compatible with objects of type class org.apache.camel.converter.stream.InputStreamCache

The documentation is barebones because nobody is contributing a single line of documentation or code. Open source is not just asking.

oscerd commented 1 year ago

> @oscerd Doesn't look like setting converter to org.apache.kafka.connect.converters.ByteArrayConverter worked for us either.

Try to build the project with the fix suggested above.

jakubmalek commented 1 year ago

Hi, I faced the same issue with camel-http-secured-source-kafka-connector: the task returns an InputStreamCache object as the SourceRecord value, which cannot be converted with ByteArrayConverter.

Disabling stream caching with the camel.main.streamCachingEnabled property changes the Java type to org.apache.camel.converter.stream.CachedOutputStream$WrappedInputStream, which is not suitable either.

When combined with camel.source.marshal: jackson, I get the following error:

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.camel.converter.stream.InputStreamCache and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)

I tried looking for suitable camel-http properties but I couldn't find anything.

For now I've bypassed this by implementing a custom transformation plugin which transforms the InputStream into a byte array. However, I was wondering if there is a way to configure the connector to at the very least return unwrapped bytes, or preferably to convert the payload to a HashMap so that further field transformations can be applied to the JSON?
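The core of such a transformation could look roughly like the sketch below. It uses only the JDK and leaves out the Kafka Connect `Transformation` wrapper that a real plugin would need. The class and method names are hypothetical, and the JSON-to-HashMap step is not covered.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper; a real SMT would call something like this on the record value.
final class RecordValueUnwrapper {

    // Returns the stream's bytes if the value is an InputStream;
    // any other value (including null) is passed through unchanged.
    static Object unwrap(Object value) {
        if (value instanceof InputStream) {
            try {
                return ((InputStream) value).readAllBytes();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return value;
    }

    public static void main(String[] args) {
        byte[] payload = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        Object unwrapped = unwrap(new ByteArrayInputStream(payload));
        System.out.println(Arrays.equals(payload, (byte[]) unwrapped)); // true
    }
}
```

A `byte[]` value is a type that ByteArrayConverter accepts, which is why unwrapping before the converter runs avoids the DataException reported earlier in the thread.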

cmartinez-peigo commented 1 year ago

Has no one resolved this issue? Please help.

oscerd commented 1 year ago

We are investigating what the problem could be. @valdar is looking into it.

vijaiks commented 4 weeks ago

Has anyone resolved this issue?

mcforres commented 2 weeks ago

Dropping a note here on a workaround for this issue. For context, I'm not a Java developer, and I'm fairly new to Apache Camel. I'm sure it would be better to expose this as a property on the respective connectors that deal with streams (postgres, dynamodb, etc.) rather than modifying the actual source task.

In my case, I am trying to use the dynamodb streams source connector and experienced the same issue @camwardy described above.

With the help of AI, I generated code that reads the stream and converts it into a byte array and/or a string. I validated the solution on the camel-kafka-connector 3.20.3 tag.

The main changes were made to the poll() method at https://github.com/apache/camel-kafka-connector/blob/f0e2ca924773165f322932a5d7c2eed332ae6220/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java#L197

I replaced that entire method with this code block:

public synchronized List<SourceRecord> poll() {
    LOG.debug("Number of records waiting an ack: {}", freeSlots.capacity() - freeSlots.size());
    final long startPollEpochMilli = Instant.now().toEpochMilli();

    long remaining = remaining(startPollEpochMilli, maxPollDuration);
    long collectedRecords = 0L;

    List<SourceRecord> records = new ArrayList<>();
    while (collectedRecords < maxBatchPollSize && freeSlots.size() >= topics.length && remaining > 0) {
        Exchange exchange = consumer.receive(remaining);
        if (exchange == null) {
            // Nothing received, abort and return what we received so far
            break;
        }

        LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(),
                exchange.getMessage().getMessageId(), exchange.getFromEndpoint());

        // TODO: see if there is a better way to use sourcePartition and sourceOffset
        Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
        Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());

        final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;

        // Retrieve the message body
        Object messageBodyValue = exchange.getMessage().getBody();

        // Declare the 'value' variable
        Object value;

        try {
            if (messageBodyValue instanceof StreamCache) {
                // Check StreamCache first: InputStreamCache is also an InputStream,
                // so an InputStream check placed first would always win for it.
                StreamCache sc = (StreamCache) messageBodyValue;
                // Rewind in case the cache was already read earlier in the route
                sc.reset();
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                sc.writeTo(baos);
                byte[] bytes = baos.toByteArray();

                // Decide whether to use bytes or convert to String based on your data
                //value = bytes; // Use bytes[] for binary data
                value = new String(bytes, StandardCharsets.UTF_8); // Use String for text data
            } else if (messageBodyValue instanceof InputStream) {
                // Plain (non-cached) stream: drain it into a byte array
                InputStream is = (InputStream) messageBodyValue;
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                int nRead;
                byte[] data = new byte[16384]; // Adjust buffer size as needed

                while ((nRead = is.read(data, 0, data.length)) != -1) {
                    buffer.write(data, 0, nRead);
                }

                buffer.flush();
                byte[] bytes = buffer.toByteArray();

                // Decide whether to use bytes or convert to String based on your data
                //value = bytes; // Use bytes[] for binary data
                value = new String(bytes, StandardCharsets.UTF_8); // Use String for text data
            } else {
                value = messageBodyValue;
            }
        } catch (IOException e) {
            // Note: the record is still emitted, with a null value
            LOG.error("Error reading message body", e);
            value = null;
        }

        final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
        final Schema messageBodySchema = value != null ? SchemaHelper.buildSchemaBuilderForType(value) : null;

        final long timestamp = calculateTimestamp(exchange);

        // Take into account cached Camel streams
        if (value instanceof StreamCache) {
            StreamCache sc = (StreamCache) value;
            // Reset to ensure the cache is ready before sending it in the record (useful for SMTs)
            sc.reset();
        }

        for (String singleTopic : topics) {
            CamelSourceRecord camelRecord = new CamelSourceRecord(
                sourcePartition,
                sourceOffset,
                singleTopic,
                null,
                messageKeySchema,
                messageHeaderKey,
                messageBodySchema,
                value, // Use 'value' here
                timestamp
            );

            if (mapHeaders) {
                if (exchange.getMessage().hasHeaders()) {
                    setAdditionalHeaders(camelRecord, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
                }
            }

            if (mapProperties) {
                if (exchange.hasProperties()) {
                    setAdditionalHeaders(camelRecord, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
                }
            }

            TaskHelper.logRecordContent(LOG, loggingLevel, camelRecord);
            Integer claimCheck = freeSlots.remove();
            camelRecord.setClaimCheck(claimCheck);
            exchangesWaitingForAck[claimCheck] = exchange;
            LOG.debug("Record: {}, containing data from exchange: {}, is associated with claim check number: {}",
                camelRecord, exchange, claimCheck);
            records.add(camelRecord);
        }
        collectedRecords++;
        remaining = remaining(startPollEpochMilli, maxPollDuration);
    }

    return records.isEmpty() ? null : records;
}

I added these imports to the file:

import java.io.InputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

These changes allowed the source connector to produce the actual records on the topic, rather than the InputStreamCache object.

Hopefully somebody more familiar with the code base can take this and provide a better solution.
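On the "bytes or String" choice flagged in the code comments above: `new String(bytes, StandardCharsets.UTF_8)` silently replaces malformed input with U+FFFD, so binary payloads would be corrupted without any error. One possible approach, sketched here with only the JDK and hypothetical class and method names, is to decode strictly and fall back to the raw bytes when the payload is not valid UTF-8.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; not part of camel-kafka-connector.
final class BodyDecoder {

    // Returns a String if the bytes are valid UTF-8, otherwise the original byte[].
    static Object textOrBytes(byte[] bytes) {
        try {
            return StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)       // fail instead of substituting U+FFFD
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(bytes))
                    .toString();
        } catch (CharacterCodingException e) {
            return bytes; // not text: keep the payload as binary
        }
    }

    public static void main(String[] args) {
        Object text = textOrBytes("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        Object binary = textOrBytes(new byte[] {(byte) 0xFF, (byte) 0xFE});
        System.out.println(text);                     // {"id":1}
        System.out.println(binary instanceof byte[]); // true
    }
}
```

Whether a mixed String/byte[] value type is acceptable depends on the configured value converter, so this is a starting point rather than a drop-in replacement.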