python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Proposal: Publishing and Retrieving SDFs for SDF Checkpointing #353

Open chinmaychandak opened 4 years ago

chinmaychandak commented 4 years ago

[Copy-pasting part of the description of #340]

To checkpoint the state of an SDF, we need to do 3 things:

  1. When a job starts, retrieve (either from Dask datasets/S3/disk) the last known SDF state (if any) from previous runs. This can just be a one-time function call to a UDF outside of streamz.
  2. Pass this retrieved state into an SDF so that the new run of this streamz job can pick up from where it left off — basically, allow SDFs to accept an initial state.
  3. Publish (either to Dask/S3/disk) the updated SDF state at the end of every batch.

The code for step 2, i.e., allowing SDFs to accept an initial state, was merged in #340 above. This issue is to invite ideas for how steps 1 and 3 (mainly 3) should be implemented.


I was able to "manually" publish SDFs at the end of every batch using Dask's published datasets. Here's a simple example of a streamz job which retrieves the SDF checkpoint stored as a Dask dataset, uses it to initialize an SDF operation, and then updates this dataset after every batch. In case the job crashes, the stream restarts using the last known SDF checkpoint dataset published to Dask.

from distributed import Client, LocalCluster
from streamz import Stream
from streamz.dataframe import DataFrame
import pandas as pd

# Dask Client
client = Client("localhost:8786")
client.get_versions(check=True)
client

def get_SDF_checkpoint(key):
    client_x = Client("localhost:8786")
    datasets = client_x.list_datasets()
    res = None
    if key in datasets:
        res = client_x.get_dataset(key)
    client_x.close()  # close the helper client whether or not the key was found
    return res

# Retrieve last known SDF checkpoint, if any
start_state = get_SDF_checkpoint("sdf_checkpoint")

# UDF to publish SDF state onto Dask (we would need a generic interface for other sinks like S3)
def publish_SDF_checkpoint(res, key):
    client_x = Client("localhost:8786")
    datasets = client_x.list_datasets() 
    # Some groupby aggs return a bare result instead of a (state, result) tuple
    if type(res) is not tuple:
        res = (res, res)
    if res[0] is None:
        client_x.close()
        return res[1]
    if key in datasets:
        client_x.unpublish_dataset(key)
    client_x.publish_dataset(res[0], name=key)
    client_x.close()
    return res[1]

# Kafka consumer Configs
topic = "custreamz-test"
bootstrap_servers = 'localhost:9092'
consumer_conf = {
    "bootstrap.servers": bootstrap_servers,
    "enable.partition.eof": "true",
    "group.id": "custreamz-test",
    "auto.offset.reset":"latest"
}

# Start a stream from Kafka
source = Stream.from_kafka_batched(topic, consumer_conf, poll_interval='20s', 
                                   npartitions=4, max_batch_size=10,
                                   asynchronous=True, dask=True) 

def preprocess(messages):
    json_input_string = "\n".join([msg.decode('utf-8') for msg in messages])
    df = pd.read_json(json_input_string, lines=True)   
    return df

# Preprocess data
stream = source.map(preprocess)

# Create an SDF
example = pd.DataFrame({'Name':[], 'Amount':[]})
sdf = DataFrame(stream, example=example)

# Window Function for SDFs
def window_func(window_gdf):

    # Handle state downstream
    state = None
    if isinstance(window_gdf, tuple):
        state = window_gdf[0]
        window_gdf = window_gdf[1]

    aggs = window_gdf.groupby(["Name"]).agg({"Amount":"sum"}).reset_index()
    return state, aggs

# Apply window function on SDF. Uses the retrieved state to initialize the stream.
postproc_stream = sdf.window(5, with_state=True, start=start_state).apply(window_func).stream

# Publish updated SDF at the end of every batch and gather output
output = postproc_stream.map(publish_SDF_checkpoint, "sdf_checkpoint").gather().sink_to_list()

# Start the stream
source.start()

Simple Producer script:

import confluent_kafka as ck
producer_conf = {'bootstrap.servers': bootstrap_servers, 'compression.type':'snappy'}
producer = ck.Producer(producer_conf)

producer.produce(topic, '{"Name":"Alice", "Amount":100}')
producer.produce(topic, '{"Name":"Tom", "Amount":200}')
producer.produce(topic, '{"Name":"Linda", "Amount":300}')
producer.produce(topic, '{"Name":"Bob", "Amount":50}')
producer.produce(topic, '{"Name":"Alice", "Amount":400}')

producer.flush()

Every time a batch is processed, we can see the final result of the streamz job using output[-1] and the last saved SDF checkpoint Dask dataset using get_SDF_checkpoint("sdf_checkpoint").

Everything works perfectly functionality-wise, other than the fact that in the above script, the SDF checkpoint and the Kafka checkpoint are out-of-sync. The SDF publishing happens before the Kafka checkpointing, which is not how it should ideally be. The easiest way to tie them up would be to add the SDF state to the metadata passed down the stream so that when the Kafka checkpointing happens, the callback would also trigger publishing the SDF onto Dask/S3.
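To make that concrete, here is a purely illustrative sketch of what such a metadata entry could look like; the keys are hypothetical and nothing like this exists in streamz today:

    # Hypothetical per-batch metadata entry (keys are illustrative): the
    # refcounter already used for Kafka commits, plus the SDF state to publish
    # once the batch has been fully processed.
    batch_metadata = [{
        "ref": None,                            # placeholder for the batch's RefCounter
        "sdf_state": None,                      # placeholder for the accumulated SDF state
        "sdf_checkpoint_key": "sdf_checkpoint", # Dask dataset name to publish under
    }]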

I would like to hear whether this would be the correct way to implement this feature. If so, any suggestions on how this should be done cleanly and efficiently?

jsmaupin commented 4 years ago

Do we add a callback to the sources to allow users to update the metadata?

jsmaupin commented 4 years ago

One thing we were discussing is the idea that having the user call this publish function might be confusing and they might not use it when they should.

So, I was thinking, when we have with_state=True on the SDF, then we can implement checkpointing just like from_kafka_batched does it, but inside of the SDF code. It assigns a callback to the reference counter which will get triggered when the reference counter goes to 0. (https://github.com/python-streamz/streamz/blob/master/streamz/sources.py#L485). As you can see the state gets preserved when the callback function is created.

The only problem is that the RefCounter class takes in a single callback in the constructor (https://github.com/python-streamz/streamz/blob/master/streamz/core.py#L78). This would need to be expanded so that it can handle a list of callbacks.
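To illustrate that direction, here is a standalone sketch (not the actual streamz RefCounter API) of a reference counter that fires a list of callbacks once its count drops to zero:

    # Sketch of a reference counter that accepts multiple callbacks, all of
    # which fire when the count reaches zero (e.g., commit the Kafka offsets
    # and then publish the SDF state).
    class MultiCallbackRefCounter:
        def __init__(self, initial=0, callbacks=None):
            self.count = initial
            self.callbacks = list(callbacks or [])

        def add_callback(self, cb):
            self.callbacks.append(cb)

        def retain(self, n=1):
            self.count += n

        def release(self, n=1):
            self.count -= n
            if self.count <= 0:
                for cb in self.callbacks:
                    cb()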

chinmaychandak commented 4 years ago

This definitely sounds like a good approach. Would the SDF code add the state and the other information like Dask dataset key, etc., to the metadata, so that it would be passed downstream along with the refcounters?

jsmaupin commented 4 years ago

I'm not sure we even have to use metadata in this case. The sdf_state can be referenced in the Tornado function callback.

satishvarmadandu commented 4 years ago

Yes, I like the idea of integrating "publishing state" with with_state=True. Also, publishing state is a bit expensive, especially if we publish the state to a path outside Dask. Why do we need a list of callbacks?

jsmaupin commented 4 years ago

I'm just wondering if @CJ-Wright or @martindurant have any input here.

martindurant commented 4 years ago

Sorry to be late to the conversation. This is obviously not too simple. I think it would be good to keep to the current setup as far as possible:

I think the first two can be enough: you "publish" the state with its metadata (e.g., offset) in a normal node, like any map operation. Upon restart, you either just want the most recent state, or the state that accompanies the most recently completed checkpoint. Which you want depends on whether you want guaranteed processing or not - this should work without checkpointing too, if it's OK to lose some events in the aggregation.

chinmaychandak commented 4 years ago

you "publish" the state with its metadata (e.g., offset) in a normal node, like any map operation.

Are you suggesting that we have an explicit map node in the pipeline, or introduce a publish node, to add the state to the metadata? We were hoping that the addition of the SDF state to the metadata should be internally handled (when we have with_state=True, which can probably be renamed), otherwise the end-user would need to be aware of adding the publish logic in all the pipelines using SDFs in order to not lose state in case of stream failure.

Moreover, IMO (and this is just from personal experience), when one uses with_state, it should be for checkpointing purposes, since the state can be as complex as a deque of dataframes, etc. If we do handle this internally, like I said above, we might not even need to pass the state downstream, and it all becomes cleaner.

as they act outside of the processing stream/graph and so are harder to reason about; these callbacks now feel like downstream nodes.

I agree, but I think a couple of callbacks like these are "source-specific", and should be fine, if there's not a cleaner way. I'm sure Spark Streaming and other streaming stacks have separate checkpointing mechanisms for different sources.

A good thing about callbacks here, though, is that we can ask the user for the callback function (users might want to publish to/recover from an SDF state either from Dask/S3/Disk, etc.), and a UDF-callback would make the code within streamz much simpler — all it does is execute the state publishing callback along with the Kafka commit/checkpoint callback.

Upon restart, you either just want the most recent state, or the state that accompanies the most recently completed checkpoint.

The most recently published state must be the one with the last committed Kafka checkpoint, or else the SDF and Kafka checkpoints will be out of sync.

Which you want depends on whether you want guaranteed processing or not - this should work without checkpointing too, if it's OK to loose some events in the aggregation.

I'm not sure I quite understand this.

jsmaupin commented 4 years ago

For tying this with the Kafka checkpoints, I think one thing that would really help is if we published the Kafka metadata with the data. I've thought for a while that by not giving the user these details, we are limiting them in what they can do. Kafka has other per-message data beyond the value, e.g., the key, headers, and partition.

If we added this to the records, I think that would get us closer to tying the SDF state to the Kafka offsets it is associated with. I've seen this done before, where the Kafka consumer gathers all this information and passes it to the user. The user then selects the fields they want -- usually just the value, but in many cases people want some of the others. The Confluent library may already be downloading this data, so I wonder if it would add much overhead.
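As a sketch of what that could look like (assuming the source were changed to hand confluent_kafka Message objects downstream rather than just the raw values, which is not what from_kafka_batched does today), the preprocess step could then pull those fields out:

    # Sketch only: extract per-message Kafka fields alongside the value,
    # assuming the batch contains confluent_kafka Message objects rather than
    # raw bytes.
    def preprocess_with_metadata(messages):
        records = []
        for msg in messages:
            records.append({
                "value": msg.value().decode("utf-8"),
                "key": msg.key(),
                "topic": msg.topic(),
                "partition": msg.partition(),
                "offset": msg.offset(),
            })
        return records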

martindurant commented 4 years ago

A quick note:

otherwise the end-user would need to be aware of adding the publish logic

I don't think there's any getting away from it: the user needs to specify what publish means for their pipeline, and that would be true with some callback too. We can, of course, provide some convenience logic around this.

chinmaychandak commented 4 years ago

I don't think there's any getting away from it

That's absolutely true. I just meant with_state ought to do it — instead of passing the state downstream, which is what is currently happening, it just needs to add the state to the metadata instead.

martindurant commented 4 years ago

The Confluent library may already be downloading this data

Yes, I believe it is all there (although key, headers and partition are optional)

martindurant commented 4 years ago

with_state ought to do it

I am not convinced that the current internal state can be considered as part of the metadata, it feels separate; but I can see why you would want to reduce the number of output types.

chinmaychandak commented 4 years ago

with_state ought to do it

I am not convinced that the current internal state can be considered as part of the metadata, it feels separate; but I can see why you would want to reduce the number of output types.

Yes, I totally agree with this. Just saying that the "metadata" is a convenient way to tie the state with the Kafka checkpoint.

jsmaupin commented 4 years ago

At this point the from_kafka_batched handles the Kafka checkpoint. We would need to modify this function somehow to get them to tie together.

jsmaupin commented 4 years ago

yes, a list of callbacks on a refCounter would be fine, but I don't like callbacks, as they act outside of the processing stream/graph and so are harder to reason about; these callbacks now feel like downstream nodes...

@martindurant If we add this operation as a downstream node, then do we need the user to add this node in order for this to work or would we need Streamz to attach this node to the end of the pipeline using the graph walking concept you were talking about before?

martindurant commented 4 years ago

I think I favour the user explicitly adding a "publish state" node after the SDF aggregation node - it's a good thing to show on a stream graph.

jsmaupin commented 4 years ago

I was just about to comment various things I have tried.

I think I favour the user explicitly adding a "publish state" node after the SDF aggregation node - it's a good thing to show on a stream graph.

If we go this route, then would it make sense to publish at least some of the Kafka message metadata into the Streamz metadata?

jsmaupin commented 4 years ago

So, that we can tie the published SDF with Kafka offsets.

martindurant commented 4 years ago

I agree, it would make sense like that. It can still hold references, though, so the callback route will still work too.

jsmaupin commented 4 years ago

Getting back to this. I think most everything makes sense. I'm just trying to figure out how we would attach a reference to the SDF in the metadata. One idea proposed was to do it in the accumulate function of the core part of Streamz and in the same function of the DaskStream. However, I believe this assumes that the user is always going to perform some sort of operation on the SDF. Am I wrong here?

chinmaychandak commented 4 years ago

However, I believe this assumes that the user is always going to perform some sort of operation on the SDF.

I think this should be a fair assumption. The only case I can think of where no operation is performed on the SDF would look something like

sdf = DataFrame(stream, example)
out = sdf.stream.some-other-nodes.sink()

in which case we don't want to checkpoint the SDF anyway. But for other cases, I think the user needs to perform some operation on an SDF to complete the pipeline code.

jsmaupin commented 4 years ago

I think this should be a fair assumption.

I think the user needs to perform some operation on an SDF to complete the pipeline code.

I'd like to write up the code and send over a PR. I just want to make sure that no one has any objections here.

martindurant commented 4 years ago

I think this should be a fair assumption.

Yes agreed. I don't think there's a case for using the state publish/checkpoint ability of SDFs as a side effect in itself. That would be like a backdoor sink.

jsmaupin commented 4 years ago

We had a talk internally and came up with a design proposal. It would be great to get feedback from everyone, especially @martindurant and @CJ-Wright

SDF Checkpointing

For a streaming data pipeline to be durable, it needs to be checkpointed. A checkpointed data pipeline can resume from a known state after being stopped by either a planned or an unplanned event. SDFs hold data that is the result of accumulated processing over time. The proposal below involves:

  1. Taking a snapshot of an SDF
  2. Saving the snapshot to durable storage
  3. Loading a previously saved snapshot from durable storage into an SDF when the pipeline starts

This snapshot can be used to allow the SDF to continue a long running accumulation when the pipeline is restarted.

What is included

Currently in Streamz, the only things that are checkpointed are Kafka offsets and partitions. This proposal will change that to include SDFs, along with a new format that saves all the checkpoints in a single location.

Metadata

This is dependent on the source of the data. Each source will have a different way to track where it left off. For Kafka, this will be the topic, partition ID, and offset (see the JSON structure below).

SDFs

SDFs contain stateful information generated by the pipeline. In order to restore the pipeline to the state that it was in before it was stopped, SDFs will need to be saved.

Interface

New parameters will need to be added:

How checkpoints are saved

Checkpoints will be saved as JSON files in a filesystem structure. Each new checkpoint will be saved with an incrementing file number known as the checkpoint ID.

The supported storage mechanisms for the checkpoint data will be:

  1. Local filesystem
  2. S3-like object store

File system structure

There will be one directory for the metadata and one directory for the SDF. Each new file will be named after the checkpoint ID. A rolling window of checkpoint IDs of configurable size will be kept, with the oldest checkpoints deleted as new ones are made. The default will be 10 checkpoint IDs for a given pipeline. Additionally, there will be a directory to indicate that the previous commits were completed successfully.

This is used when loading the commits at pipeline startup.

The commit file will save the pipeline configuration, since changing the pipeline would make the saved checkpoint data incompatible with newer versions of the pipeline. It will only be written when there is an SDF in the pipeline, as it is only needed to ensure that all parts of the checkpoint have been committed. Streamz can already generate a networkx.DiGraph of the current pipeline, and networkx offers several methods for writing this graph out; any of these can be used.

    checkpoint_path
    |
    |-- metadata
       |-- 1.json
       |-- 2.json
       |-- 3.json
       ...
    |-- data
       |-- 1.parquet or arrow?
       |-- 2.parquet
       |-- 3.parquet
       ...
    |-- commits
       |-- 1
       |-- 2
       |-- 3
       ...

JSON structure

Since the metadata is basically a Python dictionary, it is easily serialized to JSON. We can turn it into JSON and save it under the directory path above.

    {
        "partitions": [
            {
                "topic": "<topic name>",
                "partition_id": <partition ID>,
                "offset": offset
            },
            ...
        ]
    }

SDF serialization

SDFs will need to be serialized. Currently, there's no method to serialize an SDF, but there are methods to serialize each dataframe. A method will need to be added that runs map_partitions with the serialization method over each frame. Right now, it seems to make the most sense to use to_parquet, which should be present on "dataframe-like" objects; Parquet's smaller size will also mean less transfer time over the network.
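As a rough illustration (the helper name and state layout are hypothetical; the state may be a deque of dataframes or something more complex), the per-frame serialization could look like:

    # Hypothetical helper: write each dataframe held in the SDF state to
    # Parquet files under the current checkpoint ID.
    def save_sdf_state(state_frames, checkpoint_path, checkpoint_id):
        for i, df in enumerate(state_frames):
            df.to_parquet(f"{checkpoint_path}/data/{checkpoint_id}-{i}.parquet")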

How checkpoints are loaded

Each source that serializes metadata will need to load the same metadata when it is started. The latest commit ID will be determined by:

  1. Listing all the files in the commit directory
  2. Sorting the file names numerically
  3. Using the largest filename

Given that this operation will be performed in the base Source node, that node will have access to the commit ID and be able to retrieve the JSON document for the metadata.
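A minimal sketch of that lookup against a local filesystem (paths are illustrative):

    import os

    # Pick the newest checkpoint by listing the commits directory, sorting the
    # file names numerically, and taking the largest one.
    def latest_commit_id(checkpoint_path):
        names = os.listdir(os.path.join(checkpoint_path, "commits"))
        ids = sorted(int(name) for name in names if name.isdigit())
        return ids[-1] if ids else None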

For the SDF, the best way I can currently think of to signal it to load the checkpointed data is to pass the checkpointing items into the constructor. This means that we will need to pass this info in two locations.

Alternatively, we can pass the checkpointing location info down the pipeline when nodes are connected.

If the commits directory exists in durable storage, then a check will be performed to ensure that the pipeline does not have any unexpected changes since the last time it was run. If an aggregation in the pipeline has changed, the restored aggregation results may be meaningless, or an error may result. The current pipeline aggregation operations will be compared to the data stored in the commit file. If the pipeline differs from what is saved in the commit file, an error will be thrown describing the differences (a sketch of this comparison follows the list below). It will be up to the user to do one of the following:

  1. Delete the existing checkpoint data
  2. Use a new checkpoint path
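A rough sketch of that comparison, assuming the commit file stores the pipeline graph in networkx node-link JSON form (the exact serialization is not fixed above, so this is only illustrative):

    import networkx as nx
    from networkx.readwrite import json_graph

    # Compare the current pipeline graph against the one recorded in the
    # commit file; a structural mismatch means the saved checkpoints may be
    # incompatible with the current pipeline.
    def pipeline_unchanged(current_graph, saved_node_link_data):
        saved_graph = json_graph.node_link_graph(saved_node_link_data)
        return nx.is_isomorphic(current_graph, saved_graph)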

CJ-Wright commented 4 years ago

Thank you very much for putting together this proposal!

I have a few off the cuff thoughts, but I'll most likely need to re-read and sleep on this so that I can think through the implications.

  1. Would it be possible to generalize this beyond SDFs? I can imagine a bunch of non SDF based approaches that would benefit from this work.
  2. How much of the file format and associated moving parts can be independent of streamz? Does the API need to be enmeshed or can streamz nodes call out to existing APIs?
  3. What are some of the scenarios where this would kick in? If a server gets cut off and restarts how does the pipeline know where its files are?

jsmaupin commented 4 years ago

Would it be possible to generalize this beyond SDFs? I can imagine a bunch of non SDF based approaches that would benefit from this work.

You mean items other than SDFs? I think we could definitely generalize to any part of the Streamz pipeline. We could have a path that would point to the location where the checkpointing can be saved. That path would belong to the entire pipeline. Any node in the pipeline can use this path to perform checkpointing.

How much of the file format and associated moving parts can be independent of streamz? Does the API need to be enmeshed or can streamz nodes call out to existing APIs?

It can potentially be separated from Streamz. I know other systems, such as Kafka, have their own APIs for checkpointing. Are you thinking of something like ZooKeeper? I know there are good reasons why Kafka has moved away from ZooKeeper as the location to which it saves its checkpoints; Kafka now does this internally in one of its topics.

What are some of the scenarios where this would kick in? If a server gets cut off and restarts how does the pipeline know where its files are?

  1. Dask or any part of Streamz causes the Python VM to exit early
  2. Networking issues
  3. The OS becomes unstable
  4. The user needs to update something or restart the job

Really, it's protection against anything that can go wrong.

CJ-Wright commented 4 years ago

You mean items other than SDFs?

Yes, this seems like a generic property of a pipeline. I think folks using streamz for image/x-ray data could be interested in this @danielballan

Are you thinking of something like ZooKeeper?

I wasn't thinking of any particular implementation, more that the checkpointing piece seems like something streamz depends on rather than something closely enmeshed. At an implementation level, if the file system and such were to be part of streamz, I would hope we could have it as part of an independent module with a robust API rather than having the internals spread over the nodes themselves.

Really, it's protection against anything that can go wrong.

In those cases how does the pipeline know which files to talk to? Is that hard coded into the pipeline, in the env?

martindurant commented 4 years ago

this seems like a generic property of a pipeline

any node that has internal state should be able to save this state, and be automatically restarted with that state. It could be a fitted model, whatever...

the checkpointing thing seems like something streamz depends on

Totally agree, I imagine being able to pass a function or class to handle the loading and saving of state. We would maybe include a couple of examples for something like:

how does the pipeline know which files to talk to

We can already extract the interior state and metadata from a node which accumulates, so it would amount to adding a downstream node which saves this. Such a class would take the filename or whatever as input kwargs, and the original node would need to access the same thing at instantiation to set state, if appropriate.

By the way, please don't just support local files and S3, but use fsspec to talk to arbitrary backends. Given that fsspec now has a configuration system, you might not need to pass any credentials or other kwargs directly to the saving function - but passing would be fine too. fsspec and dependent libraries (dask, pandas, intake, zarr...) typically refer to the kwargs used to set up the filesystem as storage_options. I would find it natural to want to save checkpoints with paths containing the current time as ISO.
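For illustration only (the function name and path layout are my own, not an agreed interface), a saver built on fsspec with storage_options and ISO-timestamped paths might look like:

    from datetime import datetime, timezone

    import fsspec

    # Write a checkpoint payload to any fsspec-supported backend (local path,
    # s3://, gcs://, ...), naming the file with the current UTC time in ISO
    # format. storage_options are the filesystem kwargs mentioned above.
    def write_checkpoint(base_url, payload_bytes, storage_options=None):
        stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H%M%SZ")
        url = f"{base_url}/{stamp}.parquet"
        with fsspec.open(url, "wb", **(storage_options or {})) as f:
            f.write(payload_bytes)
        return url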

jsmaupin commented 4 years ago

At an implementation level, if the file system and such were to be part of streamz I would hope we could have it as part of an independent module with a robust API rather than having the internals spread over the nodes themselves.

I was hoping there could be a way where we could centralize all the code related to this in one or only a few locations. At that location, it would talk to the service (or file system) that would save the checkpointed data and metadata.

In those cases how does the pipeline know which files to talk to? Is that hard coded into the pipeline, in the env?

The user of the library would supply the path to Streamz.

jsmaupin commented 4 years ago

We already can extract the interior state and metadata from a node which accumulates, so it would amount to adding a downstream node which saves this.

I agree with this, but I'm wondering what you're proposing here. Are you suggesting that:

  1. A node that accumulates places the metadata it wants to checkpoint into the metadata variable that is passed downstream, or
  2. The downstream does a "crawl" upstream to find nodes that have metadata that needs to be saved

use fsspec to talk to arbitrary backends

This is fantastic! Thanks for pointing me here.

martindurant commented 4 years ago

Certainly option 1. We don't really want stream nodes to reach into their own graphs if we can help it.

jsmaupin commented 3 years ago

I've been getting back to this. I think I have something that works. I would like to push a PR in the next few days. I think there are two main places where we would want to checkpoint the SDF. I think this includes all the possible operations. Maybe someone here knows of another location.

  1. When an aggregation happens. Everything goes through the accumulate_partitions and the accumulate core function
  2. When a non-aggregation is applied to the SDF. This goes through map_partitions.

The other problem is how to configure where the checkpoints are saved. In my test code, an S3 path is passed to the from_kafka_batched function and sent down through the metadata. If the configuration is found in the metadata while running map_partitions or accumulate_partitions, then the SDF is checkpointed.
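A minimal sketch of that metadata check inside an SDF operation (the metadata key and publish hook are hypothetical, not an existing streamz API):

    # Illustrative only: look for a checkpoint location that the source placed
    # in the batch metadata and, if found, hand the current state to whatever
    # publish routine has been configured.
    def maybe_checkpoint(state, metadata, publish):
        for m in metadata or []:
            path = m.get("checkpoint_path")  # hypothetical metadata key
            if path is not None:
                publish(state, path)
                break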