apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0

Pinot 1.1 Multi-stage query OOMs and brings down all servers -- with all the default protections/params in place #13426

Open dorlevi opened 2 weeks ago

dorlevi commented 2 weeks ago

We have a realtime table (6 partitions, 140 GB). When we query the table with a timeout of 3 minutes, all servers (6 servers, each with 24 cores and ~100 GB allocated to Pinot) run out of memory and hang.

Query:

with dups as (
    select __global_counter,
           __message_id,
           __probe_id,
           min(__record_timestamp) as min_record_timestamp,
           max(__record_timestamp) as max_record_timestamp,
           count(*) as cnt
    from org_2dYiMRMfas142XRKQ3bJIqmN3V6_ethereum_erc20_balance_changes_block_b91edb0804c04c2d9851eb975b689f0f
    group by __global_counter, __message_id, __probe_id
)

select *
from dups
order by cnt desc
limit 10

Explain plan:

Execution Plan
LogicalSort(sort0=[$5], dir0=[DESC], offset=[0], fetch=[10])
  PinotLogicalSortExchange(distribution=[hash], collation=[[5 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])
    LogicalSort(sort0=[$5], dir0=[DESC], fetch=[10])
      LogicalAggregate(group=[{0, 1, 2}], agg#0=[MIN($3)], agg#1=[MAX($4)], agg#2=[COUNT($5)])
        PinotLogicalExchange(distribution=[hash[0, 1, 2]])
          LogicalAggregate(group=[{3, 4, 5}], agg#0=[MIN($6)], agg#1=[MAX($6)], agg#2=[COUNT()])
            LogicalTableScan(table=[[org_2dYiMRMfas142XRKQ3bJIqmN3V6_ethereum_erc20_balance_changes_block_b91edb0804c04c2d9851eb975b689f0f]])
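
To illustrate why the plan above can be memory-hungry, here is a toy Python model of its stages: a per-server partial aggregate, a hash exchange on the group keys, a final aggregate, and a top-10 sort. This is only a sketch of the plan's shape, not Pinot's actual implementation; all function names and the sample data are made up for illustration. The point it shows is that when nearly every (__global_counter, __message_id, __probe_id) combination is unique, each stage holds roughly one entry per row before the limit of 10 is applied.

```python
import heapq


def leaf_aggregate(rows):
    """Partial aggregate on one server: one (min, max, count) entry per distinct key."""
    groups = {}
    for key, ts in rows:
        if key in groups:
            lo, hi, cnt = groups[key]
            groups[key] = (min(lo, ts), max(hi, ts), cnt + 1)
        else:
            groups[key] = (ts, ts, 1)
    return groups


def hash_exchange(partials, num_workers):
    """Send every partial group to the worker selected by the hash of its key."""
    inboxes = [[] for _ in range(num_workers)]
    for partial in partials:
        for key, agg in partial.items():
            inboxes[hash(key) % num_workers].append((key, agg))
    return inboxes


def final_aggregate(inbox):
    """Merge the partial results received by one worker."""
    merged = {}
    for key, (lo, hi, cnt) in inbox:
        if key in merged:
            mlo, mhi, mcnt = merged[key]
            merged[key] = (min(mlo, lo), max(mhi, hi), mcnt + cnt)
        else:
            merged[key] = (lo, hi, cnt)
    return merged


def top_n(merged_per_worker, n):
    """Final sort: keep the n groups with the highest count."""
    rows = [(key, agg) for merged in merged_per_worker for key, agg in merged.items()]
    return heapq.nlargest(n, rows, key=lambda row: row[1][2])


# Made-up data: two servers, almost all keys unique, keys 990..999 present on both.
server_a = [((i, i, 0), 100 + i) for i in range(1000)]
server_b = [((i, i, 0), 200 + i) for i in range(990, 2000)]

partials = [leaf_aggregate(server_a), leaf_aggregate(server_b)]
merged = [final_aggregate(inbox) for inbox in hash_exchange(partials, num_workers=2)]
result = top_n(merged, 10)

groups_before_limit = sum(len(m) for m in merged)
print("groups held before the limit is applied:", groups_before_limit)
print("rows returned:", len(result))
```

In this toy run the final stage still has to materialize every distinct group even though only ten rows are returned, which is the pattern that makes high-cardinality group-by queries expensive.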

We understand that a query like this may not be the best fit for Pinot, but crashing every queried server seems like a bug, especially since we haven't overridden any of the engine's built-in protections (other than the timeout). We reproduced it live for @mayankshriv, and he suggested we open this issue.

OOM Logs from one of the servers (not super informative):

{"time":"2024-06-17T22:48:22.991521636+02:00","stream":"stdout","logtag":"F","message":"Terminating due to java.lang.OutOfMemoryError: Java heap space"}

Running server args (pinot 1.1):

"args": [
  "--add-opens=java.base/java.nio=ALL-UNNAMED",
  "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
  "--add-opens=java.base/java.lang=ALL-UNNAMED",
  "--add-opens=java.base/java.util=ALL-UNNAMED",
  "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED",
  "-Xms32G",
  "-Xmx32G",
  "-XX:+ExitOnOutOfMemoryError",
  "-javaagent:/opt/pinot/etc/jmx_prometheus_javaagent/jmx_prometheus_javaagent.jar=8008:/opt/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml",
  "-Dlog4j2.configurationFile=/opt/pinot/etc/conf/pinot-server-log4j2.xml",
  "-Dplugins.dir=/opt/pinot/plugins",
  "-Dplugins.dir=/opt/pinot/plugins",
  "-Dapp.name=pinot-admin",
  "-Dapp.pid=1",
  "-Dapp.repo=/opt/pinot/lib",
  "-Dapp.home=/opt/pinot",
  "-Dbasedir=/opt/pinot"
]

Table config

{
  "REALTIME": {
    "tableName": "org_2dYiMRMfas142XRKQ3bJIqmN3V6_ethereum_erc20_balance_changes_block_b91edb0804c04c2d9851eb975b689f0f_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "replication": "2",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "30",
      "replicasPerPartition": "1",
      "timeColumnName": "__record_timestamp",
      "minimizeDataMovement": false
    },
    "tenants": {
      "broker": "sim_community",
      "server": "sim_community"
    },
    "tableIndexConfig": {
      "columnMajorSegmentBuilderEnabled": false,
      "rangeIndexVersion": 2,
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": [
        "__global_counter",
        "__message_id"
      ],
      "loadMode": "MMAP",
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": false,
      "nullHandlingEnabled": false,
      "optimizeDictionary": true,
      "optimizeDictionaryForMetrics": false,
      "noDictionarySizeRatioThreshold": 0.85
    },
    "metadata": {},
    "quota": {
      "maxQueriesPerSecond": "20.0"
    },
    "query": {
      "timeoutMs": 5000
    },
    "fieldConfigList": [
      {
        "name": "__global_counter",
        "encodingType": "RAW",
        "indexTypes": [],
        "indexes": null,
        "tierOverwrites": null
      }
    ],
    "ingestionConfig": {
      "streamIngestionConfig": {
        "streamConfigMaps": [
          {
            "metadata.populate": "true",
            "realtime.segment.flush.autotune.initialRows": "1000000",
            "realtime.segment.flush.threshold.rows": "0",
            "realtime.segment.flush.threshold.segment.size": "500M",
            "realtime.segment.serverUploadToDeepStore": "true",
            "sasl.jaas.config": "*****",
            "sasl.mechanism": "*****",
            "security.protocol": "******",
            "stream.kafka.broker.list": "*****",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            "stream.kafka.topic.name": "sim_f8ac4a56_17e7_4704_97a6_ba84f94b4d46_ethereum_erc20_balance_changes_block",
            "streamType": "kafka"
          }
        ],
        "columnMajorSegmentBuilderEnabled": false
      },
      "transformConfigs": [
        {
          "columnName": "__record_timestamp",
          "transformFunction": "__metadata$recordTimestamp"
        },
        {
          "columnName": "__global_counter",
          "transformFunction": "jsonPath(__internal, '$.global_counter')"
        },
        {
          "columnName": "__message_id",
          "transformFunction": "jsonPath(__internal, '$.message_id')"
        },
        {
          "columnName": "__probe_id",
          "transformFunction": "jsonPath(__internal, '$.probe_id')"
        },
        {
          "columnName": "txn_hash",
          "transformFunction": "concat('0x', jsonPath(__user, '$.txn_hash'))"
        },
        {
          "columnName": "block_number",
          "transformFunction": "jsonPath(__user, '$.block_number')"
        },
        {
          "columnName": "block_timestamp",
          "transformFunction": "jsonPath(__user, '$.block_timestamp')"
        },
        {
          "columnName": "token_address",
          "transformFunction": "concat('0x', jsonPath(__user, '$.token_address'))"
        },
        {
          "columnName": "token_name",
          "transformFunction": "jsonPath(__user, '$.token_name')"
        },
        {
          "columnName": "token_symbol",
          "transformFunction": "jsonPath(__user, '$.token_symbol')"
        },
        {
          "columnName": "token_decimals",
          "transformFunction": "bytesToBigDecimal(hexToBytes(concat('000000', jsonPath(__user, '$.token_decimals'))))"
        },
        {
          "columnName": "account_address",
          "transformFunction": "concat('0x', jsonPath(__user, '$.account_address'))"
        },
        {
          "columnName": "balance",
          "transformFunction": "bytesToBigDecimal(hexToBytes(concat('000000', jsonPath(__user, '$.balance'))))"
        }
      ],
      "continueOnError": false,
      "rowTimeValueCheck": true,
      "segmentTimeValueCheck": true
    },
    "isDimTable": false
  }
}

gortiz commented 2 weeks ago

Thanks for your report.

Apache Pinot has some OOM protection mechanisms that apply to the single-stage query engine but not to the multi-stage query engine. I've created https://github.com/apache/pinot/issues/13436 to track them. We are actively working on some of them (especially automatic query killing).

This specific query can be executed in the single-stage engine. Have you tried it there? Does it also fail in single-stage?
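
As a rough sketch of how the single-stage test could be run: the Python snippet below rewrites the query without the CTE and builds a request for the broker's `/query/sql` endpoint with the `useMultistageEngine=false` and `timeoutMs` query options. The broker address, the `my_table` placeholder, and the helper names are assumptions for illustration only; adjust them to your deployment. The snippet only builds the payload; `run_query` sends it when called explicitly.

```python
import json
import urllib.request

# Assumption: broker host and port; replace with your own broker address.
BROKER_URL = "http://localhost:8099/query/sql"


def build_single_stage_sql(table):
    """Same aggregation as the original query, written without the CTE."""
    return (
        "SELECT __global_counter, __message_id, __probe_id, "
        "MIN(__record_timestamp) AS min_record_timestamp, "
        "MAX(__record_timestamp) AS max_record_timestamp, "
        "COUNT(*) AS cnt "
        f"FROM {table} "
        "GROUP BY __global_counter, __message_id, __probe_id "
        "ORDER BY COUNT(*) DESC "
        "LIMIT 10"
    )


def build_payload(table, timeout_ms):
    """Request body asking the broker to use the single-stage engine."""
    return {
        "sql": build_single_stage_sql(table),
        "queryOptions": f"useMultistageEngine=false;timeoutMs={timeout_ms}",
    }


def run_query(payload, broker_url=BROKER_URL):
    """POST the payload to the broker and return the decoded JSON response."""
    request = urllib.request.Request(
        broker_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# "my_table" is a hypothetical placeholder for the real table name.
payload = build_payload("my_table", timeout_ms=180000)
print(json.dumps(payload, indent=2))
```

Calling `run_query(payload)` against a non-production broker first would be the safer way to check the behavior.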

dorlevi commented 2 weeks ago

I need to find a suitable time to test it, because killing our cluster is obviously not something I can do at just any time (we don't have a test cluster with tables this big).

Regardless, we care about the multi-stage engine, and we've seen this happen multiple times with different queries, some of which can only be executed on the multi-stage engine.

mayankshriv commented 1 week ago

cc: @Jackie-Jiang