nbren12 / geostreams

Streaming data from fortran climate/weather models.

xarray backend #6

Open jhamman opened 7 years ago

jhamman commented 7 years ago

TODO: discuss specs for xarray backend to redis streams

nbren12 commented 7 years ago

I put a page on the wiki proposing a key value structure for xarray.

jhamman commented 7 years ago

Thinking out loud here.

If this is the data stream we expect to pass via Redis:

"A": { "message": ..., "dtype": ..., "shape": ... }
"A:dims": ["x", "y", "z"]
"A:coords": {"xm": ("x",), "ym": ("y",), "zm": ("z",)}  # format: coord: (dims)
"A:attrs": {dict of attributes}
"xm": { same format as "A" }
"ym": { same format as "A" }
"zm": { same format as "A" }

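One way a producer might populate these keys (a rough sketch, assuming redis-py, JSON-encoded metadata, and a Redis hash for the array entry; the host, array size, and the units attribute are placeholders):

import json

import numpy as np
import redis

r = redis.Redis(host="localhost", port=6379)

data = np.random.rand(4, 3, 2)

# metadata keys only need to be written once
r.set("A:dims", json.dumps(["x", "y", "z"]))
r.set("A:coords", json.dumps({"xm": ["x"], "ym": ["y"], "zm": ["z"]}))
r.set("A:attrs", json.dumps({"units": "K"}))

# the array entry itself: raw bytes plus the dtype/shape needed to rebuild it
r.hset("A", mapping={
    "message": data.tobytes(),
    "dtype": str(data.dtype),
    "shape": json.dumps(data.shape),
})
# the coordinate arrays ("xm", "ym", "zm") would be written the same way as "A"
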
The pseudo code for an xarray backend would look something like:


from collections import deque
import xarray as xr

stream = deque([], maxlen=1000)

key = 'A'

with open_redis(url, port, pw):
    # most metadata only needs to be streamed once
    dims = get_redis_dims(key=key)
    coord_dims = get_redis_coords(key=key)
    attrs = get_redis_attrs(key=key)

    coords = {}
    for coord, cdims in coord_dims.items():
        coords[coord] = xr.Variable(cdims, get_redis_array(key=coord))

    # pull data from the redis stream for key='A'
    while True:
        da = xr.DataArray(get_redis_array(key=key), dims=dims, coords=coords, attrs=attrs)
        stream.append(da)

The top level API may look something like:

while True:
    da = xr.read_stream(url, port, pw, 'A', maxlen=1000)

    # use da then throw it away
nbren12 commented 7 years ago

That seems like a pretty good idea. I am not sure whether we should have the deque be in Python memory though, since a redis list is essentially the same thing. In general, I think we should use redis data structures whenever possible.

I think we could augment the key value store to look like this:

"A": ["A10", "A9", "A8", ..., "A1"]  # a redis list that is trimmed whenever data is pushed onto it
# also have a pubsub channel "A" which emits the keys
"A10": {"message": ..., "dtype": ..., "shape": ...}  # the array data structure for the latest time step
"A9": {"message": ..., "dtype": ..., "shape": ...}   # the previous time step

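A producer-side sketch of this layout (hypothetical, using redis-py; push_timestep and maxlen are placeholder names):

import redis

r = redis.Redis(host="localhost", port=6379)

def push_timestep(variable_key, step_key, payload, maxlen=10):
    # payload is the {"message": ..., "dtype": ..., "shape": ...} mapping for this step
    r.hset(step_key, mapping=payload)     # write the array data under e.g. "A10"
    r.lpush(variable_key, step_key)       # newest key goes to the head of the list "A"
    r.ltrim(variable_key, 0, maxlen - 1)  # trim the list so only the last maxlen keys survive
    r.publish(variable_key, step_key)     # announce the new key on the pubsub channel "A"
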
Processes can listen to the pubsub channel or pop from the list to get a key, and then open that key using get_redis_array.

By popping data from redis, we can ensure that no two workers process the same data. If the user actually wants each worker to receive all the data, they can subscribe to the pubsub channel. The pseudo code for the backend could look like this:

import itertools
import redis

def get_stream(variable_key="A",
               redis_url="localhost",
               redis_port=6379,
               redis_pw=None,
               mode="pubsub"):
    r = redis.Redis(host=redis_url, port=redis_port, password=redis_pw)
    if mode == "pubsub":
        # every subscriber sees every key
        p = r.pubsub()
        p.subscribe(variable_key)
        yield from p.listen()
    elif mode == "queue":
        # each worker pops its own keys, so no two workers see the same one
        yield from (r.rpop(variable_key) for _ in itertools.count())

def xr_from_key(key):
    # get attrs, data, shape from redis and build an xarray object
    ...

def xr_stream(**kwargs):
    """Generator of xarray objects"""
    return map(xr_from_key, get_stream(**kwargs))
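
Roughly how that might be consumed (a sketch only; note that in "queue" mode rpop returns None while the list is empty, and in "pubsub" mode listen() yields message dicts rather than bare keys, so a real xr_from_key would have to handle both cases):

for da in xr_stream(variable_key="A", mode="queue"):
    # use da, then throw it away
    print(da)
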
nbren12 commented 7 years ago

FYI it looks like @mrocklin has written a lightweight streaming analysis library for Python called streamz. Maybe we should use that library.

There is a similar project called RxPy, which seems to be designed for this sort of thing.
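
For a rough sense of what the streamz wiring might look like (a sketch only; Stream, map, sink, and emit are streamz primitives, while get_stream and xr_from_key are the helpers sketched earlier in this thread):

from streamz import Stream

source = Stream()
source.map(xr_from_key).sink(print)

# feed keys popped from redis into the stream
for key in get_stream(variable_key="A", mode="queue"):
    if key is not None:
        source.emit(key)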

jhamman commented 7 years ago

So streamz looks really promising. I think we should open a "Use Case" issue over there and iterate with Matt on if/how it could be done.

mrocklin commented 7 years ago

That would be welcome. I may start becoming more active on the project near term for other reasons, so now would be a good time to exchange ideas.

nbren12 commented 7 years ago

It seems like streamz and RxPy implement very similar functionality, and both would cover pretty much any imaginable use case. Maybe Matt could comment on the differences between these libraries.

mrocklin commented 7 years ago

I would encourage playing with both and seeing what fits better. Rx is well established throughout a variety of languages, especially in UI communities. I originally built streamz as a personal education project (I wanted to know how reactive systems and systems with backpressure worked) but it has grown into something that fits my brain well and, I suspect, will fit the Python computational space better. It's also quite a bit simpler than Rx.

If Rx fits your application though then I would go for it. It's very well established.

nbren12 commented 7 years ago

Thanks for your thoughts, Matt. I certainly understood the streamz documentation a little better, but Rx also seems pretty good. This project is at an exploratory stage at the moment, so I think I will try to give both a shot and see which one fits our needs better.
