apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

AWS Kinesis Data Analytics with Aurora Postgres and Flink CDC could not access file "decoderbufs" #1874

Closed: soumilshah1995 closed this issue 9 months ago

soumilshah1995 commented 1 year ago

I have a table called sales

      CREATE TABLE sales (
          InvoiceID int NOT NULL,
          ItemID int NOT NULL,
          Category varchar(255),
          Price decimal,
          Quantity int NOT NULL,
          OrderDate timestamp,
          DestinationState varchar(2),
          ShippingType varchar(255),
          Referral varchar(255),
          PRIMARY KEY (InvoiceID)
      )

I am able to connect and run queries via pgAdmin.

Added the JAR files to KDA.

Trying out the CDC connector in Zeppelin.

Started the job.

Error

java.io.IOException: Fail to run stream sql job
    at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172)
    at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105)
    at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:503)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:266)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 00b4a268bc95c72b65610878bf2d4169)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:125)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$24(RestClusterClient.java:670)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    ... 3 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:123)
    ... 23 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:222)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:212)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:203)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:718)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: io.debezium.DebeziumException: Creation of replication slot failed
    at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:141)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:130)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:759)
    at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2565)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2297)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:322)
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:308)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:284)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:279)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:363)
    at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:134)
    ... 6 more
soumilshah1995 commented 1 year ago

Any update?

menghe999 commented 1 year ago

Hi @soumilshah1995, I hit the same error in a PostgreSQL 14 environment (I found that flink-cdc-connector 2.4 supports PostgreSQL version 12). What is your PostgreSQL version? Is it solved now?

menghe999 commented 1 year ago

I solved the error by configuring the property decoding.plugin.name, as follows:

CREATE TABLE test_s1 (
  id INT,
  name STRING
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'xxx',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'xxx',
  'table-name' = 'xx',
  'slot.name' = 'xxxx',
  'decoding.plugin.name' = 'pgoutput',
   -- experimental feature: incremental snapshot (default off)
  'scan.incremental.snapshot.enabled' = 'true'
)

'slot.name' is required, and 'decoding.plugin.name' depends on your Postgres server configuration.
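
To rule out a server-side problem, it can also help to check the replication settings on the Postgres side first. A minimal sketch using standard PostgreSQL commands (the rds.logical_replication parameter only exists on RDS/Aurora; this is not from the thread, just a suggested check):

-- wal_level must be 'logical' for any logical decoding plugin to work;
-- on Aurora/RDS it is controlled by the cluster parameter rds.logical_replication = 1.
SHOW wal_level;

-- The decoderbufs plugin is not installed on Aurora/RDS, so use the built-in 'pgoutput'.
-- List existing slots to pick a non-conflicting 'slot.name':
SELECT slot_name, plugin, active FROM pg_replication_slots;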

soumilshah1995 commented 1 year ago

I can give it a try and let you know.

soumilshah1995 commented 1 year ago

Here are the steps:

Step 1: Created the DB.

Downloaded the JAR and uploaded it to S3:

https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.4.0/flink-sql-connector-postgres-cdc-2.4.0.jar

KDA

Inserted sample data into Aurora:

CREATE TABLE IF NOT EXISTS public.sales
(
    salesid SERIAL PRIMARY KEY,
    invoiceid integer,
    itemid integer,
    category text COLLATE pg_catalog."default",
    price numeric(10,2),
    quantity integer,
    orderdate date,
    destinationstate text COLLATE pg_catalog."default",
    shippingtype text COLLATE pg_catalog."default",
    referral text COLLATE pg_catalog."default",
    updated_at TIMESTAMP DEFAULT NOW()
);

INSERT INTO public.sales (invoiceid, itemid, category, price, quantity, orderdate, destinationstate, shippingtype, referral)
VALUES (12345, 56789, 'Electronics', 99.99, 2, '2023-07-04', 'California', 'Standard', 'Google Search');

INSERT INTO public.sales (invoiceid, itemid, category, price, quantity, orderdate, destinationstate, shippingtype, referral)
VALUES (1002, 2002, 'Home Decor', 79.99, 1, '2023-06-20', 'Texas', 'Standard', 'Facebook Ad');
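
For reference, a sketch of the matching postgres-cdc source table with the fix from the earlier comment applied (the hostname, credentials, database and slot name below are placeholders, not the actual values used in this setup):

CREATE TABLE sales_cdc (
  salesid INT,
  invoiceid INT,
  itemid INT,
  category STRING,
  price DECIMAL(10, 2),
  quantity INT,
  orderdate DATE,
  destinationstate STRING,
  shippingtype STRING,
  referral STRING,
  updated_at TIMESTAMP(3),
  PRIMARY KEY (salesid) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<aurora-endpoint>',
  'port' = '5432',
  'username' = '<user>',
  'password' = '<password>',
  'database-name' = '<database>',
  'schema-name' = 'public',
  'table-name' = 'sales',
  'slot.name' = 'sales_slot',
  'decoding.plugin.name' = 'pgoutput'
)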

KDA:

It looks like the job gets stuck in the pending state, and I don't see the tables in the Glue catalog.

@menghe999

org.apache.zeppelin.interpreter.InterpreterException: org.apache.zeppelin.interpreter.InterpreterException: org.apache.zeppelin.interpreter.InterpreterException: Fail to open FlinkInterpreter
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:844)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.zeppelin.interpreter.InterpreterException: org.apache.zeppelin.interpreter.InterpreterException: Fail to open FlinkInterpreter
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
    at org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:322)
    at org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:333)
    at org.apache.zeppelin.flink.FlinkSqlInterpreter.open(FlinkSqlInterpreter.java:48)
    at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.open(FlinkStreamSqlInterpreter.java:41)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
    ... 8 more
Caused by: org.apache.zeppelin.interpreter.InterpreterException: Fail to open FlinkInterpreter
    at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:75)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
    ... 13 more
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to determine whether database pgdb exists or not
    at org.apache.flink.table.catalog.hive.HiveCatalog.databaseExists(HiveCatalog.java:455)
    at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:336)
    at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:211)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:371)
    at org.apache.zeppelin.flink.FlinkScalaInterpreter.registerHiveCatalog(FlinkScalaInterpreter.scala:509)
    at org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:141)
    at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:70)
    ... 14 more
Caused by: MetaException(message:Unable to get database object: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to glue.us-west-2.amazonaws.com:443 [glue.us-west-2.amazonaws.com/44.238.97.27, glue.us-west-2.amazonaws.com/50.112.112.45, glue.us-west-2.amazonaws.com/52.27.177.24, glue.us-west-2.amazonaws.com/54.245.186.171, glue.us-west-2.amazonaws.com/15.254.35.55, glue.us-west-2.amazonaws.com/44.232.7.23, glue.us-west-2.amazonaws.com/44.234.249.190, glue.us-west-2.amazonaws.com/52.12.9.39] failed: connect timed out)
    at com.amazonaws.glue.catalog.metastore.GlueMetastoreClientDelegate.getDatabase(GlueMetastoreClientDelegate.java:270)
    at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.getDatabase(AWSCatalogMetastoreClient.java:298)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2327)
    at com.sun.proxy.$Proxy43.getDatabase(Unknown Source)
    at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.getDatabase(HiveMetastoreClientWrapper.java:130)
    at org.apache.flink.table.catalog.hive.HiveCatalog.databaseExists(HiveCatalog.java:450)
    ... 20 more
loserwang1024 commented 9 months ago

Closing this issue as it is no longer valid or requires additional information. If the issue still exists, please report it on Apache Jira under the project Flink, using the component tag Flink CDC. Thank you!