tablelandnetwork / weeknotes

A place for weekly updates from the @tablelandnetwork team

[NOT-158] Weeknotes individual update: December 11, 2023 #136

Closed dtbuchholz closed 10 months ago

dtbuchholz commented 10 months ago

Using DuckDB for remote queries on Basin data

by Dan Buchholz

In a previous writeup, we explored how to use Polars to query remote data stored with Basin. We made a few changes to this design (source code here) to work with DuckDB, which is also performant and has native support for querying remote parquet files.

A new Basin HTTP API was released that makes it much easier to fetch remote data. After some data is pushed to a Basin "vault" (a signer-controlled data container), it is replicated to either a hot layer / cache or cold storage (Filecoin). The data I worked with in this example was network device data pushed by WeatherXM, which included fields like the device_id that captured weather readings (temperature, humidity, etc.) for a cell_id (at a lat/long coordinate). You can see what the shape of the data looks like here.
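For reference, here's a rough sketch of a single row. The field names are inferred from the queries later in this post, so treat the exact schema as an assumption and check the linked example for the authoritative version:

# Illustrative row shape (assumed from the fields used below)
row = {
    "device_id": "...",          # Device that captured the reading
    "cell_id": "...",            # Cell the device belongs to
    "lat": 52.52,                # Cell latitude
    "lon": 13.40,                # Cell longitude
    "timestamp": 1700843260462,  # Unix timestamp in ms
    "temperature": 11.2,
    "humidity": 67.0,
    "precipitation_accumulated": 3.4,
}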

Here's an overview of the endpoints used in the script: one lists an account's vaults, one lists the events (pushes of data, each referenced by a CID) in a vault, and one downloads an event's CAR file.

For example, to get each vault, the Python code could do the following:

from json import JSONDecodeError

from requests import RequestException, get


def err(msg, *args):
    """Stand-in for the demo's logging helper."""
    print(msg, *args)


def get_vaults(address: str):
    """Fetch all vaults controlled by an account from the Basin HTTP API."""
    try:
        url = "https://basin.tableland.xyz/vaults"
        params = {"account": address}
        response = get(url, params=params)

        if response.status_code == 200:
            pubs = response.json()
            return pubs
        else:
            err(
                f"Failed to fetch data: {response.status_code}",
            )

    except RequestException as e:
        error_msg = f"Error getting basin publications for address {address}"
        err(error_msg, e, type(e))

    except JSONDecodeError as e:
        error_msg = f"JSON decoding error for address {address}"
        err(error_msg, e, type(e))
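Calling it is straightforward (the address below is a placeholder, not the actual WeatherXM signer):

vaults = get_vaults("0x1234...")  # Hypothetical account address
print(vaults)  # e.g., the list of vaults controlled by the account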

Loading into DuckDB

Note that the Basin HTTP endpoints let you download the raw CAR files, but you still need to unpack the contents. The demo source code shows how to do this with a go-car command: car extract --file <path/to/basin/file>. For example, you could extract each CID's parquet file into a data directory, as sketched below.
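Here's a minimal sketch of shelling out to the go-car CLI from Python, assuming the car binary is installed and on the PATH, and that extraction should land in the data directory:

from pathlib import Path
from subprocess import run


def extract_car(car_file: Path, data_dir: Path) -> None:
    """Unpack a CAR file's contents via the go-car CLI."""
    run(
        ["car", "extract", "--file", str(car_file)],
        cwd=data_dir,  # Extracted files land in the working directory
        check=True,
    )

Once the underlying parquet file has been extracted, it can be loaded into an in-memory DuckDB instance with the following: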

from pathlib import Path

from duckdb import DuckDBPyConnection, connect

def create_connection() -> DuckDBPyConnection:
    """Create an in-memory DuckDB connection with a view over the parquet files."""
    try:
        db = connect()
        data_dir = Path("data")
        files = data_dir / "*.parquet"  # Glob for all parquet files in the data directory
        db.execute(f"CREATE VIEW xm_data AS SELECT * FROM read_parquet('{files}');")

        return db
    except Exception as e:
        print(f"Error creating DuckDB connection: {e}")

Alternatively, DuckDB supports remote files. Instead of fetching data and downloading it locally, you could fetch it via web3.storage (a single URL is shown as an example, but multiple could be passed):

db.execute(f"CREATE VIEW xm_data AS SELECT * FROM read_parquet('https://bafybeihmt5ny5u6bgaxlwnn7ylqnqjusijscz6rsuyprwmetfuf777jh7i.ipfs.w3s.link/xm_data/p1/1700843260462727.parquet');

Then, running queries on the data is as straightforward as SQL! For example, here's a query that aggregates precipitation data over a date range by cell ID:

# Existing imports...
from pandas import DataFrame as PdDataFrame

start = None  # Replace with a unix timestamp in ms
end = None  # Replace with a unix timestamp in ms

# Define a bounding box (min lat, max lat, min lon, max lon) to bound the data by cell ID position
bbox = (35, 72, -13, 60)  # Approximately Europe's lats & lons

query = f"""
SELECT cell_id, SUM(precipitation_accumulated) AS total_precipitation, AVG(lat) AS lat, AVG(lon) AS lon
FROM xm_data
WHERE lat BETWEEN {bbox[0]} AND {bbox[1]} AND lon BETWEEN {bbox[2]} AND {bbox[3]}
"""

# Add to the WHERE clause for time filtering, if applicable
where_clause = ""
if start is not None and end is not None:
    where_clause = f"AND timestamp >= {start} AND timestamp <= {end}"
elif start is not None:
    where_clause = f"AND timestamp >= {start}"
elif end is not None:
    where_clause = f"AND timestamp <= {end}"

# Combine all parts into one query and execute
query = f"{query} {where_clause} GROUP BY cell_id"
try:
    result: PdDataFrame = db.execute(query).df()  # Convert the result to a pandas DataFrame
except Exception as e:
    print(f"Error executing DuckDB bbox query: {e}")

Plotting query data on maps

DuckDB has built-in methods like pl() or df() to turn a query result into a Polars or pandas dataframe, respectively, which can be useful if you're working with other data-oriented libraries. Then, with libraries like matplotlib and geopandas, you can take the dataframe from above and plot its values, resulting in the following map (via CartoDB) of precipitation accumulated by cell in Europe.
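Here's a minimal sketch of one way to do the plotting, assuming geopandas and matplotlib are installed and using contextily for the CartoDB basemap (the demo's actual plotting code may differ):

import contextily as ctx
import geopandas as gpd
import matplotlib.pyplot as plt

# `result` is the pandas DataFrame returned by the bbox query above
gdf = gpd.GeoDataFrame(
    result,
    geometry=gpd.points_from_xy(result.lon, result.lat),
    crs="EPSG:4326",  # lat/lon coordinates
)

# Reproject to web mercator and color each cell by accumulated precipitation
ax = gdf.to_crs(epsg=3857).plot(
    column="total_precipitation", cmap="viridis", legend=True, markersize=10
)
ctx.add_basemap(ax, source=ctx.providers.CartoDB.Positron)  # CartoDB tiles
ax.set_axis_off()
ax.set_title("Accumulated precipitation by cell")
plt.show()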

[image: map of accumulated precipitation by cell in Europe]

There are plenty of other ways you could slice and dice WeatherXM's data—or any data—that has been made available with Basin!

From SyncLinear.com | NOT-158