
Apache Camel Kafka Connector Examples
https://camel.apache.org

HDFS connector examples #320

Closed: imtiny closed this issue 3 years ago

imtiny commented 3 years ago

Hi, can you please add some examples for the HDFS sink/source connector? Thanks.

oscerd commented 3 years ago

For the moment you can have a look at the integration test here: https://github.com/apache/camel-kafka-connector/tree/main/tests/itests-hdfs

imtiny commented 3 years ago

> For the moment you can have a look at the integration test here: https://github.com/apache/camel-kafka-connector/tree/main/tests/itests-hdfs

Thanks for your quick response!

Previously I set tasks.max=3 with value.converter=null and key.converter=null, and the error was org.apache.camel.RuntimeCamelException: java.lang.NullPointerException.

Then I set the two converters to org.apache.kafka.connect.storage.StringConverter and the error disappeared.

But I noticed that only one message was saved into HDFS (there are 25 messages...), and there are no error messages in the Kafka Connect connector logs or the HDFS logs.
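
For reference, the converter change looks roughly like this in the Strimzi KafkaConnector spec (a sketch only; the surrounding fields are omitted and the structure is assumed, not the exact manifest):

    # Sketch: fragment of the KafkaConnector spec.config, surrounding fields omitted.
    spec:
      config:
        # With these left unset (null) the task failed with
        # org.apache.camel.RuntimeCamelException: java.lang.NullPointerException
        key.converter: org.apache.kafka.connect.storage.StringConverter
        value.converter: org.apache.kafka.connect.storage.StringConverter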

oscerd commented 3 years ago

Can you please post your full configuration?

imtiny commented 3 years ago

OK, thank you!

EnrichedConnectorConfig:

    config.action.reload = restart
    connector.class = org.apache.camel.kafkaconnector.hdfs.CamelHdfsSinkConnector
    errors.deadletterqueue.context.headers.enable = false
    errors.deadletterqueue.topic.name =
    errors.deadletterqueue.topic.replication.factor = 3
    errors.log.enable = true
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    header.converter = null
    key.converter = class org.apache.kafka.connect.storage.StringConverter
    name = hdfs-sink-connector
    predicates = []
    tasks.max = 1
    topics = [user-activity]
    topics.regex =
    transforms = []
    value.converter = class org.apache.kafka.connect.storage.StringConverter

consumerConfigs:

    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = connector-consumer-hdfs-sink-connector-0
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = connect-hdfs-sink-connector
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

CamelHdfsSinkConnectorConfig:

    camel.aggregation.size = 10
    camel.aggregation.timeout = 500
    camel.beans.aggregate = null
    camel.component.hdfs.autowiredEnabled = true
    camel.component.hdfs.jAASConfiguration = null
    camel.component.hdfs.kerberosConfigFile = null
    camel.component.hdfs.lazyStartProducer = false
    camel.error.handler = default
    camel.error.handler.max.redeliveries = 0
    camel.error.handler.redelivery.delay = 1000
    camel.idempotency.enabled = false
    camel.idempotency.expression.header = null
    camel.idempotency.expression.type = body
    camel.idempotency.kafka.bootstrap.servers = localhost:9092
    camel.idempotency.kafka.max.cache.size = 1000
    camel.idempotency.kafka.poll.duration.ms = 100
    camel.idempotency.kafka.topic = kafka_idempotent_repository
    camel.idempotency.memory.dimension = 100
    camel.idempotency.repository.type = memory
    camel.map.headers = true
    camel.map.properties = true
    camel.remove.headers.pattern =
    camel.sink.component = hdfs
    camel.sink.contentLogLevel = ON
    camel.sink.endpoint.append = false
    camel.sink.endpoint.blockSize = 67108864
    camel.sink.endpoint.bufferSize = 4096
    camel.sink.endpoint.checkIdleInterval = 500
    camel.sink.endpoint.chunkSize = 4096
    camel.sink.endpoint.compressionCodec = DEFAULT
    camel.sink.endpoint.compressionType = NONE
    camel.sink.endpoint.connectOnStartup = true
    camel.sink.endpoint.fileSystemType = HDFS
    camel.sink.endpoint.fileType = NORMAL_FILE
    camel.sink.endpoint.kerberosConfigFileLocation = null
    camel.sink.endpoint.kerberosKeytabLocation = null
    camel.sink.endpoint.kerberosUsername = null
    camel.sink.endpoint.keyType = NULL
    camel.sink.endpoint.lazyStartProducer = false
    camel.sink.endpoint.namedNodes = mars-hdfs-namenode:8020
    camel.sink.endpoint.openedSuffix = opened
    camel.sink.endpoint.overwrite = true
    camel.sink.endpoint.owner = hdfs
    camel.sink.endpoint.readSuffix = read
    camel.sink.endpoint.replication = 3
    camel.sink.endpoint.splitStrategy = null
    camel.sink.endpoint.valueType = BYTES
    camel.sink.marshal = null
    camel.sink.path.hostName = mars-hdfs
    camel.sink.path.path = user/kafka/test1
    camel.sink.path.port = 8020
    camel.sink.unmarshal = null
    camel.sink.url = null

oscerd commented 3 years ago

From what I see in your sink configuration, you're using the in-memory idempotency. Is it possible that the messages you're consuming from the Kafka topic are all the same? By default the idempotency check uses the message body, so if only the first message is consumed and nothing happens after that, it may be that the messages are all identical.
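
For reference, these are the options that control that behaviour, taken from the configuration dump above (a sketch of the relevant spec.config fragment only):

    # Sketch: idempotency options as they appear in the configuration dump above.
    spec:
      config:
        camel.idempotency.enabled: false           # when true, duplicate exchanges are skipped
        camel.idempotency.repository.type: memory  # in-memory repository; kafka is the alternative
        camel.idempotency.expression.type: body    # duplicates are detected by comparing message bodies
        # camel.idempotency.expression.header would be used with expression.type = header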

imtiny commented 3 years ago

I guess idempotency is disabled by camel.idempotency.enabled = false?

The messages are something like:

// first message
{
   a: 1,
   b: 'some string'
}

// second message 
{
   a: -1,
   b: 'some string'
}

// 3rd 
{
   a: 1,
   b: 'some string'
}

// 4th
{
   a: -1,
   b: 'some string'
}

// 5th
{
   a: 1,
   b: 'some string1'
}

...

oscerd commented 3 years ago

I missed the option while reading. Can you provide a reproducer for this, as a GitHub project or something? I need to try it when I have time.

imtiny commented 3 years ago

Thank you so much!

I use the Gradiant Helm chart for the HDFS deployment and Strimzi for the Kafka and connector deployments.

Of course, the connector is CamelHdfsSinkConnector.

The messages sent to Kafka are event logs, for example, users continuously clicking like > unlike > like > unlike ...
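
Roughly, the connector resource looks like this (a sketch with an assumed KafkaConnect cluster name; the config values are the same ones shown in the dump above):

    apiVersion: kafka.strimzi.io/v1beta2   # apiVersion depends on the Strimzi version in use
    kind: KafkaConnector
    metadata:
      name: hdfs-sink-connector
      labels:
        strimzi.io/cluster: hdfs-connect-cluster   # assumed name of the Strimzi KafkaConnect cluster
    spec:
      class: org.apache.camel.kafkaconnector.hdfs.CamelHdfsSinkConnector
      tasksMax: 1
      config:
        topics: user-activity
        key.converter: org.apache.kafka.connect.storage.StringConverter
        value.converter: org.apache.kafka.connect.storage.StringConverter
        camel.sink.path.hostName: mars-hdfs
        camel.sink.path.port: 8020
        camel.sink.path.path: user/kafka/test1
        camel.sink.endpoint.namedNodes: mars-hdfs-namenode:8020
        camel.sink.endpoint.fileType: NORMAL_FILE
        camel.sink.endpoint.valueType: BYTES
        camel.sink.endpoint.owner: hdfs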

oscerd commented 3 years ago

I'll try to check, not sure about timing btw.

imtiny commented 3 years ago

Hi @oscerd, I enabled trace logging and found something:

TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Polling for fetches with timeout 2147441133 (org.apache.kafka.clients.consumer.KafkaConsumer) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]

and

2021-05-29 09:26:26,211 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-4 because previous request to my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 1 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,211 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-6 because previous request to my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 2 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,211 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-0 because previous request to my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 2 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,211 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-2 because previous request to my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 0 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-13 because previous request to my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 1 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-15 because previous request to my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 2 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-9 because previous request to my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 2 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-11 because previous request to my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 0 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-21 because previous request to my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 2 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-23 because previous request to my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 0 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-17 because previous request to my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 0 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-19 because previous request to my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 1 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-24 because previous request to my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 2 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-5 because previous request to my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 0 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-7 because previous request to my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 1 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-1 because previous request to my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 1 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-3 because previous request to my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 2 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-12 because previous request to my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 2 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-14 because previous request to my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 0 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-8 because previous request to my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 0 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-10 because previous request to my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 1 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-20 because previous request to my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 0 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-22 because previous request to my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 1 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-16 because previous request to my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 1 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Skipping fetch for partition hdfs-connect-cluster-offsets-18 because previous request to my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 2 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]
2021-05-29 09:26:26,212 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-1, groupId=hdfs-connect-cluster] Polling for fetches with timeout 2147440212 (org.apache.kafka.clients.consumer.KafkaConsumer) [KafkaBasedLog Work Thread - hdfs-connect-cluster-offsets]

imtiny commented 3 years ago

It seems that getting the Kafka Connect status timed out, so fetching the connect-cluster offsets was skipped and no more data was saved to HDFS.

imtiny commented 3 years ago

Hi @oscerd, more logs:

2021-05-29 09:26:26,359 DEBUG [Consumer clientId=connector-consumer-hdfs-sink-connector-0, groupId=connect-hdfs-sink-connector] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(user-activity-0)) to broker my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) [task-thread-hdfs-sink-connector-0]
2021-05-29 09:26:26,359 TRACE [Consumer clientId=connector-consumer-hdfs-sink-connector-0, groupId=connect-hdfs-sink-connector] Polling for fetches with timeout 275 (org.apache.kafka.clients.consumer.KafkaConsumer) [task-thread-hdfs-sink-connector-0]
2021-05-29 09:26:26,360 DEBUG [Consumer clientId=connector-consumer-hdfs-sink-connector-0, groupId=connect-hdfs-sink-connector] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=connector-consumer-hdfs-sink-connector-0, correlationId=91) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=135064906, sessionEpoch=71, topics=[], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient) [task-thread-hdfs-sink-connector-0]
2021-05-29 09:26:26,360 TRACE [Consumer clientId=connector-consumer-hdfs-sink-connector-0, groupId=connect-hdfs-sink-connector] Skipping fetch for partition user-activity-0 because previous request to my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 2 rack: null) has not been processed (org.apache.kafka.clients.consumer.internals.Fetcher) [task-thread-hdfs-sink-connector-0]
2021-05-29 09:26:26,360 TRACE [Consumer clientId=connector-consumer-hdfs-sink-connector-0, groupId=connect-hdfs-sink-connector] Polling for fetches with timeout 274 (org.apache.kafka.clients.consumer.KafkaConsumer) [task-thread-hdfs-sink-connector-0]
2021-05-29 09:26:26,452 DEBUG [Consumer clientId=consumer-hdfs-connect-cluster-3, groupId=hdfs-connect-cluster] Received FETCH response from node 0 for request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-hdfs-connect-cluster-3, correlationId=93): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=244884353, responses=[]) (org.apache.kafka.clients.NetworkClient) [KafkaBasedLog Work Thread - hdfs-connect-cluster-configs]
2021-05-29 09:26:26,452 DEBUG [Consumer clientId=consumer-hdfs-connect-cluster-3, groupId=hdfs-connect-cluster] Node 0 sent an incremental fetch response with throttleTimeMs = 0 for session 244884353 with response=(), implied=(hdfs-connect-cluster-configs-0) (org.apache.kafka.clients.FetchSessionHandler) [KafkaBasedLog Work Thread - hdfs-connect-cluster-configs]
2021-05-29 09:26:26,452 DEBUG [Consumer clientId=consumer-hdfs-connect-cluster-3, groupId=hdfs-connect-cluster] Added READ_UNCOMMITTED fetch request for partition hdfs-connect-cluster-configs-0 at position FetchPosition{offset=94, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 0 rack: null)], epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-configs]
2021-05-29 09:26:26,452 DEBUG [Consumer clientId=consumer-hdfs-connect-cluster-3, groupId=hdfs-connect-cluster] Built incremental fetch (sessionId=244884353, epoch=87) for node 0. Added (), altered (), removed () out of (hdfs-connect-cluster-configs-0) (org.apache.kafka.clients.FetchSessionHandler) [KafkaBasedLog Work Thread - hdfs-connect-cluster-configs]
2021-05-29 09:26:26,452 DEBUG [Consumer clientId=consumer-hdfs-connect-cluster-3, groupId=hdfs-connect-cluster] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(hdfs-connect-cluster-configs-0)) to broker my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc.cluster.local:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) [KafkaBasedLog Work Thread - hdfs-connect-cluster-configs]
2021-05-29 09:26:26,452 TRACE [Consumer clientId=consumer-hdfs-connect-cluster-3, groupId=hdfs-connect-cluster] Polling for fetches with timeout 2147443924 (org.apache.kafka.clients.consumer.KafkaConsumer) [KafkaBasedLog Work Thread - hdfs-connect-cluster-configs]
2021-05-29 09:26:26,453 DEBUG [Consumer clientId=consumer-hdfs-connect-cluster-3, groupId=hdfs-connect-cluster] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-hdfs-connect-cluster-3, correlationId=94) and timeout 30000 to node 0: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=244884353, sessionEpoch=87, topics=[], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient) [KafkaBasedLog Work Thread - hdfs-connect-cluster-configs]

oscerd commented 3 years ago

This seems to be related to your running cluster configuration and connector. I'll try to have a look, but I'm not sure when.

imtiny commented 3 years ago

@oscerd Thank you! I will try to find out, too.

imtiny commented 3 years ago

Hi @oscerd

I have the following configs for the Camel HDFS connector:

    camel.sink.endpoint.fileType: NORMAL_FILE
    camel.sink.endpoint.namedNodes: mars-hdfs-namenode:8020
    camel.sink.endpoint.owner: hdfs
    camel.sink.endpoint.valueType: BYTES
    camel.sink.path.hostName: mars-hdfs
    camel.sink.path.path: user/kafka/test5

When I use the Kafka console producer to send messages to Kafka, I notice that two files (/user/kafka/test5.opened and /user/kafka/test5/test5.opened) are created by the connector.

The file /user/kafka/test5.opened always keeps only one message, the newest one, while the file /user/kafka/test5/test5.opened always keeps the first message.

I can confirm that if I send a new message, it is saved to the file /user/kafka/test5.opened, so the connector can fetch and save messages from Kafka.

But where are the other messages? Can you please give some explanation or config suggestions?

Thank you!

oscerd commented 3 years ago

Maybe it's related to camel.sink.endpoint.overwrite, which is true by default.

You can set camel.sink.endpoint.append to true and camel.sink.endpoint.overwrite to false and see.
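
In the connector config that would be something like (a sketch; only the two relevant options are shown):

    # Sketch: only the two relevant options are shown.
    spec:
      config:
        camel.sink.endpoint.overwrite: false   # default is true
        camel.sink.endpoint.append: true       # default is false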

imtiny commented 3 years ago

Thank you! It works if the file /user/kafka/test5.opened has already been created by the kafka user.

If I use the command line, as the hdfs user, to create a new directory user/kafka/test6 for a new HDFS connector, then create a new HDFS connector with the settings

    camel.sink.endpoint.overwrite=false
    camel.sink.endpoint.append=true

and set camel.sink.path.path=user/kafka/test6,

then run kubectl apply ... to start the new connector,

it will move /user/kafka/test6 (owner hdfs) to /user/kafka/test6.opened (owner hdfs), and the error log shows that /user/kafka/test6.opened already exists, so sinking messages fails.

So I have to set camel.sink.endpoint.append=false first; after the file /user/kafka/test6.opened has been created by the kafka user, I change the config to camel.sink.endpoint.append=true.
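
The two-step workaround looks roughly like this (a sketch; only the relevant options are shown):

    # Step 1: create the connector with append disabled so it creates the file itself.
    spec:
      config:
        camel.sink.path.path: user/kafka/test6
        camel.sink.endpoint.append: false
    # Step 2: once /user/kafka/test6.opened exists (created by the kafka user),
    # re-apply the connector with camel.sink.endpoint.append: true so new
    # messages are appended instead of failing on the existing file.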