apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Bug] INSERT to Kafka does not work when Kafka config auto.create.topics.enable is set to false #740

Closed · JingsongLi closed this 1 year ago

JingsongLi commented 1 year ago

Search before asking

Paimon version

0.4

Compute Engine

Flink

Minimal reproduce step

If Kafka is used as the log system and auto.create.topics.enable is set to false on the broker, INSERT statements do not work.

Steps to reproduce:

1. Start a Kafka broker with auto.create.topics.enable set to false.
2. Issue the following statements:

CREATE CATALOG table_store_catalog WITH (
   'type' = 'table-store',
   'warehouse' = '<path to object store>'
);

USE CATALOG table_store_catalog;

CREATE TABLE word_count (
   word STRING PRIMARY KEY NOT ENFORCED,
   cnt BIGINT
) WITH (
   'log.system' = 'kafka',
   'kafka.bootstrap.servers' = '<address to broker>',
   'kafka.topic' = 'test-topic',
   'log.consistency' = 'eventual'
);

 INSERT INTO word_count VALUES ('foo', 1);
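Before running the INSERT, it can be worth confirming that the broker really has automatic topic creation disabled. A minimal sketch using the Kafka AdminClient (the broker address and broker id "0" are placeholders for this setup):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class BrokerConfigCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // Read the live broker config; broker id "0" is a placeholder.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config config = admin.describeConfigs(Collections.singleton(broker))
                    .all().get().get(broker);
            // Should print "false" for this reproduction.
            System.out.println(config.get("auto.create.topics.enable").value());
        }
    }
}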

What doesn't meet your expectations?

The task manager logs show:

flink-sandbox-taskmanager-1  | 2023-01-18 12:46:17,085 WARN  org.apache.flink.table.store.shaded.org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-1] Error while fetching metadata with correlation id 544 : {test-topic=UNKNOWN_TOPIC_OR_PARTITION}

The INSERT job on the task manager fails with:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.NullPointerException
    at org.apache.flink.table.store.kafka.KafkaSinkFunction.lambda$open$0(KafkaSinkFunction.java:75)
    at org.apache.flink.table.store.shaded.org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:982)
    at org.apache.flink.table.store.shaded.org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
    at org.apache.flink.table.store.shaded.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:142)
    at org.apache.flink.table.store.shaded.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:926)
    at org.apache.flink.table.store.shaded.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:101)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:245)
    at org.apache.flink.table.store.connector.sink.StoreWriteOperator.processElement(StoreWriteOperator.java:134)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:750)
    Suppressed: org.apache.flink.table.store.shaded.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Pending record count must be zero at this point: 1
        at org.apache.flink.table.store.shaded.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1428)
        at org.apache.flink.table.store.shaded.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:976)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.table.store.connector.sink.StoreWriteOperator.close(StoreWriteOperator.java:166)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:997)
        at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254)
        at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
        at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:916)
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:930)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:930)
        ... 3 more
    Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
        at org.apache.flink.table.store.shaded.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1111)
        at org.apache.flink.table.store.shaded.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:936)
        ... 15 more

Apparently, the Kafka topic is only created when the first record is written to it; presumably the NullPointerException occurs because the producer cannot obtain metadata for the missing topic (hence the UNKNOWN_TOPIC_OR_PARTITION warning above). I did find code that creates a Kafka topic explicitly on table creation: https://github.com/apache/flink-table-store/blob/f201b507fef88501c4beb4c62807bef818e31be5/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java#L123

Topic creation should not rely on auto topic creation being enabled in Kafka, because users may deliberately disable it to prevent unexpected costs on a fully managed Kafka service; see, for example, the Confluent Cloud documentation: https://docs.confluent.io/cloud/current/clusters/broker-config.html#enable-automatic-topic-creation

IMO, when a table is created, the corresponding Kafka topic should be explicitly created.
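For illustration only, a sketch of what explicit topic creation could look like with the Kafka AdminClient; the topic name, partition count, and replication factor below are placeholders, not what Paimon actually uses:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class ExplicitTopicCreation {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // Create the log topic up front instead of relying on
            // auto.create.topics.enable. The partition count would
            // presumably need to match the table's bucket count.
            NewTopic topic = new NewTopic("test-topic", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}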

Anything else?

No response

Are you willing to submit a PR?

calvinjiang commented 1 year ago

@JingsongLi Could you assign this issue to me?

golden-yang commented 1 year ago

It looks like the managed-table-related methods have been removed from LogStoreTableFactory (see commit). Do we have other plans?

I have a similar bug, issue 815: when there are multiple buckets, an exception is caused because the Kafka topic has only one partition.

As zhuangchong said in that issue: do we need to manage tables in LogStoreTableFactory?
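To make the partition/bucket mismatch concrete, a hedged sketch of the kind of up-front check that could catch it, using the plain Kafka AdminClient (broker address, topic name, and bucket count are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class PartitionCountCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        int expectedBuckets = 4; // placeholder: the table's bucket count

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(Collections.singleton("test-topic"))
                    .all().get().get("test-topic");
            if (desc.partitions().size() != expectedBuckets) {
                // Failing fast here beats a confusing runtime exception later.
                throw new IllegalStateException("Topic has " + desc.partitions().size()
                        + " partitions but the table has " + expectedBuckets + " buckets");
            }
        }
    }
}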

JingsongLi commented 1 year ago

@calvinjiang Feel free to create a PR

calvinjiang commented 1 year ago

> @calvinjiang Feel free to create a PR

Ok, I'll fix this issue.