apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0
5.45k stars 1.28k forks source link

Null data in derived columns after Table config update #8726

Open KKcorps opened 2 years ago

KKcorps commented 2 years ago

Steps to reproduce in 0.10.0 release -

  1. Add schema
    {
    "schemaName": "tSCalibrationRecord",
    "dimensionFieldSpecs": [
      {
        "name": "deviceId",
        "dataType": "STRING"
      },
      {
        "name": "nnTransId",
        "dataType": "STRING"
      },
      {
        "name": "status",
        "dataType": "STRING"
      },
      {
        "name": "header_js",
        "dataType": "JSON"
      },
      {
        "name": "refData_js",
        "dataType": "JSON"
      },
      {
        "name": "deviceActivity_js",
        "dataType": "JSON"
      },
      {
        "name": "geoData_js",
        "dataType": "JSON"
      },
      {
        "name": "nmtProcData_js",
        "dataType": "JSON"
      },
      {
        "name": "baroContextData_js",
        "dataType": "JSON"
      },
      {
        "name": "hvacData_js",
        "dataType": "JSON"
      },
      {
        "name": "gnssData_js",
        "dataType": "JSON"
      }
    ],
    "dateTimeFieldSpecs": [
      {
        "name": "fixTimestamp",
        "dataType": "LONG",
        "format": "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }
    ]
    }
  2. Add table
{
  "REALTIME": {
    "tableName": "tSCalibrationRecord_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "schemaName": "tSCalibrationRecord",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "30",
      "replication": "1",
      "timeColumnName": "fixTimestamp",
      "allowNullTimeValue": false,
      "replicasPerPartition": "1",
      "completionConfig": {
        "completionMode": "DOWNLOAD"
      }
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant",
      "tagOverrideConfig": {}
    },
    "tableIndexConfig": {
      "invertedIndexColumns": [],
      "noDictionaryColumns": [],
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "calibrationrecord",
        "stream.kafka.broker.list": "localhost:9092",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "largest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
        "realtime.segment.flush.threshold.rows": "0",
        "realtime.segment.flush.threshold.time": "24h",
        "realtime.segment.flush.threshold.segment.size": "150M"
      },
      "rangeIndexColumns": [
        "fixTimestamp"
      ],
      "rangeIndexVersion": 2,
      "autoGeneratedInvertedIndex": true,
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": [
        "deviceId"
      ],
      "bloomFilterColumns": [],
      "loadMode": "MMAP",
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": false,
      "nullHandlingEnabled": false
    },
    "metadata": {},
    "quota": {},
    "routing": {
      "instanceSelectorType": "strictReplicaGroup"
    },
    "query": {},
    "ingestionConfig": {
      "transformConfigs": [
        {
          "columnName": "header_js",
          "transformFunction": "jsonFormat(header)"
        },
        {
          "columnName": "deviceActivity_js",
          "transformFunction": "jsonFormat(deviceActivity)"
        },
        {
          "columnName": "refData_js",
          "transformFunction": "jsonFormat(refData)"
        },
        {
          "columnName": "geoData_js",
          "transformFunction": "jsonFormat(geoData)"
        },
        {
          "columnName": "nmtProcData_js",
          "transformFunction": "jsonFormat(nmtProcData)"
        },
        {
          "columnName": "baroContextData_js",
          "transformFunction": "jsonFormat(baroContextData)"
        },
        {
          "columnName": "hvacData_js",
          "transformFunction": "jsonFormat(hvacData)"
        },
        {
          "columnName": "gnssData_js",
          "transformFunction": "jsonFormat(gnssData)"
        }
      ]
    },
    "isDimTable": false
  }
}
  3. Push some records to the Kafka topic in the following format

    {"header": {"nnTransId": "9003", "rid": 1, "timestamp": 1234567890123}, "status": "N200", "deviceId": "test-3", "fixTimestamp": 1234567890123, "deviceActivity": {"acts": [{"aT": "KEEN", "cooor": 0.17487478}, {"aT": "KEEN", "cooor": 0.3853051}], "charging": true, "revisitId": 1821124849, "elapDay": 23, "revisitCount": 12}, "refData": {"pressure": 0.76513326, "pV": 0.22455376, "temperature": 0.32653052, "tV": 0.8554907, "distK": 0.8151323, "avDis": 0.6324501, "stat": 23, "timestamp": 1646730249900}, "geoData": {"metHa": 0.018798769, "flat99": 0.21407795, "ness99": 0.40781218, "builOv": [], "overDist": {"oPer": 0.49921197, "distribution": [0.9640298]}, "bCut": 0.29687208}, "nmtProcData": {"unTerrPA": 0.44992816, "terrPa": 0.35981137}, "baroContextData": {"davPA": 0.7646854, "bvPA": 0.23593152, "bvPer": 0.21551955}, "hvacData": {"cvPA": 0.6843114, "cbCon": 0.024642766}, "gnssData": {"dodVal": 0.64749956, "iodVal": 0.33511257}}
  4. Update the schema in Pinot to add the following column

     {
      "name": "header_nnTransId",
      "dataType": "STRING"
    }
  5. Update the table config in Pinot to add the following transform config for the new column

         {
            "columnName": "header_nnTransId",
            "transformFunction": "JSONPATHSTRING(header_js, '$.nnTransId')"
          }    
  6. Publish data again. The column header_nnTransId appears as null when queried in Pinot, although it should contain the value extracted from header_js.

Optional: Verify after triggering RealtimeSegmentValidationManager job

saurabhd336 commented 2 years ago

@KKcorps Still trying to repro, but shouldn't the transformFunction be "JSONPATHSTRING(header_js, '$.header.nnTransId')"? As per the data being ingested?

KKcorps commented 2 years ago

Yeah, you are right. This was provided by the person who got the issue.

saurabhd336 commented 2 years ago

Confirming that the issue is seen in the master branch too. It looks like _transformPipeline gets created only once, when LLRealtimeSegmentDataManager is created for the segment, and any subsequent changes to the schema or tableConfig do not lead to updates to the pipeline. https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java#L1337

cc: @KKcorps

saurabhd336 commented 2 years ago

> Yeah, you are right. This was provided by the person who got the issue.

Just for future reference, JSONPATHSTRING(header_js, '$.nnTransId') is indeed the correct expression, since it operates on header_js, which is itself already derived from the header field by the following transform:

      "columnName": "header_js",
      "transformFunction": "jsonFormat(header)"