BKBASE-Plugin / flink-cdc-connectors

CDC Connectors for Apache Flink®
https://ververica.github.io/flink-cdc-connectors/
Apache License 2.0

Allow a MongoDB document field to be defined as a STRING type in Flink #1

Closed vanliu-tx closed 2 years ago

vanliu-tx commented 2 years ago

Is your feature request related to a problem? Please describe.
The schema of a MongoDB collection can change over time, so it is sometimes impossible to define the collection as a table with a fixed schema. We want to read some nested documents as strings so they can be parsed and processed later, but Flink currently requires the exact type of every field to be declared.

Describe the solution you'd like
Allow a nested JSON document to be mapped to the Flink STRING type.

2022-07-15 17:50:37,770 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: mongodb source dynamic.dynamic.Node -> Map -> IcebergStreamWriter (1/1)#3 (36334134ff691818fe716011e049fe09) switched from RUNNING to FAILED with failure cause: java.lang.IllegalArgumentException: Unable to convert to string from unexpected value '{"metadata": {"creationTimestamp": "2022-03-17T08:31:27Z", "labels": {"beta.kubernetes.io/os": "linux", "failure-domain.beta.kubernetes.io/region": "nj"}}}' of type DOCUMENT
        at com.ververica.cdc.connectors.mongodb.table.MongoDBConnectorDeserializationSchema.convertToString(MongoDBConnectorDeserializationSchema.java:629)
        at com.ververica.cdc.connectors.mongodb.table.MongoDBConnectorDeserializationSchema.lambda$wrapIntoNullableConverter$81b05d5e$1(MongoDBConnectorDeserializationSchema.java:764)
        at com.ververica.cdc.connectors.mongodb.table.MongoDBConnectorDeserializationSchema.convertField(MongoDBConnectorDeserializationSchema.java:751)
        at com.ververica.cdc.connectors.mongodb.table.MongoDBConnectorDeserializationSchema.lambda$createRowConverter$8ca31a0b$1(MongoDBConnectorDeserializationSchema.java:688)
        at com.ververica.cdc.connectors.mongodb.table.MongoDBConnectorDeserializationSchema.lambda$wrapIntoNullableConverter$81b05d5e$1(MongoDBConnectorDeserializationSchema.java:764)
        at com.ververica.cdc.connectors.mongodb.table.MongoDBConnectorDeserializationSchema.extractRowData(MongoDBConnectorDeserializationSchema.java:167)
        at com.ververica.cdc.connectors.mongodb.table.MongoDBConnectorDeserializationSchema.deserialize(MongoDBConnectorDeserializationSchema.java:131)
        at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:229)
        at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:151)
        at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:446)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
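
The trace above shows convertToString rejecting a value of type DOCUMENT when the column is declared as a string. The requested behavior can be sketched as follows. This is a minimal, dependency-free illustration, not the connector's actual code: the class DocumentToStringSketch and its methods are hypothetical, and a nested document is modeled as a java.util.Map rather than the BSON value the connector really receives. In the connector, the same idea would presumably rely on the BSON library's own JSON rendering (for example BsonDocument.toJson()) instead of the hand-written serializer used here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch: fall back to JSON text when a STRING column
// receives a nested document instead of throwing.
public class DocumentToStringSketch {

    /** Plain strings pass through unchanged; anything else is rendered as JSON text. */
    public static String convertToString(Object value) {
        if (value instanceof String) {
            return (String) value;
        }
        return toJson(value);
    }

    /** Minimal JSON rendering for maps, lists, strings, numbers, booleans and null. */
    public static String toJson(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof String) {
            return quote((String) value);
        }
        if (value instanceof Number || value instanceof Boolean) {
            return value.toString();
        }
        if (value instanceof Map) {
            StringJoiner joiner = new StringJoiner(", ", "{", "}");
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                joiner.add(quote(String.valueOf(entry.getKey())) + ": " + toJson(entry.getValue()));
            }
            return joiner.toString();
        }
        if (value instanceof List) {
            StringJoiner joiner = new StringJoiner(", ", "[", "]");
            for (Object item : (List<?>) value) {
                joiner.add(toJson(item));
            }
            return joiner.toString();
        }
        // Types this sketch does not model still fail loudly.
        throw new IllegalArgumentException(
                "Unable to convert to string from unexpected value '" + value + "'");
    }

    /** Wraps a string in double quotes and escapes quotes, backslashes and control characters. */
    private static String quote(String text) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : text.toCharArray()) {
            if (c == '"') {
                sb.append("\\\"");
            } else if (c == '\\') {
                sb.append("\\\\");
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        // Rebuild part of the document from the log as nested maps.
        Map<String, Object> labels = new LinkedHashMap<>();
        labels.put("beta.kubernetes.io/os", "linux");
        Map<String, Object> metadata = new LinkedHashMap<>();
        metadata.put("creationTimestamp", "2022-03-17T08:31:27Z");
        metadata.put("labels", labels);
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("metadata", metadata);
        System.out.println(convertToString(doc));
    }
}
```

With a fallback like this, a column declared as STRING would carry the raw JSON text of the nested document, and downstream operators could parse it once the schema they care about is known.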