Open: syalinbas opened this issue 4 years ago
I also have a similar problem. I have a CSAS stream that converts a JSON topic (source) into an Avro topic (destination). Sometimes messages stop arriving in the Avro destination topic, even though I can see messages flowing into the source JSON topic normally. Initially I used to recreate the stream, but I found that simply restarting the ksqlDB container is enough: after that, messages start appearing in the Avro topic again. Has anyone faced something similar before, or have any idea what's happening?
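For context, a CSAS (CREATE STREAM AS SELECT) pipeline of the kind described above might look like the following sketch. All stream, topic, and column names here are hypothetical placeholders, not taken from the reporter's setup:

```sql
-- Hypothetical source stream over a JSON-formatted topic
CREATE STREAM source_json (id VARCHAR, payload VARCHAR)
  WITH (KAFKA_TOPIC='source_topic', VALUE_FORMAT='JSON');

-- CSAS: re-serialize the same events into an Avro-formatted topic
CREATE STREAM dest_avro
  WITH (KAFKA_TOPIC='dest_topic', VALUE_FORMAT='AVRO') AS
  SELECT * FROM source_json;
```

The Avro case requires Schema Registry to be configured, which matches the `KSQL_KSQL_SCHEMA_REGISTRY_URL` setting shown in the server config below.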
We have a ksqlDB server deployment that stops working after a couple of days. Specifically, we find a few processors stopped, with their state having transitioned from RUNNING to ERROR, and no apparent error message in the logs; at least we do not see any FATAL error.
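The query state can be inspected from the ksqlDB CLI; a minimal way to see which persistent queries have entered the ERROR state (the query ID below is a placeholder):

```sql
-- List all persistent queries with their current state (RUNNING, ERROR, ...)
SHOW QUERIES;

-- Inspect a single query in more detail; CSAS_LOCS_33 is a placeholder ID
EXPLAIN CSAS_LOCS_33;
```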
You can find logs here: https://storage.googleapis.com/dfw-logs/all.log.zip https://storage.googleapis.com/dfw-logs/ksqldb.log.zip
We remedy the situation by removing and re-adding processors (DROP/CREATE STREAM/TABLE statements) in the ksqlDB CLI. Processing starts as soon as we re-create those STREAM/TABLE statements, and it continues to work for a couple of days until we find them stopped again.
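The remediation cycle looks roughly like this (the stream name and definition are hypothetical stand-ins for the real processors):

```sql
-- Drop the stopped processor; DELETE TOPIC also removes its sink topic
DROP STREAM IF EXISTS my_stream DELETE TOPIC;

-- Re-create it with the same definition; processing resumes immediately
CREATE STREAM my_stream WITH (VALUE_FORMAT='AVRO') AS
  SELECT * FROM some_source;
```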
The following are error messages we see frequently in the logs:
In the ksqlDB logs:
Here's the composition of the server:
There are 3 brokers and 1 ksqldb-server:
```yaml
broker:
  image: confluentinc/cp-enterprise-kafka:5.4.0
  environment:
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181,zookeeper2:2182,zookeeper3:2183'
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_EXT:PLAINTEXT
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_LOG_RETENTION_HOURS: 504
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
    CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: 'true'
    CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
```
```yaml
ksqldb-server:
  image: confluentinc/ksqldb-server:0.8.1
  environment:
    KSQL_KSQL_SERVICE_ID: ${KSQL_SERVICE_ID}
    KSQL_BOOTSTRAP_SERVERS: "broker:29092,broker2:29093,broker3:29094"
    KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
    KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
    KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
    KSQL_KSQL_CONNECT_URL: connect:8083
    KSQL_KSQL_INTERNAL_TOPIC_REPLICAS: 3
    KSQL_KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS: 2
    KSQL_KSQL_LOGGING_PROCESSING_TOPIC_PARTITIONS: 3
    KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 3
    KSQL_KSQL_STREAMS_PRODUCER_DELIVERY_TIMEOUT_MS: 2147483647
    KSQL_KSQL_STREAMS_PRODUCER_MAX_BLOCK_MS: 9223372036854775807
    KSQL_KSQL_STREAMS_REPLICATION_FACTOR: 3
    KSQL_KSQL_STREAMS_PRODUCER_ACKS: all
    KSQL_KSQL_STREAMS_TOPIC_MIN_INSYNC_REPLICAS: 2
    KSQL_KSQL_STREAMS_STATE_DIR: /data/ksqldb
    KSQL_KSQL_STREAMS_NUM_STANDBY_REPLICAS: 3
```
A few stack traces:

```
ksqldb-server3 | org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {_confluent-ksql-dev-01query_CSAS_locs_33-KSTREAM-JOINTHIS-0000000013-store-changelog-1=3861}
ksqldb-server3 |     at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1262)
ksqldb-server3 |     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:607)
ksqldb-server3 |     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
ksqldb-server3 |     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
ksqldb-server3 |     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
ksqldb-server3 |     at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1050)
ksqldb-server3 |     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
ksqldb-server3 |     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
ksqldb-server3 |     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
```
```
ksqldb-server | org.apache.kafka.streams.errors.TaskMigratedException: Client request for task 0_0 has been fenced due to a rebalance
ksqldb-server |     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:530)
ksqldb-server |     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)
ksqldb-server |     at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)
ksqldb-server |     at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)
ksqldb-server |     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)
ksqldb-server |     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)
ksqldb-server |     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
ksqldb-server |     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
ksqldb-server | Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1099)
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:966)
ksqldb-server |     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1511)
ksqldb-server |     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1459)
ksqldb-server |     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:527)
ksqldb-server |     ... 7 more
```
```
ksqldb-server | Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1196)
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1123)
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1078)
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1058)
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
ksqldb-server |     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:967)
ksqldb-server |     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1511)
ksqldb-server |     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1459)
ksqldb-server |     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:527)
ksqldb-server |     ... 7 more
ksqldb-server | [2020-04-20 00:36:12,206] INFO Received: KsqlRequest{ksql='describe extended mwe_resource_locate_raw;describe extended "zle_locate";describe extended zle_locate_join;describe extended locate
```
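Since the last trace points at `max.poll.interval.ms`, one mitigation to try (a sketch under the assumption that slow poll loops are contributing, not a verified fix for this issue) is to pass the relevant consumer settings to the embedded Kafka Streams application via the same `KSQL_KSQL_STREAMS_` prefix already used in the server config above; the values below are hypothetical:

```yaml
ksqldb-server:
  environment:
    # Hypothetical tuning values: allow more time between poll() calls,
    # and shrink the batch returned by each poll().
    KSQL_KSQL_STREAMS_CONSUMER_MAX_POLL_INTERVAL_MS: 600000
    KSQL_KSQL_STREAMS_CONSUMER_MAX_POLL_RECORDS: 100
```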