Tencent / TBase

TBase is an enterprise-grade distributed HTAP database. A single database cluster provides users with both highly consistent distributed database services and high-performance data warehouse services, forming an integrated enterprise solution.

Flink reports an error when using a TBase DN node as the source and reading log data in pgoutput mode #88

Open zhuqiuxia opened 3 years ago

zhuqiuxia commented 3 years ago

The Flink project uses the flink-connector-postgres-cdc connector, and TBase is configured with shared_preload_libraries = 'pgoutput'. The Flink job reads the existing (snapshot) data of the TBase table correctly, but then fails with the following error:

[debezium-postgresconnector-postgres_binlog_source-change-event-source-coordinator] DEBUG io.debezium.relational.TableSchemaBuilder - Mapped columns for table 'public.test' to schema: {"name" : "postgres_binlog_source.public.test.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT64", "optional" : "false"}}, {"name" : "name", "index" : "1", "schema" : {"type" : "STRING", "optional" : "true"}}]}
19:44:42.724 [debezium-postgresconnector-postgres_binlog_source-change-event-source-coordinator] ERROR io.debezium.pipeline.ErrorHandler - Producer failure
org.postgresql.util.PSQLException: ERROR: no data left in message
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2532)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1200)
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1100)
    at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:42)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:158)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:123)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:80)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:465)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:125)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:108)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19:44:42.725 [debezium-postgresconnector-postgres_binlog_source-change-event-source-coordinator] DEBUG io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - stopping streaming...
19:44:42.725 [debezium-postgresconnector-postgres_binlog_source-change-event-source-coordinator] DEBUG io.debezium.connector.postgresql.connection.PostgresReplicationConnection - Closing replication connection
19:44:42.725 [debezium-postgresconnector-postgres_binlog_source-change-event-source-coordinator] INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Finished streaming
19:44:42.725 [debezium-postgresconnector-postgres_binlog_source-change-event-source-coordinator] INFO io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics - Connected metrics set to 'false'
19:44:42.933 [debezium-engine] DEBUG io.debezium.connector.base.ChangeEventQueue - checking for more records...

JennyJennyChen commented 3 years ago

1. TBase stores user data on the DN (datanode) nodes, so you should connect to a DN to read the data.
2. The xlog (WAL) level should be set to logical.
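The two checks above can be sketched as a small pre-flight helper. This is a hypothetical illustration, not part of TBase or Flink: it assumes the caller already knows whether the target node is a coordinator ("CN") or a datanode ("DN") and has already fetched the node's settings (for example the result of SHOW wal_level) into a map. The class name, method name and role strings are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check for the two points in the reply above.
final class CdcPrecheck {
    /**
     * Returns a list of human-readable problems; an empty list means both checks pass.
     * nodeRole: "CN" or "DN" (assumed labels); settings: values already read from the node.
     */
    static List<String> problems(String nodeRole, Map<String, String> settings) {
        List<String> out = new ArrayList<>();
        // Point 1: user data lives on the datanodes, so the CDC source must connect to a DN.
        if (!"DN".equalsIgnoreCase(nodeRole)) {
            out.add("connect to a datanode (DN), not " + nodeRole);
        }
        // Point 2: logical decoding needs wal_level = logical on that node.
        String walLevel = settings.getOrDefault("wal_level", "");
        if (!"logical".equalsIgnoreCase(walLevel)) {
            out.add("wal_level must be 'logical' (found '" + walLevel + "')");
        }
        return out;
    }
}
```

For example, `CdcPrecheck.problems("CN", Map.of("wal_level", "replica"))` reports both problems, while a DN with `wal_level = logical` passes. As the next comment in the thread shows, passing both checks does not by itself rule out the streaming error.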

zhuqiuxia commented 3 years ago

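Yes, I have taken both of those points into account and configured it exactly that way. Data is now being read, but the error still occurs.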

JennyJennyChen commented 3 years ago

If data can be read, the problem is not in TBase; it is probably an issue with the plugin interface.

zhuqiuxia commented 3 years ago

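This error message comes from the TBase source code. I first tried the same setup against PostgreSQL 10 and it worked fine, and the PostgreSQL version bundled with TBase is also 10. If it is a plugin problem, how do you read TBase logs internally? Is there an approach you would recommend?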

JennyJennyChen commented 3 years ago

We use decoderbufs. The plugin encodes its output with protobuf, so we added the protobuf dependency libraries: protobuf-3.6.1 and protobuf-c-1.3.2.
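In the Flink CDC postgres-cdc SQL connector, the logical decoding output plugin is chosen with the 'decoding.plugin.name' table option, so switching from pgoutput to decoderbufs is a one-option change on the Flink side. The sketch below only builds the Flink SQL DDL string for the public.test table seen in the original log (id INT64, name STRING). It assumes decoderbufs, built against the protobuf libraries mentioned above, is already installed on the DN; the host, port, credentials, database name and slot name are placeholders, not values from this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that builds a Flink SQL DDL for the postgres-cdc connector.
final class PgCdcDdl {
    static String build(String dnHost, int dnPort, String plugin) {
        // LinkedHashMap keeps the options in insertion order in the generated DDL.
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("connector", "postgres-cdc");
        opts.put("hostname", dnHost);                 // a TBase datanode, not a coordinator
        opts.put("port", Integer.toString(dnPort));
        opts.put("username", "flink_user");           // placeholder credentials
        opts.put("password", "secret");
        opts.put("database-name", "postgres");        // placeholder database
        opts.put("schema-name", "public");
        opts.put("table-name", "test");
        opts.put("decoding.plugin.name", plugin);     // e.g. "decoderbufs" or "pgoutput"
        opts.put("slot.name", "flink_test_slot");     // placeholder replication slot name
        // Values are not escaped; fine for this sketch, not for arbitrary input.
        String with = opts.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));
        // Column types mirror the schema in the log: id INT64 -> BIGINT, name STRING -> STRING.
        return "CREATE TABLE test (\n"
                + "  id BIGINT,\n"
                + "  name STRING\n"
                + ") WITH (\n" + with + "\n)";
    }
}
```

Passing "pgoutput" reproduces the configuration from the original report, while "decoderbufs" matches the setup described in this reply; the resulting string could then be submitted with Flink's `TableEnvironment.executeSql`.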

linzhi123 commented 2 years ago

Could you share your Flink code?

------------------ Original message ------------------ From: Evan; Sent: 2022-07-14 17:07; To: Tencent/TBase; Subject: Re: [Tencent/TBase] Flink reports an error when using a TBase DN node as the source and reading log data in pgoutput mode (#88)

This problem still exists (2022-07-14):

Caused by: org.postgresql.util.PSQLException: ERROR: no data left in message
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2532)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1200)
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1100)
    at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:42)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:158)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:123)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:80)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:467)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.searchWalPosition(PostgresStreamingChangeEventSource.java:271)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:131)
    ... 7 more

linzhi123 commented 2 years ago

OK, I'll find time to take a look.

------------------ Original message ------------------ From: Evan; Sent: 2022-07-18 20:59; To: Tencent/TBase; Subject: Re: [Tencent/TBase] Flink reports an error when using a TBase DN node as the source and reading log data in pgoutput mode (#88)

Could you share your Flink code? …

Could you please help take a look?