apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0

Convert BYTES DataType to DECIMAL in Apache Pinot #14510

Open rajat-sr1704 opened 1 day ago

rajat-sr1704 commented 1 day ago

I am working on ingesting real-time data from an Apache Kafka topic into Apache Pinot. The data is stored in Avro format with Schema Registry. Because the Avro data is nested, I am using JSONPATH(query, 'query_field') to extract fields from it. All the data comes through correctly except the fields cost, tax, cod_charges, and entry_tax, which are stored in the Avro topic as the BYTES data type. These fields are integers in the original dataset and were converted into bytes when they were ingested into the Kafka topic as Avro. I want to convert them back into decimal or integer format while ingesting into Apache Pinot, but there is no function for extracting a BYTES field from the nested data.

my ingestionConfig looks like:

"ingestionConfig": { "transformConfigs": [ { "columnName": "id", "transformFunction": "JSONPATHLONG(after, '$.id')" }, { "columnName": "order_id", "transformFunction": "JSONPATHLONG(after, '$.order_id')" }, { "columnName": "company_id", "transformFunction": "JSONPATHLONG(after, '$.company_id')" }, { "columnName": "channel_id", "transformFunction": "JSONPATHLONG(after, '$.channel_id')" }, { "columnName": "invoice_no", "transformFunction": "JSONPATHSTRING(after, '$.invoice_no')" }, { "columnName": "encrypt_invoice_name", "transformFunction": "JSONPATHSTRING(after, '$.encrypt_invoice_name')" }, { "columnName": "courier", "transformFunction": "JSONPATHSTRING(after, '$.courier')" }, { "columnName": "sr_courier_id", "transformFunction": "JSONPATHLONG(after, '$.sr_courier_id')" }, { "columnName": "code", "transformFunction": "JSONPATHSTRING(after, '$.code')" }, { "columnName": "awb", "transformFunction": "JSONPATHSTRING(after, '$.awb')" }, { "columnName": "pickup_token_number", "transformFunction": "JSONPATHSTRING(after, '$.pickup_token_number')" }, { "columnName": "dhl_handover_id", "transformFunction": "JSONPATHSTRING(after, '$.dhl_handover_id')" }, { "columnName": "dhl_handover_url", "transformFunction": "JSONPATHSTRING(after, '$.dhl_handover_url')" }, { "columnName": "pickup_address_id", "transformFunction": "JSONPATHLONG(after, '$.pickup_address_id')" }, { "columnName": "pickup_reshedule_count", "transformFunction": "JSONPATHDOUBLE(after, '$.pickup_reshedule_count')" }, { "columnName": "return_pickup_address_id", "transformFunction": "JSONPATHDOUBLE(after, '$.return_pickup_address_id')" }, { "columnName": "dhl_pickup_url", "transformFunction": "JSONPATHSTRING(after, '$.dhl_pickup_url')" }, { "columnName": "method", "transformFunction": "JSONPATHSTRING(after, '$.method')" }, { "columnName": "channel_shipment_id", "transformFunction": "JSONPATHSTRING(after, '$.channel_shipment_id')" }, { "columnName": "weight", "transformFunction": "JSONPATHSTRING(after, '$.weight')" 
}, { "columnName": "dimensions", "transformFunction": "JSONPATHSTRING(after, '$.dimensions')" }, { "columnName": "volumetric_weight", "transformFunction": "JSONPATHSTRING(after, '$.volumetric_weight')" }, { "columnName": "quantity", "transformFunction": "JSONPATHDOUBLE(after, '$.quantity')" }, { "columnName": "status", "transformFunction": "JSONPATHDOUBLE(after, '$.status')" }, { "columnName": "state_type", "transformFunction": "JSONPATHDOUBLE(after, '$.state_type')" }, { "columnName": "sub_status", "transformFunction": "JSONPATHDOUBLE(after, '$.sub_status')" }, { "columnName": "status_code", "transformFunction": "JSONPATHSTRING(after, '$.status_code')" }, { "columnName": "shipment_zone", "transformFunction": "JSONPATHDOUBLE(after, '$.shipment_zone')" }, { "columnName": "label_url", "transformFunction": "JSONPATHSTRING(after, '$.label_url')" }, { "columnName": "manifest_url", "transformFunction": "JSONPATHSTRING(after, '$.manifest_url')" }, { "columnName": "is_locked", "transformFunction": "JSONPATHSTRING(after, '$.is_locked')" }, { "columnName": "customer_gstin", "transformFunction": "JSONPATHSTRING(after, '$.customer_gstin')" }, { "columnName": "eway_bill_number", "transformFunction": "JSONPATHSTRING(after, '$.eway_bill_number')" }, { "columnName": "pod", "transformFunction": "JSONPATHSTRING(after, '$.pod')" }, { "columnName": "frozen_weight", "transformFunction": "JSONPATHDOUBLE(after, '$.frozen_weight')" }, { "columnName": "isd_code", "transformFunction": "JSONPATHSTRING(after, '$.isd_code')" }, { "columnName": "seller_address", "transformFunction": "JSONPATHSTRING(after, '$.seller_address')" }, { "columnName": "shipping_address", "transformFunction": "JSONPATHSTRING(after, '$.shipping_address')" }, { "columnName": "customer_details", "transformFunction": "JSONPATHSTRING(after, '$.customer_details')" }, { "columnName": "comment", "transformFunction": "JSONPATHSTRING(after, '$.comment')" }, { "columnName": "others", "transformFunction": "JSONPATHSTRING(after, 
'$.others')" }, { "columnName": "entry_tax", "transformFunction": "JSONPATHDOUBLE(after, '$.entry_tax')" }, { "columnName": "cost", "transformFunction": "JSONPATHDOUBLE(after, '$.cost')" }, { "columnName": "tax", "transformFunction": "JSONPATHDOUBLE(after, '$.tax')" }, { "columnName": "cod_charges", "transformFunction": "JSONPATHDOUBLE(after, '$.cod_charges')" }, { "columnName": "total", "transformFunction": "JSONPATHDOUBLE(after, '$.total')" }, { "columnName": "invoice_date", "transformFunction": "JSONPATHLONG(after, '$.invoice_date')" }, { "columnName": "awb_assign_date", "transformFunction": "JSONPATHLONG(after, '$.awb_assign_date')" }, { "columnName": "pickup_generated_date", "transformFunction": "JSONPATHLONG(after, '$.pickup_generated_date')" }, { "columnName": "pickup_scheduled_date", "transformFunction": "JSONPATHLONG(after, '$.pickup_scheduled_date')" }, { "columnName": "out_for_pickup_date", "transformFunction": "JSONPATHSTRING(after, '$.out_for_pickup_date')" }, { "columnName": "created_at", "transformFunction": "JSONPATHSTRING(after, '$.created_at')" }, { "columnName": "updated_at", "transformFunction": "JSONPATHSTRING(after, '$.updated_at')" }, { "columnName": "rto_initiated_date", "transformFunction": "JSONPATHSTRING(after, '$.rto_initiated_date')" }, { "columnName": "rto_delivered_date", "transformFunction": "JSONPATHSTRING(after, '$.rto_delivered_date')" }, { "columnName": "updated_on", "transformFunction": "JSONPATHSTRING(after, '$.updated_on')" }, { "columnName": "etd", "transformFunction": "JSONPATHLONG(after, '$.etd')" }, { "columnName": "promised_pickup_tat", "transformFunction": "JSONPATHDOUBLE(after, '$.promised_pickup_tat')" }, { "columnName": "promised_delivery_tat", "transformFunction": "JSONPATHDOUBLE(after, '$.promised_delivery_tat')" }, { "columnName": "promised_rto_tat", "transformFunction": "JSONPATHDOUBLE(after, '$.promised_rto_tat')" }, { "columnName": "promised_cod_remittance_tat", "transformFunction": "JSONPATHDOUBLE(after, 
'$.promised_cod_remittance_tat')" }, { "columnName": "shipped_date", "transformFunction": "JSONPATHSTRING(after, '$.shipped_date')" }, { "columnName": "delivered_date", "transformFunction": "JSONPATHSTRING(after, '$.delivered_date')" }, { "columnName": "returned_date", "transformFunction": "JSONPATHSTRING(after, '$.returned_date')" }, { "columnName": "eway_bill_date", "transformFunction": "JSONPATHSTRING(after, '$.returned_date')" }, { "columnName": "mps_data", "transformFunction": "JSONPATHSTRING(after, '$.returned_date')" } ],

and my schemaFile.json:

{ "schemaName": "shipments", "enableColumnBasedNullHandling": true, "dimensionFieldSpecs": [ { "name": "id", "dataType": "LONG" }, { "name": "order_id", "dataType": "LONG" }, { "name": "company_id", "dataType": "LONG" }, { "name": "channel_id", "dataType": "LONG" }, { "name": "invoice_no", "dataType": "STRING" }, { "name": "encrypt_invoice_name", "dataType": "STRING" }, { "name": "courier", "dataType": "STRING" }, { "name": "sr_courier_id", "dataType": "INT" }, { "name": "code", "dataType": "STRING" }, { "name": "awb", "dataType": "STRING" }, { "name": "pickup_token_number", "dataType": "STRING" }, { "name": "pickup_address_id", "dataType": "LONG" }, { "name": "return_pickup_address_id", "dataType": "INT" }, { "name": "dhl_handover_id", "dataType": "STRING" }, { "name": "dhl_handover_url", "dataType": "STRING" }, { "name": "pickup_reshedule_count", "dataType": "INT" }, { "name": "dhl_pickup_url", "dataType": "STRING" }, { "name": "method", "dataType": "STRING" }, { "name": "channel_shipment_id", "dataType": "STRING" }, { "name": "weight", "dataType": "STRING" }, { "name": "dimensions", "dataType": "STRING" }, { "name": "volumetric_weight", "dataType": "STRING" }, { "name": "quantity", "dataType": "INT" }, { "name": "status", "dataType": "INT" }, { "name": "state_type", "dataType": "INT" }, { "name": "sub_status", "dataType": "INT" }, { "name": "status_code", "dataType": "STRING" }, { "name": "shipment_zone", "dataType": "INT" }, { "name": "label_url", "dataType": "STRING" }, { "name": "manifest_url", "dataType": "STRING" }, { "name": "is_locked", "dataType": "INT" }, { "name": "customer_gstin", "dataType": "STRING" }, { "name": "eway_bill_number", "dataType": "STRING" }, { "name": "pod", "dataType": "STRING" }, { "name": "frozen_weight", "dataType": "INT" }, { "name": "isd_code", "dataType": "STRING" }, { "name": "seller_address", "dataType": "STRING" }, { "name": "shipping_address", "dataType": "STRING" }, { "name": "customer_details", "dataType": "STRING" }, { 
"name": "comment", "dataType": "STRING" }, { "name": "others", "dataType": "STRING" }, { "name": "mps_data", "dataType": "STRING" } ], "metricFieldSpecs": [ { "name": "entry_tax", "dataType": "DOUBLE", "defaultNullValue": null }, { "name": "cost", "dataType": "DOUBLE", "defaultNullValue": null }, { "name": "tax", "dataType": "DOUBLE", "defaultNullValue": null }, { "name": "cod_charges", "dataType": "DOUBLE", "defaultNullValue": null }, { "name": "total", "dataType": "DOUBLE", "defaultNullValue": null } ], "dateTimeFieldSpecs": [ { "name": "invoice_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }, { "name": "awb_assign_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }, { "name": "pickup_generated_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }, { "name": "pickup_scheduled_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }, { "name": "out_for_pickup_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "created_at", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "updated_at", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "rto_initiated_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "rto_delivered_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "updated_on", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "etd", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }, { "name": "promised_pickup_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "promised_delivery_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", 
"granularity": "1:DAYS" }, { "name": "promised_rto_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "promised_cod_remittance_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "shipped_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "delivered_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "returned_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "eway_bill_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" } ] }

Please let me know if you need anything to debug this issue.

I have tried declaring these columns as the DOUBLE data type and extracting them with JSONPATHDOUBLE, but all the values that arrive in Pinot are null.

Jackie-Jiang commented 23 hours ago

How are the values being converted into bytes? There are 2 solutions:

  1. Do not convert them into bytes in the first place
  2. Understand the conversion algorithm, and use a UDF to convert it back during ingestion

cc @swaminathanmanish @KKcorps

rajat-sr1704 commented 11 hours ago

The values are converted into bytes by Kafka Connect, which writes these fields with "connect.default": "\u0000", "connect.name": "org.apache.kafka.connect.data.Decimal", "logicalType": "decimal". The conversion is done for other reasons, and I cannot stop it, because doing so would mean re-running all the Kafka clusters to store the data again.
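
For reference, the Kafka Connect Decimal logical type named above stores the unscaled value as a big-endian two's-complement integer, with the scale kept in the schema rather than in the payload. A minimal Python sketch of the reverse conversion follows. The function name decode_connect_decimal and the scale of 2 are assumptions for illustration, since the thread does not state the scale, and this is not a Pinot transform function, only the algorithm a UDF would need to reproduce. In Java the same step is new BigDecimal(new BigInteger(bytes), scale).

```python
from decimal import Decimal


def decode_connect_decimal(raw: bytes, scale: int) -> Decimal:
    """Decode bytes written by the Kafka Connect / Avro decimal logical type.

    raw   : big-endian two's-complement bytes of the unscaled integer
    scale : number of digits after the decimal point (taken from the schema;
            the value used in the examples below is an assumption)
    """
    # Interpret the bytes as a signed big-endian integer.
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)

    # Build the Decimal from (sign, digits, exponent) so the result is exact
    # and does not depend on the current decimal context precision.
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(d) for d in str(abs(unscaled)))
    return Decimal((sign, digits, -scale))


# Hypothetical sample values, assuming scale = 2.
print(decode_connect_decimal(b"\x30\x39", 2))   # 123.45
print(decode_connect_decimal(b"\xcf\xc7", 2))   # -123.45

# The schema default "\u0000" is a single zero byte; Avro writes bytes
# defaults as strings whose code points 0-255 map to byte values.
print(decode_connect_decimal("\u0000".encode("latin-1"), 2) == 0)  # True
```

The scale has to be read from the field's "connect.parameters" or Avro "scale" attribute in the registered schema. Decoding with the wrong scale gives values that are off by a power of ten.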