earthobservations / luftdatenpumpe

Acquire and process live and historical air quality data without effort. Filter by station-id, sensor-id and sensor-type, apply reverse geocoding, store into time-series and RDBMS databases, publish to MQTT, output as JSON, or visualize in Grafana. Data sources: Sensor.Community (luftdaten.info), IRCELINE, and OpenAQ.
https://luftdatenpumpe.readthedocs.io/
GNU Affero General Public License v3.0

[LDI] Improve performance of historical data import #11

Open amotl opened 5 years ago

amotl commented 5 years ago

Coming from #9 and #8, we see that the ingestion performance of historical LDI data acquired from http://archive.luftdaten.info/ through wget could well be improved. We will outline some thoughts about this here (in no particular order):

d-roet commented 4 years ago

I stumbled across this InfluxDB blog mentioning enhancements for improved ingestion speeds by switching from JSON to the native line protocol format. A Python function to translate JSON records into the line protocol format is included. See Writing Data to InfluxDB with Python.

Perhaps this idea is useful for luftdatenpumpe too?

amotl commented 4 years ago

Thanks for mentioning that detail. However, it's a bit misleading: There is no JSON at all! Data is always transferred to InfluxDB using the line protocol. If data is passed in as a Python dictionary, it will get converted into line protocol through the make_lines routine.
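For illustration, the dictionary-to-line-protocol conversion can be sketched as a minimal serializer. This is a simplified stand-in, not the actual `make_lines` routine: it omits the escaping rules for spaces, commas, and quotes that the real implementation handles.

```python
def to_line_protocol(measurement, tags, fields, timestamp_ns):
    # Line protocol shape: <measurement>,<tags> <fields> <timestamp>
    # Tags are sorted for deterministic output; real escaping is omitted here.
    tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
    field_str = ",".join(
        f'{k}="{v}"' if isinstance(v, str) else f"{k}={v}"
        for k, v in fields.items()
    )
    return f"{measurement},{tag_str} {field_str} {timestamp_ns}"
```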

I believe tuning the batch_size parameter will be more promising, as data has to be converted to line protocol anyway. A more lightweight variant of make_lines might improve performance, but might also sacrifice some of the robustness the full routine provides.
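Conceptually, tuning batch_size amounts to chunking the point stream into larger writes so fewer HTTP round trips are needed. A minimal sketch of such chunking (the helper name is hypothetical, not part of luftdatenpumpe or the InfluxDB client):

```python
from itertools import islice

def batched(points, batch_size):
    """Yield successive lists of at most batch_size points."""
    it = iter(points)
    while batch := list(islice(it, batch_size)):
        yield batch
```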

Please note that luftdatenpumpe also provides the possibility to submit data using UDP [1] instead of HTTP over TCP. I wonder why the blog post you referenced does not mention this detail at all.

Are you still having trouble with ingest performance, @d-roet? I would be happy to look into it if time permits.

[1] https://docs.influxdata.com/influxdb/v1.7/supported_protocols/udp/
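For illustration, submitting newline-separated line protocol over UDP needs nothing more than a datagram socket. A minimal sketch, assuming a local InfluxDB with the UDP listener enabled (8089 is the default UDP port from the linked documentation; the payload lines are made up):

```python
import socket

def send_udp(lines, host="127.0.0.1", port=8089):
    # InfluxDB's UDP listener accepts newline-separated line protocol.
    # UDP is fire-and-forget: no acknowledgement, no error reporting.
    payload = "\n".join(lines).encode("utf-8")
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.sendto(payload, (host, port))
    finally:
        sock.close()
```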

d-roet commented 4 years ago

Thanks very much for mentioning the UDP possibility. I tried it following the docs you linked, but even the high-traffic UDP configuration mentioned there did not speed things up compared to the default HTTP ingestion. I experimented a bit with the batch-size and batch-pending options in the UDP configuration, but that did not improve things much either.

As a workaround, I have tried filtering our locally mirrored Luftdaten archive snapshot, keeping only the CSV archives of sensor IDs relevant to our case. Not surprisingly, that speeds things up a lot (a factor of 20-30 compared to before). I imagine this is because luftdatenpumpe no longer needs to sift through all of the offered CSVs and filter by --country using geocoding, as I did before.
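The pruning step described above could be sketched like this, assuming the archive's CSV naming scheme (`<date>_<sensortype>_sensor_<id>.csv`) and a hypothetical `KEEP_SENSOR_IDS` allow-list:

```python
from pathlib import Path

# Hypothetical allow-list of sensor IDs relevant to our case.
KEEP_SENSOR_IDS = {"141", "1038", "2492"}

def prune_archive(root):
    # Archive CSVs are assumed to be named like 2019-01-01_sds011_sensor_141.csv,
    # so the sensor ID is the last underscore-separated token of the stem.
    for csv in Path(root).rglob("*.csv"):
        sensor_id = csv.stem.rsplit("_", 1)[-1]
        if sensor_id not in KEEP_SENSOR_IDS:
            csv.unlink()
```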

So for my case this is a workable solution, and it probably does not warrant further investigation into optimizations inside luftdatenpumpe itself.

amotl commented 4 years ago

We could stop ingesting LDI CSV files altogether and use the Parquet files instead, see also Reading Parquet files.

I've just made a gist about how to acquire compressed Parquet files from archive.sensor.community and store their contents into InfluxDB. If this works out well, I will be happy to integrate this into Luftdatenpumpe appropriately.

The synopsis is easy to grok:

```shell
python ldi-parquet-to-influxdb.py <parquetfile> <database> <measurement>
```

However, please note that the Parquet files are partitioned by time range, so there is no way to filter by country before actually reading them.
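With the Parquet reading elided (rows shown as plain dicts, e.g. as produced by `pandas.read_parquet(...).to_dict("records")`), the conversion step might be sketched as follows. Column names like `P1`, `P2`, and `country`, and the measurement name, are assumptions for illustration:

```python
def rows_to_points(rows, country=None):
    """Convert archive rows into InfluxDB point dicts, filtering by country.

    Because the Parquet files are partitioned by time range, any country
    filter can only be applied *after* reading the rows.
    """
    for row in rows:
        if country and row.get("country") != country:
            continue
        yield {
            "measurement": "ldi_readings",  # hypothetical measurement name
            "tags": {"sensor_id": row["sensor_id"]},
            "fields": {"P1": row["P1"], "P2": row["P2"]},
            "time": row["timestamp"],
        }
```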

cc @d-roet, @wetterfrosch