AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

Error deserializing Number type column without precision nor scale - Oracle #294

Closed joserivera1990 closed 2 years ago

joserivera1990 commented 2 years ago

Hello everyone,

I'm working on a project with Confluent using the Kafka Connect Oracle CDC connector, and the connector serializes records with io.confluent.connect.avro.AvroConverter. The problem is that some Oracle tables have fields of NUMBER type without precision or scale, so the schema is registered in Schema Registry this way:

{
  "name": "TEST_NUMBER",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 127,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "127"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}

When I use the library abris_2.12 version 5.1.1 with Spark 3.1.2 to deserialize the data, the fields of NUMBER type without precision or scale are shown as bytes and not as decimal.


Do you know if this library supports this feature? Should I configure anything else? Do you know if there are plans to develop that feature?

Code:

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy(topic)
  .usingSchemaRegistry(propsAvro)

// foreachBatch sink: deserialize the Confluent Avro payload and expand its fields
def saveToDBFinal = (df: Dataset[Row], batchId: Long) => {
  df.show(false)

  val df2 = df.select(from_avro(col("value"), abrisConfig) as "data").selectExpr("data.*")
  df2.show(false)
}
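For completeness, the function is wired into the streaming query roughly like this (simplified; kafkaDf is just an illustrative name for the streaming DataFrame read from Kafka, not part of the snippet above):

// Simplified wiring of the foreachBatch sink.
// kafkaDf stands for the streaming DataFrame read from Kafka.
kafkaDf.writeStream
  .foreachBatch(saveToDBFinal)
  .start()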

Thanks for your answer

Regards

cerveada commented 2 years ago

Hello,

Could you print df2.schema() and post it here?

What version of org.apache.spark:spark-avro library is on your classpath?

kevinwallimann commented 2 years ago

Hi @joserivera1990 Unfortunately, your avro schema is invalid. Per the avro spec, "Scale must be [...] less than or equal to the precision.", see https://avro.apache.org/docs/current/spec.html#Decimal, which is not the case (127 > 64). What happens is that the logical type (Decimal) is not validated when the schema is parsed and instead falls back to null. So, Spark doesn't see the avro logical type decimal and interprets it as a BinaryType instead of a DecimalType.

To solve the issue, you could fix the schema generation process so that the scale does not exceed the precision, or provide Abris with a corrected reader schema.
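To illustrate (a minimal standalone sketch, not Abris code): Avro silently ignores an invalid decimal logical type when the schema is parsed, which is why spark-avro only sees plain bytes:

import org.apache.avro.{LogicalTypes, Schema}

// The decimal branch of the union from your schema, parsed on its own.
val fieldSchema = new Schema.Parser().parse(
  """{"type": "bytes", "logicalType": "decimal", "precision": 64, "scale": 127}""")

// fromSchemaIgnoreInvalid validates the logical type and returns null when it is
// invalid (scale 127 > precision 64), so spark-avro falls back to BinaryType.
val logicalType = LogicalTypes.fromSchemaIgnoreInvalid(fieldSchema)
println(logicalType) // prints "null"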

joserivera1990 commented 2 years ago

Hello,

Could you print df2.schema() and post it here?

What version of org.apache.spark:spark-avro library is on your classpath?

Hi @cerveada,

I added the print of df2.schema():

StructType(
  StructField(STFAMPRO,StringType,true),
  StructField(CHFAMPRO,StringType,true),
  StructField(TEST_NUMBER,BinaryType,true),
  StructField(TEST_NUMBER_DECIMAL,BinaryType,true),
  StructField(table,StringType,true),
  StructField(SCN_CMD,StringType,true),
  StructField(OP_TYPE_CMD,StringType,true),
  StructField(op_ts,StringType,true),
  StructField(current_ts,StringType,true),
  StructField(row_id,StringType,true),
  StructField(username,StringType,true)
)

Checking the external libraries, I have org.apache.spark:spark-avro_2.12:2.4.8 on the classpath.

Regards.

joserivera1990 commented 2 years ago

Hi @joserivera1990 Unfortunately, your avro schema is invalid. Per the avro spec, "Scale must be [...] less than or equal to the precision.", see https://avro.apache.org/docs/current/spec.html#Decimal, which is not the case (127 > 64). What happens is that the logical type (Decimal) is not validated when the schema is parsed and instead falls back to null. So, Spark doesn't see the avro logical type decimal and interprets it as a BinaryType instead of a DecimalType.

To solve the issue, you could fix the schema generation process so that the scale does not exceed the precision, or provide Abris with a corrected reader schema.

Hi @kevinwallimann,

I got your points about changing the schema generation process; this schema is generated by Confluent's Oracle CDC connector: https://docs.confluent.io/kafka-connect-oracle-cdc/current/troubleshooting.html#numeric-data-type-with-no-precision-or-scale-results-in-unreadable-output

I did a test using the function .provideReaderSchema and setting precision: 38 and scale: 10 in the schema:

{
  "name": "TEST_NUMBER",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 10,
      "precision": 38,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "10"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}

And it throws the following error: Decimal precision 128 exceeds max precision 38
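For reference, this is roughly how I plugged that reader schema in (readerSchemaJson holds the full record schema with the adjusted TEST_NUMBER field shown above; the exact builder chain may differ between Abris versions):

// readerSchemaJson is the full record schema containing the adjusted
// TEST_NUMBER field (precision 38, scale 10) shown above.
val abrisConfigWithReaderSchema = AbrisConfig
  .fromConfluentAvro
  .provideReaderSchema(readerSchemaJson)
  .usingSchemaRegistry(propsAvro)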

Finally, I think there should be some way to get the number with a scale of 127 and a precision of 64, perhaps as a string instead of a decimal. For example, I'm using the connector com.snowflake.kafka.connector.SnowflakeSinkConnector with io.confluent.connect.avro.AvroConverter as the value.converter, and in the Snowflake database that row is saved in this way:

{
  "STFAMPRO": "AA",
  "TEST_NUMBER": "5.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
  "TEST_NUMBER_DECIMAL": "5.1500000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
  "current_ts": "1651059748133",
}

The problem is that I don't know how Snowflake implements the connector com.snowflake.kafka.connector.SnowflakeSinkConnector.

Thanks for your time!

kevinwallimann commented 2 years ago

Hi @joserivera1990 I see, the problem now is that your data has decimal precision 128, which is larger than the maximum that Spark supports (38). In this case, Spark uses the BinaryType as a fallback. You could try to convert the BinaryType to a human-readable format once it's already in a Spark DataFrame. I think this problem should be solved outside of Abris. Just for the sake of completeness, there is a way to have your own custom logic to convert from Avro to a Spark DataFrame in Abris, see https://github.com/AbsaOSS/ABRiS#custom-data-conversions. However, it's quite involved and I wouldn't recommend that approach.
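For example, a minimal sketch of that post-processing (outside of Abris, assuming the Connect decimal encoding, i.e. a big-endian two's-complement unscaled value, and the scale of 127 from your schema; df2 refers to your earlier snippet):

import java.math.{BigDecimal => JBigDecimal, BigInteger}
import org.apache.spark.sql.functions.{col, udf}

// Rebuild the decimal from the raw bytes plus the scale taken from the schema (127 here).
val bytesToDecimalString = udf { (bytes: Array[Byte]) =>
  if (bytes == null) null
  else new JBigDecimal(new BigInteger(bytes), 127).toPlainString
}

val readable = df2
  .withColumn("TEST_NUMBER", bytesToDecimalString(col("TEST_NUMBER")))
  .withColumn("TEST_NUMBER_DECIMAL", bytesToDecimalString(col("TEST_NUMBER_DECIMAL")))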

joserivera1990 commented 2 years ago

Hi @joserivera1990 I see, the problem now is that your data has decimal precision 128, which is larger than the maximum that Spark supports (38). In this case, Spark uses the BinaryType as a fallback. You could try to convert the BinaryType to a human-readable format once it's already in a Spark DataFrame. I think this problem should be solved outside of Abris. Just for the sake of completeness, there is a way to have your own custom logic to convert from Avro to a Spark DataFrame in Abris, see https://github.com/AbsaOSS/ABRiS#custom-data-conversions. However, it's quite involved and I wouldn't recommend that approach.

Hi @kevinwallimann, I will review your advice. Thanks for your time!

joserivera1990 commented 2 years ago

Hi everyone, this issue was solved as of Oracle CDC Connector version 2.0.0 by adding the property numeric.mapping with the value best_fit_or_decimal.

https://docs.confluent.io/kafka-connect-oracle-cdc/current/configuration-properties.html Explanation from the Confluent connector docs: "Use best_fit_or_decimal if NUMERIC columns should be cast to Connect's primitive type based upon the column's precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect's DECIMAL logical type will be used instead."

In this way, when the Oracle column is NUMBER without precision or scale, the field is registered in Schema Registry as double. The connector will only use the Decimal logical type if the number's value exceeds what a double can represent.

My new schema registry version is:

{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "fields": [
    { "name": "STFAMPRO", "type": ["null", "string"], "default": null },
    { "name": "CHFAMPRO", "type": ["null", "string"], "default": null },
    { "name": "TEST_NUMBER", "type": ["null", "double"], "default": null },
    { "name": "TEST_NUMBER_DECIMAL", "type": ["null", "double"], "default": null },
    { "name": "table", "type": ["null", "string"], "default": null },
    { "name": "SCN_CMD", "type": ["null", "string"], "default": null },
    { "name": "OP_TYPE_CMD", "type": ["null", "string"], "default": null },
    { "name": "op_ts", "type": ["null", "string"], "default": null },
    { "name": "current_ts", "type": ["null", "string"], "default": null },
    { "name": "row_id", "type": ["null", "string"], "default": null },
    { "name": "username", "type": ["null", "string"], "default": null }
  ]
}

I will close this issue. Thanks everyone!