apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

Window function fails to demarcate if 2 following are used #15739

Closed: somu-imply closed 2 months ago

somu-imply commented 7 months ago

At the moment, on Druid master, the following two queries

WITH virtual_table as (select ("customers_city(orders)") as vsum_dim1, sum("shipping_cost(orders)") as mes_col
from us_ecom_orders_com101 group by 1 order by mes_col DESC)
select vsum_dim1, mes_col, sum(mes_col) OVER (order by vsum_dim1 rows between 2 PRECEDING and 3 FOLLOWING) as mes_sum
from virtual_table order by 1

and

WITH virtual_table as (select ("customers_city(orders)") as vsum_dim1, sum("shipping_cost(orders)") as mes_col
from us_ecom_orders_com101 group by 1 order by mes_col DESC)
select vsum_dim1, mes_col, sum(mes_col) OVER (order by vsum_dim1 rows between 2 FOLLOWING and 3 FOLLOWING) as mes_sum
from virtual_table order by 1

are planned identically, with the native query showing the same frame offsets in the processor for both:

"processor": {
            "type": "framedAgg",
            "frame": {
              "peerType": "ROWS",
              "lowUnbounded": false,
              "lowOffset": 2,
              "uppUnbounded": false,
              "uppOffset": 3,
              "orderBy": null
            },
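
To make the difference concrete, here is a small, self-contained Java sketch (illustrative only, not Druid planner or processor code; the row values are made up) that evaluates both ROWS frames over a six-row partition. The two frames aggregate different windows on every row, so planning both queries to the same {lowOffset: 2, uppOffset: 3} spec effectively turns the second query into the first.

// Illustrative sketch only (not Druid code): the two ROWS frames from the queries
// above select different row windows and therefore must not plan to the same spec.
public class FrameDemo
{
  // Sum the rows in [current + lower, current + upper], clamped to the partition.
  // Negative offsets point backwards (PRECEDING), positive offsets forwards (FOLLOWING).
  static long frameSum(long[] rows, int current, int lower, int upper)
  {
    long sum = 0;
    for (int i = Math.max(0, current + lower); i <= Math.min(rows.length - 1, current + upper); i++) {
      sum += rows[i];
    }
    return sum;
  }

  public static void main(String[] args)
  {
    long[] mesCol = {10, 20, 30, 40, 50, 60};
    for (int row = 0; row < mesCol.length; row++) {
      // ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING -> offsets (-2, +3)
      long precedingFrame = frameSum(mesCol, row, -2, 3);
      // ROWS BETWEEN 2 FOLLOWING AND 3 FOLLOWING -> offsets (+2, +3)
      long followingFrame = frameSum(mesCol, row, 2, 3);
      System.out.printf("row %d: (2 PRECEDING, 3 FOLLOWING)=%d  (2 FOLLOWING, 3 FOLLOWING)=%d%n",
                        row, precedingFrame, followingFrame);
    }
  }
}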

Going forward, we need to demarcate these two cases and make sure we support negative offsets for them. At the moment, even in a native query you cannot specify

"processor": {
            "type": "framedAgg",
            "frame": {
              "peerType": "ROWS",
              "lowUnbounded": false,
              "lowOffset": -2,
              "uppUnbounded": false,
              "uppOffset": 3,
              "orderBy": null
            },
            "aggregations": [
              {
                "type": "longSum",
                "name": "w0",
                "fieldName": "a0"
              }
            ]
          }

which throws an IndexOutOfBoundsException.
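
The exception itself is what you would expect if frame bounds are translated directly into row indices: with a lower bound of 2 FOLLOWING, rows near the end of the partition refer to indices past the last row. The snippet below is a generic illustration of the clamping such a frame needs; it is not the framedAgg implementation, and the sign convention (positive = FOLLOWING for both bounds) is chosen just for this example, whereas the native spec above encodes the lower FOLLOWING bound as a negative lowOffset.

// Hedged illustration (not Druid's framedAgg code): why an offset that points past
// the end of the partition surfaces as an IndexOutOfBoundsException when the frame
// bounds are used as raw row indices, and how clamping avoids it.
public class NegativeOffsetDemo
{
  public static void main(String[] args)
  {
    long[] partition = {10, 20, 30, 40, 50, 60};
    int current = 3;        // fourth row of six
    int lowerOffset = 2;    // lower bound: 2 FOLLOWING
    int upperOffset = 3;    // upper bound: 3 FOLLOWING

    // Naive translation: current + offset used directly as an index.
    // For current = 3 the upper bound asks for row 6, which does not exist.
    // long bad = partition[current + upperOffset];   // ArrayIndexOutOfBoundsException

    // Clamped translation: restrict the frame to the partition, and treat an empty
    // or inverted range as a frame containing no rows.
    int start = Math.min(Math.max(0, current + lowerOffset), partition.length);
    int end = Math.min(Math.max(0, current + upperOffset + 1), partition.length);
    long sum = 0;
    for (int i = start; i < end; i++) {
      sum += partition[i];
    }
    System.out.println("clamped frame sum for row " + current + " = " + sum);  // only row 5 is in range -> 60
  }
}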

Steps to reproduce

REPLACE INTO "us_ecom_orders_com101" OVERWRITE ALL
WITH "ext" AS (
  SELECT *
  FROM TABLE(
    EXTERN(
      '{"type":"local","baseDir":"/Users/somu/Desktop/","filter":"us_ecom_orders_com101.csv"}',
      '{"type":"csv","findColumnsFromHeader":true}'
    )
  ) EXTEND ("orders_id(orders)" BIGINT, "customers_id(orders)" BIGINT, "customers_name(orders)" VARCHAR, "customers_street_address(orders)" VARCHAR, "customers_suburb(orders)" VARCHAR, "customers_city(orders)" VARCHAR, "customers_postcode(orders)" VARCHAR, "customers_state(orders)" VARCHAR, "customers_country(orders)" VARCHAR, "lat(orders)" DOUBLE, "lng(orders)" DOUBLE, "customers_telephone(orders)" BIGINT, "customers_email_address(orders)" VARCHAR, "customers_address_format_id(orders)" BIGINT, "delivery_name(orders)" VARCHAR, "delivery_street_address(orders)" VARCHAR, "delivery_suburb(orders)" VARCHAR, "delivery_city(orders)" VARCHAR, "delivery_postcode(orders)" BIGINT, "delivery_state(orders)" VARCHAR, "delivery_country(orders)" VARCHAR, "delivery_address_format_id(orders)" BIGINT, "payment_method(orders)" VARCHAR, "cc_type(orders)" VARCHAR, "cc_owner(orders)" VARCHAR, "cc_number(orders)" VARCHAR, "cc_expires(orders)" VARCHAR, "last_modified(orders)" VARCHAR, "date_purchased(orders)" VARCHAR, "shipping_cost(orders)" BIGINT, "shipping_method(orders)" VARCHAR, "orders_status(orders)" VARCHAR, "orders_date_finished(orders)" VARCHAR, "comments(orders)" VARCHAR, "currency(orders)" VARCHAR, "currency_value(orders)" BIGINT)
)
SELECT
  TIME_PARSE(TRIM("last_modified(orders)")) AS "__time",
  "orders_id(orders)",
  "customers_id(orders)",
  "customers_name(orders)",
  "customers_street_address(orders)",
  "customers_suburb(orders)",
  "customers_city(orders)",
  "customers_postcode(orders)",
  "customers_state(orders)",
  "customers_country(orders)",
  "lat(orders)",
  "lng(orders)",
  "customers_telephone(orders)",
  "customers_email_address(orders)",
  "customers_address_format_id(orders)",
  "delivery_name(orders)",
  "delivery_street_address(orders)",
  "delivery_suburb(orders)",
  "delivery_city(orders)",
  "delivery_postcode(orders)",
  "delivery_state(orders)",
  "delivery_country(orders)",
  "delivery_address_format_id(orders)",
  "payment_method(orders)",
  "cc_type(orders)",
  "cc_owner(orders)",
  "cc_number(orders)",
  "cc_expires(orders)",
  "date_purchased(orders)",
  "shipping_cost(orders)",
  "shipping_method(orders)",
  "orders_status(orders)",
  "orders_date_finished(orders)",
  "comments(orders)",
  "currency(orders)",
  "currency_value(orders)"
FROM "ext"
PARTITIONED BY DAY

The CSV file used in the above query is us_ecom_orders_com101.csv.

Another easy way to reproduce, using the Druid foo table, is:

WITH virtual_table as (select m2, sum(m1) as summ1
from foo group by 1 order by summ1 DESC)
select m2, summ1, sum(summ1) OVER (order by m2 rows between 2 FOLLOWING and 3 FOLLOWING) as sumfinal
from virtual_table order by 1

kgyrtkirk commented 7 months ago

Some test cases which should start working once this is fixed:

type: "operatorValidation"

sql: |
  SELECT
    m1,
    FLOOR(m1/3),
    -- incorrect range frames https://github.com/apache/druid/issues/15739
    1 >= COUNT(1) OVER (ORDER BY m1 ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), 
    1 >= COUNT(1) OVER (ORDER BY m1 ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING),
    1 >= COUNT(1) OVER (ORDER BY m1 RANGE BETWEEN 1 PRECEDING AND 1 PRECEDING), 
    1 >= COUNT(1) OVER (ORDER BY m1 RANGE BETWEEN 1 FOLLOWING AND 1 FOLLOWING),
    -- not sure if this should be allowed or not; but if its allowed it should be 0
    0 = COUNT(1) OVER (ORDER BY FLOOR(m1/3) ROWS BETWEEN 2 FOLLOWING AND 1 FOLLOWING)
  FROM foo

expectedResults:
  - [1.0,0.0,true,true,true,true,true]
  - [2.0,0.0,true,true,true,true,true]
  - [3.0,1.0,true,true,true,true,true]
  - [4.0,1.0,true,true,true,true,true]
  - [5.0,1.0,true,true,true,true,true]
  - [6.0,2.0,true,true,true,true,true]