gusutabopb / aioinflux

Asynchronous Python client for InfluxDB
MIT License

iterpoints only returns the first group when processing GROUP BY queries #29

Closed. Karmenzind closed this issue 4 years ago.

Karmenzind commented 4 years ago

Hi

While processing this query:

SELECT ROUND(LAST(Free_Megabytes) / 1024) AS free, ROUND(Free_Megabytes / 1024 / (Percent_Free_Space / 100)) AS total, ROUND(Free_Megabytes / 1024 * ((100 - Percent_Free_Space) / Percent_Free_Space)) AS used, (100 - Percent_Free_Space) as percent, instance as path FROM win_disk WHERE host = 'ais-pc-16003' GROUP BY instance

This is the raw data that InfluxDBClient.query returned.

{'results': [{'series': [{'columns': ['time',
                                      'free',
                                      'total',
                                      'used',
                                      'percent',
                                      'path'],
                          'name': 'win_disk',
                          'tags': {'instance': 'C:'},
                          'values': [[1577419571000000000,
                                      94,
                                      238,
                                      144,
                                      60.49140930175781,
                                      'C:']]},
                         {'columns': ['time',
                                      'free',
                                      'total',
                                      'used',
                                      'percent',
                                      'path'],
                          'name': 'win_disk',
                          'tags': {'instance': 'D:'},
                          'values': [[1577419571000000000,
                                      1727,
                                      1863,
                                      136,
                                      7.3103790283203125,
                                      'D:']]},
                         {'columns': ['time',
                                      'free',
                                      'total',
                                      'used',
                                      'percent',
                                      'path'],
                          'name': 'win_disk',
                          'tags': {'instance': 'HarddiskVolume1'},
                          'values': [[1577419330000000000,
                                      0,
                                      0,
                                      0,
                                      29.292930603027344,
                                      'HarddiskVolume1']]},
                         {'columns': ['time',
                                      'free',
                                      'total',
                                      'used',
                                      'percent',
                                      'path'],
                          'name': 'win_disk',
                          'tags': {'instance': '_Total'},
                          'values': [[1577419571000000000,
                                      1821,
                                      2101,
                                      280,
                                      13.345237731933594,
                                      '_Total']]}],
              'statement_id': 0}]}
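For readers less familiar with this response format, the sketch below walks a trimmed copy of the response above (two of the four series, values copied from it). It illustrates that `results` holds one entry per statement, `series` holds one entry per GROUP BY tag value, and `values` holds that group's rows in `columns` order. The variable `rows_by_instance` is a hypothetical name used only for illustration.

```python
# Trimmed copy of the response above: same structure, two of the four series
COLUMNS = ['time', 'free', 'total', 'used', 'percent', 'path']
r = {'results': [{'statement_id': 0, 'series': [
    {'name': 'win_disk', 'tags': {'instance': 'C:'}, 'columns': COLUMNS,
     'values': [[1577419571000000000, 94, 238, 144, 60.49140930175781, 'C:']]},
    {'name': 'win_disk', 'tags': {'instance': 'D:'}, 'columns': COLUMNS,
     'values': [[1577419571000000000, 1727, 1863, 136, 7.3103790283203125, 'D:']]},
]}]}

# results -> one entry per statement
# series  -> one entry per GROUP BY tag value (here: per disk instance)
# values  -> the rows of that group, ordered like `columns`
rows_by_instance = {}
for statement in r['results']:
    for series in statement.get('series', []):
        instance = series['tags']['instance']
        rows_by_instance[instance] = [dict(zip(series['columns'], row))
                                      for row in series['values']]

for instance, rows in rows_by_instance.items():
    print(instance, rows[0]['free'], rows[0]['total'])
# C: 94 238
# D: 1727 1863
```

Any correct iterator over this response has to visit every entry of `series`, not just the first one.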

I wanted to use this code to get the rows as parsed dicts:

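from aioinflux import iterpoints

def dict_parser(*x, meta):
    # Pair each column name with the corresponding value of the row
    return dict(zip(meta['columns'], x))

# r is the raw response shown above
g = iterpoints(r, dict_parser)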

But I only got the first group ("instance": "C:"). Below is the source of iterpoints. As you can see, the function returns from inside the for-loop on its first iteration, so the remaining series are never processed.

import inspect
from typing import Any, Callable, Iterator, Optional

def iterpoints(resp: dict, parser: Optional[Callable] = None) -> Iterator[Any]:
    for statement in resp['results']:
        if 'series' not in statement:
            continue
        for series in statement['series']:
            if parser is None:
                return (x for x in series['values'])
            elif 'meta' in inspect.signature(parser).parameters:
                meta = {k: series[k] for k in series if k != 'values'}
                meta['statement_id'] = statement['statement_id']
                return (parser(*x, meta=meta) for x in series['values'])
            else:
                return (parser(*x) for x in series['values'])
    return iter([])
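The early return can be reproduced in isolation. In the sketch below, plain lists of integers are hypothetical stand-ins for series, and both function names are made up for illustration. The second function shows one possible alternative (turning the function into a generator with `yield from`); it is not necessarily how aioinflux resolved the issue.

```python
from typing import Iterator, List


def first_group_only(groups: List[List[int]]) -> Iterator[int]:
    # Mirrors the original iterpoints: `return` leaves the function on the
    # first loop iteration, so later groups are never visited
    for group in groups:
        return (x for x in group)
    return iter([])


def every_group(groups: List[List[int]]) -> Iterator[int]:
    # As a generator function, `yield from` keeps the loop running and
    # produces one flat stream across all groups
    for group in groups:
        yield from group


groups = [[1, 2], [3], [4, 5]]
print(list(first_group_only(groups)))  # [1, 2]
print(list(every_group(groups)))       # [1, 2, 3, 4, 5]
```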

As a workaround, I modified iterpoints like this:

def fixed_iterpoints(resp: dict, parser: Optional[Callable] = None):
    for statement in resp['results']:
        if 'series' not in statement:
            continue

        gs = []
        for series in statement['series']:
            if parser is None:
                part = (x for x in series['values'])
            elif 'meta' in inspect.signature(parser).parameters:
                meta = {k: series[k] for k in series if k != 'values'}
                meta['statement_id'] = statement['statement_id']
                part = (parser(*x, meta=meta) for x in series['values'])
            else:
                part = (parser(*x) for x in series['values'])

            if len(statement['series']) == 1:
                return part

            gs.append(part)

        return gs
    return iter([])

It works for me, but when there is more than one series it returns a list of generators (a nested structure), which feels a bit weird. Do you have a better idea?
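The awkwardness is that the workaround's return type depends on how many series the query produced: a single generator for one series, a list of generators for several. A caller would need something like the hypothetical helper below (`as_flat_stream` is not part of aioinflux) to get one flat stream of rows either way.

```python
import itertools
from typing import Iterator, List, Union


def as_flat_stream(result: Union[Iterator, List[Iterator]]) -> Iterator:
    # Hypothetical helper: the workaround returns a single generator for one
    # series but a list of generators for several, so callers must branch
    if isinstance(result, list):
        return itertools.chain.from_iterable(result)
    return result


one_series = (row for row in [[1, 'C:']])
many_series = [(row for row in [[1, 'C:']]), (row for row in [[2, 'D:']])]
print(list(as_flat_stream(one_series)))   # [[1, 'C:']]
print(list(as_flat_stream(many_series)))  # [[1, 'C:'], [2, 'D:']]
```

Having the function itself always return a single flat iterator avoids pushing this branching onto every caller.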

gusutabopb commented 4 years ago

Thanks for raising the issue (and apologies for the super late reply).

This is a bug (thanks for catching it!). It probably slipped through because most of my queries are single-statement queries that return a single series (and because of a lack of testing).

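Your fix is good for single-statement/multi-series queries, but it would fail for multi-statement queries, because it returns as soon as it has processed the first statement that contains series. (I don't know why anyone would ever want to use multi-statement queries, but InfluxDB allows them, so the code needs to handle that.)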
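To make the multi-statement case concrete, here is a sketch with a hypothetical two-statement response (the measurement names and values are made up; only the `results`/`statement_id`/`series` layout follows the response shown earlier). The first function keeps the workaround's shape, with the `return` inside the statement loop; the second collects rows from every statement.

```python
# Hypothetical two-statement response, e.g. from "SELECT ... ; SELECT ...";
# measurement names and values are made up for illustration
resp = {'results': [
    {'statement_id': 0, 'series': [
        {'name': 'cpu', 'columns': ['time', 'value'], 'values': [[1, 10]]}]},
    {'statement_id': 1, 'series': [
        {'name': 'mem', 'columns': ['time', 'value'], 'values': [[1, 20]]}]},
]}


def rows_until_first_return(resp):
    # Like fixed_iterpoints: the `return` sits inside the statement loop,
    # so statement 1 is never reached
    for statement in resp['results']:
        if 'series' not in statement:
            continue
        return [row for series in statement['series'] for row in series['values']]
    return []


def rows_from_all_statements(resp):
    # Collects rows from every series of every statement
    return [row
            for statement in resp['results']
            for series in statement.get('series', [])
            for row in series['values']]


print(rows_until_first_return(resp))   # [[1, 10]]
print(rows_from_all_statements(resp))  # [[1, 10], [1, 20]]
```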

Using itertools, I think something like this would be more generic (it basically just moves gs out of the statement loop, so the generators from every series of every statement end up in a single chain):

import inspect
import itertools

def iterpoints(resp, parser=None):
    gs = []
    for statement in resp['results']:
        if 'series' not in statement:
            continue
        for series in statement['series']:
            if parser is None:
                gs.append((x for x in series['values']))
            elif 'meta' in inspect.signature(parser).parameters:
                meta = {k: series[k] for k in series if k != 'values'}
                meta['statement_id'] = statement['statement_id']
                gs.append((parser(*x, meta=meta) for x in series['values']))
            else:
                gs.append((parser(*x) for x in series['values']))
    return itertools.chain(*gs)

Let me know what you think.

I will add this fix to 0.10.0 (which will hopefully come out later this month).
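One caveat worth checking with the itertools proposal: the generator expressions are created inside the loop but only consumed after it ends, and Python looks up the free variable `meta` in a generator expression's body lazily. If that reading is right, every row would see the `meta` of the last series, which goes unnoticed with `dict_parser` (all series share the same columns) but matters for a parser that reads `meta['tags']`. The sketch below checks this on a trimmed copy of the response from this issue. `tag_parser`, `iterpoints_genexpr` and `iterpoints_bound` are hypothetical names, both functions cover only the `meta` branch, and binding `meta` eagerly with `functools.partial` plus `itertools.starmap` is just one possible fix (a generator function using `yield from` would be another).

```python
import functools
import itertools

# Trimmed copy of the GROUP BY response from this issue (two of the four series)
COLUMNS = ['time', 'free', 'total', 'used', 'percent', 'path']
resp = {'results': [{'statement_id': 0, 'series': [
    {'name': 'win_disk', 'tags': {'instance': 'C:'}, 'columns': COLUMNS,
     'values': [[1577419571000000000, 94, 238, 144, 60.49140930175781, 'C:']]},
    {'name': 'win_disk', 'tags': {'instance': 'D:'}, 'columns': COLUMNS,
     'values': [[1577419571000000000, 1727, 1863, 136, 7.3103790283203125, 'D:']]},
]}]}


def tag_parser(*x, meta):
    # Hypothetical parser: pair the series' instance tag with the row's path column
    return meta['tags']['instance'], x[-1]


def iterpoints_genexpr(resp, parser):
    # Same pattern as the proposal: `meta` in the generator body is looked up
    # only when the chain is consumed, i.e. after the loop has finished
    gs = []
    for statement in resp['results']:
        for series in statement.get('series', []):
            meta = {k: series[k] for k in series if k != 'values'}
            meta['statement_id'] = statement['statement_id']
            gs.append((parser(*x, meta=meta) for x in series['values']))
    return itertools.chain(*gs)


def iterpoints_bound(resp, parser):
    # Same loop, but functools.partial captures the current `meta` immediately
    gs = []
    for statement in resp['results']:
        for series in statement.get('series', []):
            meta = {k: series[k] for k in series if k != 'values'}
            meta['statement_id'] = statement['statement_id']
            gs.append(itertools.starmap(functools.partial(parser, meta=meta),
                                        series['values']))
    return itertools.chain(*gs)


print(list(iterpoints_genexpr(resp, tag_parser)))  # [('D:', 'C:'), ('D:', 'D:')]
print(list(iterpoints_bound(resp, tag_parser)))    # [('C:', 'C:'), ('D:', 'D:')]
```

In the first result the instance tag no longer matches the row's own path column, which is the symptom to look for in a test.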

Karmenzind commented 4 years ago

Thank you! I'll try it when I have some free time.