xpublish-community / xpublish

Publish Xarray Datasets via a REST API.
https://xpublish.readthedocs.io
Apache License 2.0

Is it possible to modify a dataset once the server is started? #67

Open josephnowak opened 3 years ago

josephnowak commented 3 years ago

Hi, I'm interested in using xpublish to analyze some financial data that I have in a cluster. The data is a time series, so new data is concatenated onto it every day, but when this happens the dataset that I made public does not show the new dates. I want to know how I can update that dataset without killing the server and publishing the datasets again. I really can't find a method to do this in the documentation; sorry for bothering you with this, but I think this is a great API for my use case.

josephnowak commented 3 years ago

I wrote some code that does exactly what I wanted, but it makes use of private attributes, so I suppose it is not a good idea to use it and it could probably cause other problems. Still, it illustrates what I want.

import os
import threading

import numpy as np
import xarray
import xpublish
from fsspec.implementations.http import HTTPFileSystem

def compare_dataset(a, b):
    """Return True if both datasets have equal coords and (NaN-aware) values."""
    b = b.to_array()
    a = a.to_array()
    equals = True
    for name, coord in a.coords.items():
        equals &= coord.equals(b.coords[name])
    equals &= np.allclose(a.values, b.values, equal_nan=True)
    return equals

def generate_zarr(file_name, shape):
    """Write a random 2-D dataset of the given shape to a consolidated zarr store."""
    a = np.random.rand(*shape)
    b = xarray.DataArray(
        a,
        dims=['index', 'columns'],
        coords={'index': list(range(shape[0])), 'columns': list(range(shape[1]))}
    ).to_dataset(name='test_arr').chunk({'index': 5})
    b.to_zarr(file_name, consolidated=True, mode='w')

generate_zarr('testing.zarr', (10, 5))
generate_zarr('testing2.zarr', (15, 7))
generate_zarr('testing3.zarr', (4, 7))

d = xarray.open_zarr('testing.zarr', consolidated=True)
d2 = xarray.open_zarr('testing2.zarr', consolidated=True)
d3 = xarray.open_zarr('testing3.zarr', consolidated=True)

# d.rest.serve(host='127.0.0.1', port=9000)
rest_collection = xpublish.Rest({'test': d})

# rest_collection.serve(host='127.0.0.1', port=9000)
# Serve in a background thread so the main thread stays free for input below
p = threading.Thread(target=rest_collection.serve, kwargs={'host': '127.0.0.1', 'port': 9000})
p.start()

fs = HTTPFileSystem()

while True:
    # 1: republish d, 2: replace it with d2, 3: add d3 under a new id, 4: exit
    key = int(input())
    if key == 1:
        # Put back the original dataset; the private _xpublish_id attribute
        # must match the key used in the collection
        d.attrs['_xpublish_id'] = 'test'
        rest_collection._datasets['test'] = d

        # validating the results
        http_map = fs.get_mapper('http://127.0.0.1:9000/datasets/test')
        test_d = xarray.open_zarr(http_map, consolidated=True)
        print(compare_dataset(test_d, d))
    elif key == 2:
        # Replace the served dataset entirely with d2
        d2.attrs['_xpublish_id'] = 'test'
        rest_collection._datasets['test'] = d2

        # validating the results
        http_map = fs.get_mapper('http://127.0.0.1:9000/datasets/test')
        test_d = xarray.open_zarr(http_map, consolidated=True)
        print(compare_dataset(test_d, d2))
    elif key == 3:
        # Publish a new dataset under a different id
        d3.attrs['_xpublish_id'] = 'test_evo'
        rest_collection._datasets['test_evo'] = d3

        # validating the results
        http_map = fs.get_mapper('http://127.0.0.1:9000/datasets/test_evo')
        test_d = xarray.open_zarr(http_map, consolidated=True)
        print(compare_dataset(test_d, d3))
    elif key == 4:
        # Kill the server thread along with the whole process
        os._exit(-1)

    # show the served datasets and clear the cache so that
    # subsequent requests see the swapped-in data
    print(rest_collection._datasets)
    rest_collection.cache.clear()

jhamman commented 3 years ago

I haven't done anything like this yet. I wonder if either @lsetiawan or @benbovy has?

benbovy commented 3 years ago

@josephnowak you could update the dataset being served using a custom API endpoint, like in the example below, where /add-random-var adds a new variable to the dataset and returns its name:

import string
import random

import numpy as np
import xarray as xr
import xpublish
from xpublish.dependencies import get_dataset
from xpublish.routers import base_router
from fastapi import APIRouter, Depends

update_router = APIRouter()

def create_random_var():
    """Return a random 7-letter variable name and a random 1-D variable."""
    vname = ''.join(random.choice(string.ascii_lowercase) for i in range(7))
    var = ('x', np.random.rand(100))
    return vname, var

# In-place update: add a new random variable to the served dataset
@update_router.post("/add-random-var")
def add_random_var(dataset: xr.Dataset = Depends(get_dataset)) -> str:
    vname, var = create_random_var()
    dataset[vname] = var
    return vname

# Create a dataset that initially holds a single random variable
ds = xr.Dataset(dict([create_random_var()]))

# Register the default routes plus the custom update router
ds.rest(routers=[base_router, update_router])

ds.rest.serve(host="127.0.0.1")

You could then have a service or a process that sends a request to this API endpoint every day.
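
For illustration, a minimal sketch of such an updater, assuming the server above listens at 127.0.0.1:9000 and that the requests library is installed (the URL and interval are placeholders):

import time

import requests

# Hypothetical updater: trigger the custom endpoint once a day
ENDPOINT = "http://127.0.0.1:9000/add-random-var"

while True:
    resp = requests.post(ENDPOINT)
    resp.raise_for_status()
    print(f"added variable: {resp.json()}")
    time.sleep(24 * 60 * 60)  # wait one day before the next update

In practice you would probably run a one-shot version of this from cron or a task scheduler rather than a sleep loop.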

I guess you could use some other FastAPI features (authentication, origin restrictions, etc.) to restrict access to this endpoint.
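
As a rough sketch of what that could look like (plain FastAPI, not an xpublish feature; the header name and key value are invented, and update_router, get_dataset, and create_random_var are reused from the snippet above):

from fastapi import Depends, Header, HTTPException

def check_api_key(x_api_key: str = Header(...)):
    # Hypothetical shared secret; load it from configuration in practice
    if x_api_key != "my-secret-key":
        raise HTTPException(status_code=403, detail="Invalid API key")

# Same endpoint as above, now rejecting requests without the right header
@update_router.post("/add-random-var", dependencies=[Depends(check_api_key)])
def add_random_var(dataset: xr.Dataset = Depends(get_dataset)) -> str:
    vname, var = create_random_var()
    dataset[vname] = var
    return vname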

benbovy commented 3 years ago

The current limitation is that you cannot replace the whole dataset being served with another one. All updates have to be in-place.

Also, it's not possible yet to use the application to update a collection of datasets being served, but I think it would be straightforward to support it by adding a get_dataset_collection FastAPI dependency to Xpublish.
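
Purely to illustrate the idea (none of this exists in xpublish today; the dependency, router, and zarr-path convention below are invented), such a dependency could hand endpoints the mutable mapping of served datasets, so a whole dataset could be swapped out:

from typing import Dict

import xarray as xr
from fastapi import APIRouter, Depends

# Hypothetical module-level collection standing in for the datasets
# held by the Rest instance
_dataset_collection: Dict[str, xr.Dataset] = {}

def get_dataset_collection() -> Dict[str, xr.Dataset]:
    return _dataset_collection

collection_router = APIRouter()

@collection_router.put("/datasets/{dataset_id}/refresh")
def refresh_dataset(
    dataset_id: str,
    collection: Dict[str, xr.Dataset] = Depends(get_dataset_collection),
) -> str:
    # Replace the whole dataset, e.g. by re-opening its store on disk
    collection[dataset_id] = xr.open_zarr(f"{dataset_id}.zarr", consolidated=True)
    return dataset_id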