druid-io / pydruid

A Python connector for Druid

SubQueries Support #135

Closed pantlavanya closed 4 years ago

pantlavanya commented 5 years ago

Hello All,

How can I generate subqueries using pydruid? The datasource field only accepts either a string or a list of strings:

ValueError: Datasource definition not valid. Must be string or list of strings

Below is a sample query, in which the output of the inner (first) query is passed to the outer query as the datasource.

{
  "queryType": "groupBy",
  "dataSource":{
    "type": "query",
    "query": {
      "queryType": "groupBy",
      "dataSource": "druid_source",
      "granularity": {"type": "period", "period": "P1M"},
      "dimensions": ["source_dim"],
      "aggregations": [
        { "type": "doubleMax", "name": "value", "fieldName": "stream_value" }
      ],
      "intervals": [ "2012-01-01T00:00:00.000/2020-01-03T00:00:00.000" ]
    }
  },
  "granularity": "hour",
  "dimensions": ["source_dim"],
  "aggregations": [
    { "type": "longSum", "name": "outerquerryvalue", "fieldName": "value" }
  ],
  "intervals": [ "2012-01-01T00:00:00.000/2020-01-03T00:00:00.000" ]
}
mistercrunch commented 5 years ago

Not currently supported AFAICT

pantlavanya commented 5 years ago

Hi @mistercrunch, can I send a PR for this? I have found a way to do it.

mistercrunch commented 5 years ago

Please do!

pantlavanya commented 5 years ago

Hi @mistercrunch, below is my PR. It may not be perfect; let me know your suggestions.

https://github.com/druid-io/pydruid/pull/139

pantlavanya commented 5 years ago

This issue is fixed. Below is how you can define subqueries.

group = query.groupby(
    datasource=query.sub_query(datasource='twitterstream',
            granularity='hour',
            intervals='2018-01-01/2018-05-31',
            dimensions=["dim_key", "dim_key2"],
            filter=(Dimension('user_lang') == 'en') & (Dimension('user_name') == 'ram'),
            aggregations={"first_value": doublefirst("stream_value"),"last_value": doublelast("stream_value")},
            post_aggregations={'final_value': (HyperUniqueCardinality('last_value') - HyperUniqueCardinality('first_value'))}
    ),
    granularity='day',
    intervals='2018-01-01/2018-05-31',
    dimensions=["dim_key"],
    aggregations={"outer_final_value": doublesum("final_value")}
)
ashexpert commented 4 years ago

It still raises a ValueError, because query.sub_query returns a dict while query.groupby checks that the given datasource is a string or a list of strings. Can you fix that too?

pantlavanya commented 4 years ago

Sure, I can fix this. What are you trying to do? Can you please give me an example?

pantlavanya commented 4 years ago

Hi @ashexpert,

I understand your problem. Yes, when you use sub_query, it returns a dict. I will fix it.

I think you are talking about this, if I am not wrong:

import six  # needed for the string_types checks below

def parse_datasource(datasource, query_type):
    """
    Parse an input datasource object into a valid dataSource value.

    Input can be a string, in which case it is returned as-is; a list of
    strings, which is turned into a UNION datasource; or a dict (e.g. a
    subquery definition), which is passed through unchanged.

    :param datasource: datasource parameter
    :param string query_type: query type
    :raise ValueError: if input is not a string, a list of strings, or a dict
    """
    if not (
        isinstance(datasource, six.string_types)
        or (
            isinstance(datasource, list)
            and all(isinstance(x, six.string_types) for x in datasource)
        )
        or isinstance(datasource, dict)
    ):
        raise ValueError(
            "Datasource definition not valid. "
            "Must be string, dict, or list of strings"
        )
    if isinstance(datasource, six.string_types):
        return datasource
    elif isinstance(datasource, dict):
        # Pass dict datasources (e.g. subqueries) through unchanged so they
        # are not wrapped in an extra UNION datasource.
        return datasource
    else:
        return {"type": "union", "dataSources": datasource}
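For illustration, the intended dispatch can be sketched as a small self-contained helper (the name parse_datasource_sketch is hypothetical, and plain Python 3 str checks are used instead of six): a string passes through, a list of strings becomes a UNION datasource, and a dict such as a subquery definition should come back unchanged rather than union-wrapped.

```python
def parse_datasource_sketch(datasource):
    # String datasource: use as-is.
    if isinstance(datasource, str):
        return datasource
    # List of strings: wrap in a UNION datasource.
    if isinstance(datasource, list) and all(isinstance(x, str) for x in datasource):
        return {"type": "union", "dataSources": datasource}
    # Dict (e.g. {"type": "query", "query": {...}}): pass through unchanged,
    # so subqueries do not pick up an extra UNION wrapper.
    if isinstance(datasource, dict):
        return datasource
    raise ValueError(
        "Datasource definition not valid. Must be string, dict, or list of strings"
    )

# Example inputs covering the three accepted shapes.
table = parse_datasource_sketch("twitterstream")
union = parse_datasource_sketch(["ds1", "ds2"])
sub = parse_datasource_sketch({"type": "query", "query": {"queryType": "groupBy"}})
```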
pantlavanya commented 4 years ago

PR https://github.com/druid-io/pydruid/pull/179

ashexpert commented 4 years ago

Hi @pantlavanya, yes, that was exactly my problem. Sorry I didn't answer, but you figured it out by yourself.

pantlavanya commented 4 years ago

Hi @mistercrunch, can we get this merged? Thanks @ashexpert for pointing this out.

https://github.com/druid-io/pydruid/pull/179

veerappans commented 1 year ago

This issue is fixed. Below is how you can define subqueries.

group = query.groupby(
    datasource=query.sub_query(datasource='twitterstream',
            granularity='hour',
            intervals='2018-01-01/2018-05-31',
            dimensions=["dim_key", "dim_key2"],
            filter=(Dimension('user_lang') == 'en') & (Dimension('user_name') == 'ram'),
            aggregations={"first_value": doublefirst("stream_value"),"last_value": doublelast("stream_value")},
            post_aggregations={'final_value': (HyperUniqueCardinality('last_value') - HyperUniqueCardinality('first_value'))}
    ),
    granularity='day',
    intervals='2018-01-01/2018-05-31',
    dimensions=["dim_key"],
    aggregations={"outer_final_value": doublesum("final_value")}
)

This format does not work for me. Getting this error:

{"error":"Unknown exception","errorMessage":"Cannot deserialize instance of java.util.ArrayList<org.apache.druid.query.TableDataSource> out of START_OBJECT token\n at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 73]

Can you please help.

veerappans commented 1 year ago

It's not forming the query correctly: it adds an extra union type, which makes the syntax invalid.

veerappans commented 1 year ago

@pantlavanya, can you please help? When I use the pydruid query format you gave above, it gets converted to a native Druid query with an extra 'union', and the Druid query fails. Would appreciate your help! Thanks.
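Until a fix lands in an installed version, one possible workaround (a sketch, not pydruid API: the broker URL is an assumption, and the HTTP call is left commented out) is to build the native query from the top of this issue as a plain dict, nest it under a {"type": "query", ...} datasource with no union wrapper, and POST it straight to the Broker's /druid/v2 endpoint:

```python
import json
# import requests  # uncomment to actually send the query

# Inner query: monthly doubleMax per source_dim (from the issue description).
inner = {
    "queryType": "groupBy",
    "dataSource": "druid_source",
    "granularity": {"type": "period", "period": "P1M"},
    "dimensions": ["source_dim"],
    "aggregations": [
        {"type": "doubleMax", "name": "value", "fieldName": "stream_value"}
    ],
    "intervals": ["2012-01-01T00:00:00.000/2020-01-03T00:00:00.000"],
}

# Outer query: nests the inner query as a query datasource -- note there is
# no "union" wrapper around it.
outer = {
    "queryType": "groupBy",
    "dataSource": {"type": "query", "query": inner},
    "granularity": "hour",
    "dimensions": ["source_dim"],
    "aggregations": [
        {"type": "longSum", "name": "outerquerryvalue", "fieldName": "value"}
    ],
    "intervals": ["2012-01-01T00:00:00.000/2020-01-03T00:00:00.000"],
}

payload = json.dumps(outer)
# resp = requests.post(
#     "http://localhost:8082/druid/v2/",  # Broker URL: adjust for your cluster
#     data=payload,
#     headers={"Content-Type": "application/json"},
# )
```

This sidesteps pydruid's query builder entirely, so no extra UNION datasource can be injected.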