apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0
13.46k stars 3.7k forks source link

Issue with result from CONCAT expression when using Kafka streaming ingestion. #9460

Closed harnasz closed 1 year ago

harnasz commented 4 years ago

We are seeing issues when using Kafka Streaming Ingestion and then querying and invoking the CONCAT expression believing this could be due to the rows being in the aggregate heap memory and not yet being persisted to the segments.

We have witnessed the following behavior of the expression in the following circumstances which we believe are when the rows are still in the heap and yet to be persisted:

When using batch ingestion we do not witness the behaviors or when we set "maxRowsPerSegment" to 1 in the Kafka Tuning Config.

See below for more detail.

Affected Version

0.16and 0.17

Description

Cluster size

Single using the quickstart bin/start-single-server-small

Configurations in use

Using the default configuration located here:

conf/druid/single-server/small

However, using MySQL for the metadata storage and enabling globally cached lookups.

Steps to reproduce the problem

Setup a Kafka Supervisor

Note "maxRowsPerSegment":3 in tuningConfig, we will refer to this later

curl -X POST \
  http://localhost:8888/druid/indexer/v1/supervisor \
  -H 'Content-Type: application/json' \
  -d '{
  "type":"kafka",
  "dataSchema":{
    "dataSource":"items",
    "parser":{
      "type":"string",
      "parseSpec":{
        "format":"csv",
        "timestampSpec":{
          "column":"time",
          "format":"iso"
        },
        "columns":[
          "time",
          "currency",
          "value"
        ],
        "dimensionsSpec":{
          "dimensions":[
            "currency"
          ]
        }
      }
    },
    "metricsSpec":[
      {
        "name":"count",
        "type":"count"
      },
      {
        "name":"sum_value",
        "type":"doubleSum",
        "fieldName":"value"
      }
    ],
    "granularitySpec":{
      "type":"uniform",
      "segmentGranularity":"WEEK",
      "queryGranularity":"NONE",
      "rollup": true
    }
  },
  "tuningConfig":{
    "type":"kafka",
    "maxRowsPerSegment":3,
    "reportParseExceptions": true,
    "logParseExceptions": true
  },
  "ioConfig":{
    "topic":"debugitems",
    "consumerProperties":{
      "bootstrap.servers":"localhost:9092"
    },
    "taskCount":1,
    "replicas":1,
    "taskDuration":"PT1H"
  }
}'

Setup Lookups

Place the contents of below

2020-01-14:EUR,0.7
2020-01-14:GBP,0.9
2020-01-15:EUR,0.7
2020-01-15:GBP,0.9
2020-01-16:EUR,1.7
2020-01-16:GBP,1.9
2020-01-17:EUR,2.7
2020-01-17:GBP,2.9
2020-01-18:EUR,3.7
2020-01-18:GBP,3.9
2020-01-19:EUR,2.7
2020-01-19:GBP,4.9

into /tmp/currencies.csv

Then create the lookup:

curl -X POST \
  http://localhost:8888/druid/coordinator/v1/lookups/config \
  -H 'Content-Type: application/json' \
  -d '{
  "__default": {
    "currency_conversion": {
      "version": "2019-10-07T09:58:02.656Z",
      "lookupExtractorFactory": {
        "type": "cachedNamespace",
        "extractionNamespace": {
          "type": "uri",
          "uri": "file:/tmp/currencies.csv",
          "namespaceParseSpec": {
            "format": "csv",
            "columns": [
              "key",
              "value"
            ]
          },
          "pollPeriod": "PT5S"
        },
        "firstCacheTimeout": 0
      }
    }
  }
}
'

Running Through the Problem

Using Kafkacat execute the following two commands to produce the messages:

echo "2020-01-14T11:11:00.000Z,GBP,30.12" | kafkacat -b  127.0.0.1:9092  -t debugitems
echo "2020-01-15T11:11:00.000Z,EUR,30.12" | kafkacat -b  127.0.0.1:9092  -t debugitems

Then run the following query:

SELECT __time,  sum_value, CONCAT(TIME_FORMAT(__time, 'yyyy-MM-dd'), ':', currency) as "concat_expression" FROM items

and you will see the results of below:

(Query 1 Result)
+--------------------------+-----------+--------------------+
|          __time          | sum_value | concat_expression  |
+--------------------------+-----------+--------------------+
| 2020-01-14T11:11:00.000Z |     30.12 | ["2020-01-14:GBP"] |
| 2020-01-15T11:11:00.000Z |     30.12 | ["2020-01-15:EUR"] |
+--------------------------+-----------+--------------------+

If you then run the following query which uses an aggregate function of SUM

SELECT 
  __time, 
  currency, 
  SUM(sum_value) "sum value", 
  CONCAT(
    TIME_FORMAT(__time, 'yyyy-MM-dd'), 
    ':', 
    currency
  ) as "concat_expression" 
FROM 
  items 
GROUP BY 
  1, 
  2

You will see the results below:

(Query 2 Result)
+--------------------------+----------+-----------+-------------------+
|          __time          | currency | sum_value | concat_expression |
+--------------------------+----------+-----------+-------------------+
| 2020-01-14T11:11:00.000Z | GBP      |     30.12 | 2020-01-14:GBP    |
| 2020-01-15T11:11:00.000Z | EUR      |     30.12 | 2020-01-15:EUR    |
+--------------------------+----------+-----------+-------------------+

From the results of Query 1 the values in the column of concat_expression wrap the value with [" and "]. Using the LTRIM and RTRIM functions to trim [" does not have any impact on the value from the expression.

From the results of Query 2 are not wrapped with [" and "].

As part of our query, we want to use the concat_expression result in a lookup. If we run the following query

select
__time,
currency,
LOOKUP(CONCAT(TIME_FORMAT(__time, 'yyyy-MM-dd'), ':', 'EUR'), 'currency_conversion') as "Fixed EUR Lookup",
LOOKUP(CONCAT(TIME_FORMAT(__time, 'yyyy-MM-dd'), ':', 'GBP'), 'currency_conversion') as "Fixed GBP Lookup",
LOOKUP(CONCAT(TIME_FORMAT(__time, 'yyyy-MM-dd'), ':', currency), 'currency_conversion') as "Row Lookup",
SUM(sum_value) as "Total Value",
SUM((sum_value / CAST(LOOKUP(CONCAT(TIME_FORMAT(__time, 'yyyy-MM-dd'), ':', currency), 'currency_conversion') AS FLOAT)) * CAST(LOOKUP(CONCAT(TIME_FORMAT(__time, 'yyyy-MM-dd'), ':', 'EUR'), 'currency_conversion') AS FLOAT)) as "Converted Total Value"
from items 
GROUP BY 1,2

We get the following result:

(Query 3 Result)
+--------------------------+----------+------------------+------------------+------------+-------------+-----------------------+
|          __time          | currency | Fixed EUR Lookup | Fixed GBP Lookup | Row Lookup | Total Value | Converted Total Value |
+--------------------------+----------+------------------+------------------+------------+-------------+-----------------------+
| 2020-01-14T11:11:00.000Z | GBP      |              0.7 |              0.9 |        0.9 |       30.12 |                     0 |
| 2020-01-15T11:11:00.000Z | EUR      |              0.7 |              0.9 |        0.7 |       30.12 |                     0 |
+--------------------------+----------+------------------+------------------+------------+-------------+-----------------------+

With the Converted Total Value incorrectly returning 0 however the values of the lookups are being returned for the given currency when not being used in tandem with the SUM expression.


Moving on.

If you then produce another message of:

echo "2020-01-16T11:11:00.000Z,GBP,30.12" | kafkacat -b  127.0.0.1:9092  -t debugitems

And then rerun the below query

SELECT __time,  sum_value, CONCAT(TIME_FORMAT(__time, 'yyyy-MM-dd'), ':', currency) as "concat_expression" FROM items

You will see:

(Query 4 Result)
+--------------------------+-----------+-------------------+
|          __time          | sum_value | concat_expression |
+--------------------------+-----------+-------------------+
| 2020-01-14T11:11:00.000Z |     30.12 | 2020-01-14:GBP    |
| 2020-01-15T11:11:00.000Z |     30.12 | 2020-01-15:EUR    |
| 2020-01-16T11:11:00.000Z |     30.12 | 2020-01-16:GBP    |
+--------------------------+-----------+-------------------+

From the results of Query 3 after 3 values have been ingested the values in the column of concat_expression are no longer wrapped with [" and "].

If we then re-run the query below

select
__time,
currency,
LOOKUP(CONCAT(TIME_FORMAT(__time, 'yyyy-MM-dd'), ':', 'EUR'), 'currency_conversion') as "Fixed EUR Lookup",
LOOKUP(CONCAT(TIME_FORMAT(__time, 'yyyy-MM-dd'), ':', 'GBP'), 'currency_conversion') as "Fixed GBP Lookup",
LOOKUP(CONCAT(TIME_FORMAT(__time, 'yyyy-MM-dd'), ':', currency), 'currency_conversion') as "Row Lookup",
SUM(sum_value) as "Total Value",
SUM((sum_value / CAST(LOOKUP(CONCAT(TIME_FORMAT(__time, 'yyyy-MM-dd'), ':', currency), 'currency_conversion') AS FLOAT)) * CAST(LOOKUP(CONCAT(TIME_FORMAT(__time, 'yyyy-MM-dd'), ':', 'EUR'), 'currency_conversion') AS FLOAT)) as "Converted Total Value"
from items 
GROUP BY 1,2

We get the following

(Query 5 Result)
+--------------------------+----------+------------------+------------------+------------+-------------+-----------------------+
|          __time          | currency | Fixed EUR Lookup | Fixed GBP Lookup | Row Lookup | Total Value | Converted Total Value |
+--------------------------+----------+------------------+------------------+------------+-------------+-----------------------+
| 2020-01-14T11:11:00.000Z | GBP      |              0.7 |              0.9 |        0.9 |       30.12 |    23.426666666666666 |
| 2020-01-15T11:11:00.000Z | EUR      |              0.7 |              0.9 |        0.7 |       30.12 |                 30.12 |
| 2020-01-16T11:11:00.000Z | GBP      |              1.7 |              1.9 |        1.9 |       30.12 |    26.949473684210528 |
+--------------------------+----------+------------------+------------------+------------+-------------+-----------------------+

With Converted Total Value returning the expected results.

We believe this could be due to that when two rows have been ingested they are aggregated in heap memory however when the third row gets ingested it gets persisted to the segment due to setting "maxRowsPerSegment" to 3.

Any debugging that you have already done

The only debugging that has been carried out is changing the tuning config values, reducing intermediatePersistPeriod or maxRowsPerSegment to persist the rows to the segments.

Synforge commented 4 years ago

I've done a little bit of digging on this and this bug applies to all string dimension columns in the IncrementalIndexStorageAdapter. It seems that regardless of whether a multi value was inserted into a column or not, this storage adapter sets all string columns to be multi value.

e.g. for the example above while it hasn't been persisted a query for segment metadata results in this:

"currency": { "cardinality": 2, "errorMessage": null, "hasMultipleValues": true, "maxValue": "GBP", "minValue": "EUR", "size": 0, "type": "STRING" }

Whereas the persisted data returns hasMultipleValues correctly as false, it seems this results in inconsistencies when using any kind of string function against a dimensional column that has not yet been persisted vs data that has been persisted. So I think this problem is bigger than just the above report.

I verified this by amending the following to return false and this then correctly returns just a string value instead of an array. However I'm aware this may break multi-values on ingestion?

https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java#L166

Happy to take a look further if anyone can offer any advice as to how to tackle this problem. I believe @gianm wrote some of this code, I'm hoping you might be able to offer some advice?

Thanks

sdemontfort commented 3 years ago

I noticed this same issue recently. @Synforge were you able to contribute to a solution for this?

At the moment we've just used the following work around (in native query), to ensure any "multi value" dimensions are flattened on response from rows in heap:

array_to_string(non_mv_dimension, '')
github-actions[bot] commented 1 year ago

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

github-actions[bot] commented 1 year ago

This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.