druid-io / pydruid

A Python connector for Druid
Other
505 stars 194 forks source link

Support for multi-stage-query (sql-msq-task) #302

Open emanueledomingo opened 1 year ago

emanueledomingo commented 1 year ago

Hi everyone.

With Druid 24.0.0 the team included the possibility to start an ingestion using SQL. This uses a new SQL engine called multi-stage-query. Changelog here.

This seems to not be supported in pydruid (using SqlAlchemy). If i run

import sqlalchemy as sqla

query = """
INSERT INTO w000
SELECT
    TIME_PARSE("timestamp") AS __time,
    isRobot,
    channel,
    flags,
    isUnpatrolled,
    page,
    diffUrl,
    added,
    comment,
    commentLength,
    isNew,
    isMinor,
    delta,
    isAnonymous,
    user,
    deltaBucket,
    deleted,
    namespace,
    cityName,
    countryName,
    regionIsoCode,
    metroCode,
    countryIsoCode,
    regionName
FROM TABLE(
    EXTERN(
        '{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz"]}',
        '{"type":"json"}',
        '[{"name":"isRobot","type":"string"},{"name":"channel","type":"string"},{"name":"timestamp","type":"string"},{"name":"flags","type":"string"},{"name":"isUnpatrolled","type":"string"},{"name":"page","type":"string"},{"name":"diffUrl","type":"string"},{"name":"added","type":"long"},{"name":"comment","type":"string"},{"name":"commentLength","type":"long"},{"name":"isNew","type":"string"},{"name":"isMinor","type":"string"},{"name":"delta","type":"long"},{"name":"isAnonymous","type":"string"},{"name":"user","type":"string"},{"name":"deltaBucket","type":"long"},{"name":"deleted","type":"long"},{"name":"namespace","type":"string"},{"name":"cityName","type":"string"},{"name":"countryName","type":"string"},{"name":"regionIsoCode","type":"string"},{"name":"metroCode","type":"long"},{"name":"countryIsoCode","type":"string"},{"name":"regionName","type":"string"}]'
    )
)
PARTITIONED BY HOUR
CLUSTERED BY channel
"""

engine = sqla.create_engine("druid://localhost:8888/druid/v2/sql/")
engine.execute(query)

Query taken from this example

i get

ProgrammingError: (pydruid.db.exceptions.ProgrammingError) Plan validation failed (org.apache.calcite.tools.ValidationException): Cannot execute INSERT with SQL engine 'native'.

Using:

Is there a way to specify a different engine? it will be great to use sql-msq-task instead of native.

adamb-sternumiot commented 1 year ago

I'm not entirely sure about it but checked how the UI console does the engine switch and basically the MSQ is a separate endpoint. Normal SQL queries are sent to /druid/v2/sql while MSQ queries go to /druid/v2/sql/task. Unfortunately just changing the URL didn't work for me; possibly the MSQ endpoint isn't quite compatible.