gusutabopb / aioinflux

Asynchronous Python client for InfluxDB
MIT License

issue for query #18

Closed peiyaoli closed 5 years ago

peiyaoli commented 5 years ago

Hi, Gustavo,

I tried to follow the demo in our project, but it doesn't work. Could you help me figure out the reason?

Here is my code:

async def read_influxdb(userid, starttime, endtime):
    #logger = logging.getLogger("influxDB read demo")
    async with InfluxDBClient(host=localhost, port=8086, username='admin', password='123456',db=db_name) as client:
        user_id = '\'' + str(userid) + '\''
        sql_ecg = 'SELECT point FROM wave WHERE (person_zid = {}) AND (time > {}s) AND (time < {}s)'.format(user_id, starttime, endtime)
        await client.query(sql_ecg, chunked=True)

if __name__ == '__main__':
    user_id = 973097
    starttime = '2018-09-26 18:08:48'
    endtime = '2018-09-27 18:08:48'
    starttime_posix = utc_to_local(starttime)
    endtime_posix = utc_to_local(endtime)
    asyncio.get_event_loop().run_until_complete(read_influxdb(user_id, starttime_posix, endtime_posix))

When I run this code, I get the errors below:

sys:1: RuntimeWarning: coroutine 'query' was never awaited
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10f78f630>

Best

gusutabopb commented 5 years ago

From your sample code, it doesn't seem like you are missing any awaits (which is what usually causes that RuntimeWarning), so I can't really tell what the exact cause is.

However, have you tried not using the chunked option? Using chunked=True makes query return an async generator instead of the result directly. You are not iterating that generator, hence you are not getting any results.

A likely fix would be to simply NOT use chunked queries:

await client.query(sql_ecg)

Chunked queries are a somewhat obscure and not very well documented feature of InfluxDB, which I think most users are better off avoiding (in my personal experience, the performance gains, if any, seem to be negligible). I wouldn't be surprised if it were removed completely from InfluxDB in the upcoming v2.0.

In case you really need to use it (which you likely don't), the correct syntax would be:

async for chunk in await client.query(sql_ecg, chunked=True):
    print(chunk)  # or do something else
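
To illustrate why an un-iterated chunked call yields nothing, here is a minimal, self-contained sketch that does not touch InfluxDB. `fake_chunked_query` is a hypothetical stand-in for a chunked query (it is not part of aioinflux). Like the call above, awaiting it returns an async generator, and results only appear once that generator is iterated.

```python
import asyncio


async def fake_chunked_query(n_chunks):
    """Hypothetical stand-in for a chunked query: awaiting it returns an async generator."""

    async def chunks():
        for i in range(n_chunks):
            await asyncio.sleep(0)  # pretend to wait for the next chunk from the server
            yield {"chunk": i}

    return chunks()


async def main():
    # Awaiting alone only hands back the generator; no chunk has been produced yet.
    gen = await fake_chunked_query(3)
    # Iterating with `async for` is what actually pulls the chunks.
    return [chunk async for chunk in gen]


collected = asyncio.run(main())
print(collected)
```

If the `async for` step is left out, `gen` is simply discarded and none of its chunks are ever produced, which matches the "no results" symptom described above.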

peiyaoli commented 5 years ago

Hi @gusutabopb, thanks for your detailed explanation. In our project, we need to query 24 hours of data, which is roughly over 10 million points. Using the original python-influxdb library is time-consuming: it takes about 1 minute! One way to solve this is to split one 24-hour query into 24 one-hour queries. Running them through multiprocessing might help.

Now I am trying to use asyncio. To my understanding, what I have to do is create a list of coroutines consisting of the 24 queries and execute it as your demo code shows. Am I correct? For example:

coros = []
for s, e in zip(starttime, endtime):
    sql_ecg = 'SELECT point FROM wave WHERE (person_zid = {}) AND (time > {}s) AND (time < {}s)'.format(user_id, s, e)
    coros.append(client.query(sql_ecg))

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*coros))

My other question is about the chunked setting, which confuses me. Does it help? Would it speed up the query?

gusutabopb commented 5 years ago

Yes, splitting your query into small time windows is usually the best approach (in other words, avoid using chunked queries). Your asyncio code looks roughly correct.
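
For reference, a rough sketch of the windowing approach might look like the following. It runs without a database: `fake_query` is a hypothetical stand-in for `client.query`, and `hourly_windows` and `build_query` are illustrative helpers, not part of aioinflux. One assumption that differs from the query above: the lower bound uses `>=` so that the windows are half-open and a point sitting exactly on an hour boundary is neither dropped nor returned twice.

```python
import asyncio
from datetime import datetime, timedelta, timezone


def hourly_windows(start, hours):
    """Split [start, start + hours) into consecutive one-hour (begin, end) pairs."""
    return [
        (start + timedelta(hours=h), start + timedelta(hours=h + 1))
        for h in range(hours)
    ]


def build_query(person_zid, begin, end):
    # Epoch seconds with an `s` suffix, as in the queries above.
    # Assumption: `>=` / `<` gives half-open windows (see the note before this block).
    return (
        "SELECT point FROM wave WHERE (person_zid = '{}') "
        "AND (time >= {}s) AND (time < {}s)"
    ).format(person_zid, int(begin.timestamp()), int(end.timestamp()))


async def fake_query(q):
    """Hypothetical stand-in for `client.query(q)`; it just echoes the query string."""
    await asyncio.sleep(0)
    return q


async def main():
    start = datetime(2018, 9, 26, 18, 8, 48, tzinfo=timezone.utc)
    queries = [build_query(973097, b, e) for b, e in hourly_windows(start, 24)]
    # gather() runs the coroutines concurrently and returns results in input order.
    return await asyncio.gather(*(fake_query(q) for q in queries))


results = asyncio.run(main())
print(len(results))
```

With the real client, `fake_query(q)` would be replaced by `client.query(q)` inside the `async with InfluxDBClient(...)` block, so that all 24 queries share one session.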

This is getting a bit off topic, so I am closing this, as it is not really an issue/bug with aioinflux. Questions about aioinflux usage are more than welcome; however, if you have general questions about asyncio or InfluxDB itself, please refer to Stack Overflow or the official InfluxDB/Python (asyncio) docs. The asyncio docs have been rewritten recently and look a lot more accessible than before.