memiiso / debezium-server-bigquery

Replicates any database (CDC events) to Bigquery in real time
Apache License 2.0

Override table naming pattern #201

Open kholisrag opened 1 day ago

kholisrag commented 1 day ago

As mentioned in #195:

the BigQuery table name ends up like this: {topic.prefix}_{source_database_name}_{table_name}. dbz could be a good value for it.

Is it possible to override the table naming pattern? I only want to use {table_name} in the table name. Is that possible, @ismailsimsek?

More info from the log:

2024-11-13 08:42:45,942 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default', error = 'io.debezium.DebeziumException: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default', error = 'io.debezium.DebeziumException: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
    at io.debezium.server.bigquery.StreamDataWriter.appendSync(StreamDataWriter.java:89)
    at io.debezium.server.bigquery.StreamBigqueryChangeConsumer.uploadDestination(StreamBigqueryChangeConsumer.java:154)
    at io.debezium.server.bigquery.AbstractChangeConsumer.handleBatch(AbstractChangeConsumer.java:126)
    at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
    at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:735)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
    at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
    at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
    at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.appendWithUniqueId(SchemaAwareStreamWriter.java:250)
    at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.append(SchemaAwareStreamWriter.java:140)
    at com.google.cloud.bigquery.storage.v1.JsonStreamWriter.append(JsonStreamWriter.java:65)
    at io.debezium.server.bigquery.StreamDataWriter.appendSync(StreamDataWriter.java:83)
    ... 10 more

2024-11-13 08:42:46,146 INFO  [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2024-11-13 08:42:46,149 INFO  [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2024-11-13 08:42:46,151 INFO  [com.goo.clo.big.sto.v1.ConnectionWorkerPool] (main) During closing of writeStream for projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default with writer id 1d374d44-f9a0-4b05-beae-c74f339e2b51, we decided to close 0 connections, pool size after removal $s
com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
    at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
    at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
    at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1300)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1061)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:811)
    at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:568)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:569)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
    at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
    Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
        at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
        at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
        at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:677)
        at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:647)
        at io.debezium.server.bigquery.StreamDataWriter.close(StreamDataWriter.java:97)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer.closeStreams(StreamBigqueryChangeConsumer.java:99)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.doDestroy(Unknown Source)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
        at io.debezium.server.DebeziumServer.stop(DebeziumServer.java:242)
        at io.debezium.server.DebeziumServer_Observer_stop_yMpCZhmgvv79zBZyHwCy4l6x-EI.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:351)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:333)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:80)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:156)
        at io.quarkus.arc.runtime.ArcRecorder$2.run(ArcRecorder.java:112)
        at io.quarkus.runtime.StartupContext.runAllAndClear(StartupContext.java:79)
        at io.quarkus.runtime.StartupContext.close(StartupContext.java:70)
        at io.quarkus.runner.ApplicationImpl.doStop(Unknown Source)
        at io.quarkus.runtime.Application.stop(Application.java:208)
        at io.quarkus.runtime.Application.stop(Application.java:155)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:235)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
        at io.debezium.server.Main.main(Main.java:15)
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
    at io.grpc.Status.asRuntimeException(Status.java:532)
    ... 14 more
2024-11-13 08:42:46,825 WARN  [io.deb.ser.big.StreamBigqueryChangeConsumer] (main) Exception while closing bigquery stream, destination:postgresql_postgres-staging_bifrost_audit.public.dbtable: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
    at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
    at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
    at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1300)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1061)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:811)
    at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:568)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:569)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
    at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
    Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
        at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
        at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
        at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:677)
        at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:647)
        at io.debezium.server.bigquery.StreamDataWriter.close(StreamDataWriter.java:97)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer.closeStreams(StreamBigqueryChangeConsumer.java:99)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.doDestroy(Unknown Source)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
        at io.debezium.server.DebeziumServer.stop(DebeziumServer.java:242)
        at io.debezium.server.DebeziumServer_Observer_stop_yMpCZhmgvv79zBZyHwCy4l6x-EI.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:351)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:333)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:80)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:156)
        at io.quarkus.arc.runtime.ArcRecorder$2.run(ArcRecorder.java:112)
        at io.quarkus.runtime.StartupContext.runAllAndClear(StartupContext.java:79)
        at io.quarkus.runtime.StartupContext.close(StartupContext.java:70)
        at io.quarkus.runner.ApplicationImpl.doStop(Unknown Source)
        at io.quarkus.runtime.Application.stop(Application.java:208)
        at io.quarkus.runtime.Application.stop(Application.java:155)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:235)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
        at io.debezium.server.Main.main(Main.java:15)
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default

The config:

# The common configuration for the Debezium Server BigQuery
debezium.format.value=json
debezium.format.key=json
debezium.format.schemas.enable=true

quarkus.log.level=INFO
quarkus.log.console.json=false
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN

debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,lsn,source.ts_ms,ts_ms
debezium.transforms.unwrap.add.headers=db
debezium.transforms.unwrap.delete.tombstone.handling.mode=rewrite

# The BigQuery related configuration for the Debezium Server BigQuery

debezium.source.offset.storage=io.debezium.server.bigquery.offset.BigqueryOffsetBackingStore
debezium.source.offset.storage.bigquery.table-name=dsbq_offset
debezium.source.offset.flush.interval.ms=10000

debezium.source.database.history=io.debezium.server.bigquery.history.BigquerySchemaHistory
debezium.source.database.history.bigquery.table-name=dsbq_history

debezium.sink.type=bigquerystream
debezium.sink.bigquerystream.project=gcp-project-id
debezium.sink.bigquerystream.location=asia-southeast1
debezium.sink.bigquerystream.dataset=stage_postgresql_dbname_public
debezium.sink.bigquerystream.ignore-unknown-fields=false
debezium.sink.bigquerystream.credentials-file=/app/secrets/sa.json
debezium.sink.bigquerystream.create-if-needed=true
debezium.sink.bigquerystream.partition-field=__ts_ms
debezium.sink.bigquerystream.clustering-field=__source_ts_ms
debezium.sink.bigquerystream.partition-type=DAY

debezium.sink.batch.batch-size-wait=NoBatchSizeWait
debezium.sink.bigquerystream.upsert=true
debezium.sink.bigquerystream.upsert-keep-deletes=false
debezium.sink.bigquerystream.upsert-dedup-column=__source_ts_ms
debezium.sink.bigquerystream.upsert-op-column=__op

# The PostgreSQL related configuration for the Debezium Server BigQuery

debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=postgres-staging
debezium.source.database.port=5432
debezium.source.database.user=dbz_bq_user
debezium.source.database.password=xxxxx
debezium.source.database.dbname=dbname
debezium.source.table.include.list=public.audit_log,public.schema_migrations

debezium.source.plugin.name=pgoutput
debezium.source.publication.name=db_publication
debezium.source.publication.autocreate.mode=disabled
debezium.source.topic.prefix=postgresql_postgres-staging_dbname
debezium.source.slot.name=slot_dbname
debezium.source.slot.drop.on.stop=false
debezium.source.snapshot.mode=never

Thanks in advance

ismailsimsek commented 1 day ago

@kholisrag is this happening on the first run, when the table is created?

Currently there is a known BigQuery issue: when the connector creates the table for the first time and immediately tries to write a stream into it, it fails with the above exception, because BigQuery needs some time to create the default stream for the new table.

However, when the tables already exist, this error should not happen. More info: https://github.com/googleapis/java-bigquery/issues/2368 https://github.com/googleapis/google-cloud-go/issues/975#issuecomment-1503689334
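
A common client-side workaround for this kind of race, sketched below, is to retry the very first append with backoff while the default stream is still materializing. This is only an illustration, not what the connector currently does; appendWithRetry, the retry budget, and the backoff are all made up for the example.

import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import org.json.JSONArray;

static AppendRowsResponse appendWithRetry(JsonStreamWriter writer, JSONArray rows) throws Exception {
  int attempts = 0;
  while (true) {
    try {
      return writer.append(rows).get(); // blocks until the server acknowledges the rows
    } catch (Exception e) {
      attempts++;
      // A NOT_FOUND right after table creation usually means the _default
      // stream is not ready yet; anything else, or too many retries, is fatal.
      if (!(e.getCause() instanceof NotFoundException) || attempts >= 5) {
        throw e;
      }
      Thread.sleep(1000L * attempts); // linear backoff, roughly 15s in total
    }
  }
}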

kholisrag commented 1 day ago

@kholisrag is this happening on the first run, when the table is created?

yes it is...

Anyway, I already have tables like the ones below:

(screenshot: 2024-11-13_16-10)

Is it possible to use the existing tables, audit_log / schema_migrations as above, since the old data is in there? @ismailsimsek

ismailsimsek commented 1 day ago

Yes, it's possible to use an existing table. When the table exists, the consumer will use it; if not, it will try to create it. It will not remove the data.

kholisrag commented 1 day ago

Yes, it's possible to use an existing table. When the table exists, the consumer will use it; if not, it will try to create it. It will not remove the data.

I mean to change this pattern (#195):

the BigQuery table name ends up like this: {topic.prefix}_{source_database_name}_{table_name}. dbz could be a good value for it.

I only need {table_name}. Is that possible, @ismailsimsek?

Update:

(screenshot: 2024-11-13_16-24_1)

Or may I know which piece of code implements the pattern?

ismailsimsek commented 1 day ago

This is currently not possible, but it could be added as a new feature. This is where the table name is defined:

https://github.com/memiiso/debezium-server-bigquery/blob/b869977e544c4299549fb130c0acc1603d398cd8/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java#L209-L215
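
For readers without the link handy, those lines amount to roughly the following; this is a paraphrase of the linked revision, not verbatim source, and destinationRegexp, destinationRegexpReplace, gcpProject, and bqDataset stand for the injected configuration values. The destination is first run through the optional regexp replacement, and every remaining dot is then flattened to an underscore:

TableId getTableId(String destination) {
  final String tableName = destination
      // apply debezium.sink.batch.destination-regexp / -regexp-replace first
      .replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse(""))
      // then flatten the dots: prefix.public.audit_log -> prefix_public_audit_log
      .replace(".", "_");
  return TableId.of(gcpProject.get(), bqDataset.get(), tableName);
}

So the {topic.prefix}_{schema}_{table} shape comes from the dot flattening, and the regexp pair is the only hook for trimming it.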

kholisrag commented 1 day ago

thank you @ismailsimsek

Checking the code: is it possible to use debezium.sink.batch.destination-regexp and debezium.sink.batch.destination-regexp-replace?

ismailsimsek commented 1 day ago

Right 👍 actually that could be used.

kholisrag commented 1 day ago

Can you give me an example? I didn't find one anywhere in the test code either.

kholisrag commented 1 day ago

something like this? @ismailsimsek

debezium.sink.batch.destination-regexp=.*_(.*)_(.*)
debezium.sink.batch.destination-regexp-replace=$2

Update:

Both the above and the below are still not working:

debezium.sink.batch.destination-regexp=.*\..*\.(.*)
debezium.sink.batch.destination-regexp-replace=$1

Error:

2024-11-13 10:28:38,805 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Provided table is null or empty', error = 'java.lang.IllegalArgumentException: Provided table is null or empty': java.lang.IllegalArgumentException: Provided table is null or empty

ismailsimsek commented 1 day ago

Something like the following; you could use this tool to develop the regexp: https://regex101.com/

# use a regexp which matches the prefix
debezium.sink.batch.destination-regexp=^prefix\.
# use an empty string as the replacement
debezium.sink.batch.destination-regexp-replace=
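
As a concrete illustration, assuming the destination string below (which is made up to match the naming scheme discussed above) and that the pattern is applied with String.replaceAll, as in the paraphrased getTableId earlier, the two styles behave like this:

String destination = "prefix.public.audit_log"; // hypothetical destination
// Strip a known prefix, as in the config above:
System.out.println(destination.replaceAll("^prefix\\.", ""));    // public.audit_log
// Or keep only the last dot-separated segment via a capture group:
System.out.println(destination.replaceAll("^.*\\.(.*)$", "$1")); // audit_log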

kholisrag commented 1 day ago

Yeah, I'm using https://regex101.com/ too; I already tried it here, https://regex101.com/r/gvzYpZ/2, but it's still not working either.

Update:

Using the config below:

debezium.sink.batch.destination-regexp=^dsbq\.public\.
debezium.sink.batch.destination-regexp-replace=

The table name is now replaced, but it still seems not to work. Here is the log:

2024-11-13 12:05:36,856 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default', error = 'io.debezium.DebeziumException: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default': io.debezium.DebeziumException: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
    at io.debezium.server.bigquery.StreamDataWriter.appendSync(StreamDataWriter.java:89)
    at io.debezium.server.bigquery.StreamBigqueryChangeConsumer.uploadDestination(StreamBigqueryChangeConsumer.java:154)
    at io.debezium.server.bigquery.AbstractChangeConsumer.handleBatch(AbstractChangeConsumer.java:126)
    at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
    at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:735)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
    at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
    at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
    at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.appendWithUniqueId(SchemaAwareStreamWriter.java:250)
    at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.append(SchemaAwareStreamWriter.java:140)
    at com.google.cloud.bigquery.storage.v1.JsonStreamWriter.append(JsonStreamWriter.java:65)
    at io.debezium.server.bigquery.StreamDataWriter.appendSync(StreamDataWriter.java:83)
    ... 10 more

2024-11-13 12:05:37,057 INFO  [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2024-11-13 12:05:37,064 INFO  [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2024-11-13 12:05:37,067 INFO  [com.goo.clo.big.sto.v1.ConnectionWorkerPool] (main) During closing of writeStream for projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default with writer id fa54a2bc-5081-40d1-8500-7a3173d99bad, we decided to close 0 connections, pool size after removal $s
com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
    at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
    at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
    at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1300)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1061)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:811)
    at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:568)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:569)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
    at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
    Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
        at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
        at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
        at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:677)
        at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:647)
        at io.debezium.server.bigquery.StreamDataWriter.close(StreamDataWriter.java:97)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer.closeStreams(StreamBigqueryChangeConsumer.java:99)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.doDestroy(Unknown Source)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
        at io.debezium.server.DebeziumServer.stop(DebeziumServer.java:242)
        at io.debezium.server.DebeziumServer_Observer_stop_yMpCZhmgvv79zBZyHwCy4l6x-EI.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:351)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:333)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:80)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:156)
        at io.quarkus.arc.runtime.ArcRecorder$2.run(ArcRecorder.java:112)
        at io.quarkus.runtime.StartupContext.runAllAndClear(StartupContext.java:79)
        at io.quarkus.runtime.StartupContext.close(StartupContext.java:70)
        at io.quarkus.runner.ApplicationImpl.doStop(Unknown Source)
        at io.quarkus.runtime.Application.stop(Application.java:208)
        at io.quarkus.runtime.Application.stop(Application.java:155)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:235)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
        at io.debezium.server.Main.main(Main.java:15)
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
    at io.grpc.Status.asRuntimeException(Status.java:532)
    ... 14 more
2024-11-13 12:05:37,459 WARN  [io.deb.ser.big.StreamBigqueryChangeConsumer] (main) Exception while closing bigquery stream, destination:dsbq.public.audit_log: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
    at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
    at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
    at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1300)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1061)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:811)
    at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:568)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:569)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
    at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
    Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
        at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
        at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
        at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:677)
        at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:647)
        at io.debezium.server.bigquery.StreamDataWriter.close(StreamDataWriter.java:97)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer.closeStreams(StreamBigqueryChangeConsumer.java:99)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.doDestroy(Unknown Source)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
        at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
        at io.debezium.server.DebeziumServer.stop(DebeziumServer.java:242)
        at io.debezium.server.DebeziumServer_Observer_stop_yMpCZhmgvv79zBZyHwCy4l6x-EI.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:351)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:333)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:80)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:156)
        at io.quarkus.arc.runtime.ArcRecorder$2.run(ArcRecorder.java:112)
        at io.quarkus.runtime.StartupContext.runAllAndClear(StartupContext.java:79)
        at io.quarkus.runtime.StartupContext.close(StartupContext.java:70)
        at io.quarkus.runner.ApplicationImpl.doStop(Unknown Source)
        at io.quarkus.runtime.Application.stop(Application.java:208)
        at io.quarkus.runtime.Application.stop(Application.java:155)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:235)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
        at io.debezium.server.Main.main(Main.java:15)
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
    at io.grpc.Status.asRuntimeException(Status.java:532)
    ... 14 more

kholisrag commented 1 day ago

Hmm, when I try to use bigquerybatch mode, I get the following error:

{
  "error": {
    "code": 400,
    "message": "Provided Schema does not match Table gcp-project-id:stage_postgresql_bifrost_audit_public.audit_log. Field audit_time has changed type from TIMESTAMP to INTEGER",
    "errors": [
      {
        "message": "Provided Schema does not match Table gcp-project-id:stage_postgresql_bifrost_audit_public.audit_log. Field audit_time has changed type from TIMESTAMP to INTEGER",
        "domain": "global",
        "reason": "invalid"
      }
    ],
    "status": "INVALID_ARGUMENT"
  }
}

ismailsimsek commented 1 day ago

@kholisrag it seems like you are switching between the stream and batch consumers, and they use different schemas when creating the table. If you delete the current table and let the consumer create it, this error should be gone. Simply put, the audit_time field type is mismatched.
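
To confirm what the existing table actually carries before deleting it, a quick schema dump with the BigQuery Java client could look like this; the sketch below just assumes the project, dataset, and table IDs from the error message:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;

public class ShowSchema {
  public static void main(String[] args) {
    BigQuery bq = BigQueryOptions.getDefaultInstance().getService();
    Table table = bq.getTable(
        TableId.of("gcp-project-id", "stage_postgresql_bifrost_audit_public", "audit_log"));
    // Prints each field with its type, e.g. "audit_time : TIMESTAMP" vs INTEGER.
    for (Field field : table.getDefinition().getSchema().getFields()) {
      System.out.println(field.getName() + " : " + field.getType());
    }
  }
}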

ismailsimsek commented 1 day ago

But as you can see, the table name is correctly set, without the prefix:

laku6-145607:stage_postgresql_bifrost_audit_public.audit_log

kholisrag commented 1 day ago

Okay, it seems like a schema mismatch. Checking the original table in the database VM, it's a timestamp; not sure why it's converted to an integer by debezium-server.

(screenshot: 2024-11-13_21-29)

ismailsimsek commented 1 day ago

@kholisrag BigQuery does some level of automatic data type recognition (converting long to timestamp, etc.), and this differs for time/timestamp types between the batch and streaming consumers. I believe this is where it's coming from.

I believe when you use just one consumer, this should not happen.

kholisrag commented 1 day ago

For context, we want to migrate from Datastream to self-managed using this tool, without changing the existing tables; we only want to update them. But it seems like this has become a problem...

I believe when you use just one consumer, this should not happen.

Hmm, I tried bigquerybatch vs bigquerystream at different times. It seems to have become a problem because I'm migrating from Datastream.

anyway thank you!

ismailsimsek commented 1 day ago

@kholisrag is there a specific reason for switching from Datastream? I believe the data type mapping could easily be improved (especially for bigquerystream).

The following config could be used to fix the type mapping (this part is still in beta): https://github.com/memiiso/debezium-server-bigquery/blob/b869977e544c4299549fb130c0acc1603d398cd8/debezium-server-bigquery-sinks/src/test/java/io/debezium/server/converters/ChangeConsumerConverterTest.java#L96-L98

The data type mapping code section is below; type conversions and corrections could be handled here: https://github.com/memiiso/debezium-server-bigquery/blob/b869977e544c4299549fb130c0acc1603d398cd8/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java#L186-L255

Additionally, I suggest checking the Debezium config options that change data type values.

It's also related to https://github.com/googleapis/java-bigquerystorage/issues/1764 and https://github.com/googleapis/java-bigquerystorage/issues/1765.
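
For anyone attempting such a correction themselves, Debezium custom converters follow the CustomConverter SPI; below is a minimal sketch of that shape, assuming a Postgres timestamp column should reach BigQuery as an ISO-8601 string. It is not the TemporalToISOStringConverter shipped with this project, whose actual behavior may differ.

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import java.util.Properties;
import org.apache.kafka.connect.data.SchemaBuilder;

public class IsoTimestampConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

  @Override
  public void configure(Properties props) {
    // no options needed for this sketch
  }

  @Override
  public void converterFor(RelationalColumn column,
                           ConverterRegistration<SchemaBuilder> registration) {
    // Only rewrite timestamp columns; everything else keeps the default mapping.
    if ("timestamp".equalsIgnoreCase(column.typeName())) {
      registration.register(SchemaBuilder.string().optional(),
          value -> value == null ? null : value.toString()); // e.g. 2024-11-13T08:42:45Z
    }
  }
}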

kholisrag commented 1 day ago

Let me test using:

debezium.source.converters=bqdatetime
debezium.source.bqdatetime.type=io.debezium.server.converters.TemporalToISOStringConverter

@kholisrag is there a specific reason for switching from Datastream? I believe the data type mapping could easily be improved (especially for bigquerystream).

cost issue @ismailsimsek

ismailsimsek commented 1 day ago

I see. If you don't need real-time data, then bigquerybatch uses the free API; it should not create any cost. Regarding the data type conversion, I will add a sample test and document it soon.

kholisrag commented 1 day ago

Ah, I mean GCP Datastream (https://cloud.google.com/datastream/docs/overview), not the BigQuery Storage Write streaming API.

So we want to move from GCP Datastream to self-managed, @ismailsimsek; we're exploring the tools.


BTW, I still have the same issue, probably the type conversion issue; yeah, we need to update the code to cover it. Below is the config:

debezium.sink.batch.destination-regexp=^dsbq\.public\.
debezium.sink.batch.destination-regexp-replace=
debezium.source.converters=bqdatetime
debezium.source.bqdatetime.type=io.debezium.server.converters.TemporalToISOStringConverter

Log:

INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default', error = 'io.debezium.DebeziumException: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default': io.debezium.DebeziumException: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default