ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.53k stars 5.69k forks source link

Discussion on batch Garbage Collection. #2242

Closed guoyuhong closed 4 years ago

guoyuhong commented 6 years ago

Hi @robertnishihara @pcmoritz , we are planning to add a batch Garbage Collection to Ray.

We have a concept called batchId (int64_t) used to do the Garbage Collection. For example, one job will use this batchId to generate all the objectIds and taskIds, and all these objectIds and taskIds will be stored under the Garbage Collection Table under the batchId in GCS. When the job is finished, we can simply pass a batchId to the garbage collector and the garbage collector will look up the Garbage Collection table in GCS and do the garbage collection to all the related tasks and objects.

In current id.h implementation, the lowest 32 bits in ObjectId is used for Object Index. We can use the higher 64 bits next to the Object Index as the batchId and add a new GC Table in GCS.

This GC mechanism will help release the memory resources in GCS and plasma. How do you think of this code change?

istoica commented 6 years ago

I think the Batch PRD is converging nicely. Any more requirements from the AI team or any other team from Ant (or Alibaba)?

Also, note that in the future we can use the batch abstraction to implement (1) data parallel operations, (1) a parallel SGD iteration, or maybe the full SGD, and (3) maybe support affinity or anti-affinity location-aware placement.

eric-jj commented 6 years ago

For the Batch PRD most requirement from us. For the job management PRD, we are still working with AI team on it. Expect we can make it ready today.

eric-jj commented 6 years ago

@istoica We complete the first version PRD for the Job submit. Please review and provide comments, thanks.

istoica commented 6 years ago

@eric-jj, it looks great! I left a few quick comments.

istoica commented 6 years ago

@eric-jj et al, there are a few relative minor comments in the Batch PRD. Otherwise we went again over the two PRDs today in our weekly meeting, and we are fine with the current PRDs.

imzhenyu commented 6 years ago

@istoica @eric-jj we need also take into consideration the impact to FO for this batch approach. Considering the case where an actor is invoked by several interleaved batches, how we will handle the FO when one of the batch is garbaged collected (including the lineage information). This is challenging, as we've come up one solution before but never got it into production due to its complication and high overhead. I guess it is best that we can think it through before we really implement it.

istoica commented 6 years ago

@imzhenyu good point!

We discussed about this a bit here, but didn't add to the document. For actors declared outside the scope of a batch, we probably want to checkpoint their internal state as part of the cleanup. Also, if we whitelist some objects we might want to push these objects to persistent store, so we don't need to reconstruct them after the batch completes. Alternatively, we can checkpoint the out-of-scope actors when we enter the batch, and then replay the batch if we need some outputs that were created by that batch.

Anyway, I'll add some text about this in the document.

imzhenyu commented 6 years ago

@istoica Checkpointing is indeed one way to mitigate this problem. The challenge is when there are too many interleaves that causes much checkpointing overhead. Another approach is to delay the garbage collection. @yuyiming can share more information about what we did before trying to address this problem, though that approach also has its own drawbacks. Hopefully, we may be able to combine the strength of both approaches.

istoica commented 6 years ago

@imzhenyu @eric-jj I updated the document added more precise description of the batch behavior, including some fault tolerance considerations.

Regarding @ericl' s request to be able to destroy all the state associated to the batch explicitly, here is a possible solution that keeps things simple for the typical users, and makes things possible for the sophisticated users, e.g., library developers:

The default API is unchanged:

batch = ray.batch(....)
with batch:
  result = f.remote()
  ray.do_not_free(result)

When the code in the batch's scope terminates, everything will be cleaned up except for result.

In addition, we can provide the library writers with an additional API:

batch.delete()

which will delete result as well.

If the user specifies no_cleanup=True in the batch declaration, then no state is cleaned up after with completes. In this case, batch.delete() will destroy everything.

Comments?

istoica commented 6 years ago

@yuyiming @imzhenyu can you please give an example "when there are too many interleaves that causes much checkpointing overhead"? Is this streaming? A pseudocode would be great, if possible. Thanks.

yuyiming commented 6 years ago

@istoica For example, if there is such a task executing pattern in an Actor: T1 (batch 1) -> S1 -> T2 (batch 2) -> S2 -> T3 (batch 1) -> S3 -> T4 (batch 2) -> S4 -> T5 (batch 1) -> S5 -> T6 (batch 2) -> S6 -> ..., where T for Actor tasks and S for Actor states after executing the corresponding tasks.

When batch 1 ends and we want to delete T1, T3 and T5, we must checkpoint S1, S3 and S5, because it is possible to re-execute T2, T4 and T6 in the future.

Later, I would explain my scheme to address this issue. Give me some time to make the explanation neater. :)

yuyiming commented 6 years ago

@istoica @imzhenyu I would explain my scheme in this comment.

For task execution chain in an Actor:

T1 -> S1 -> T2 -> S2 -> ... -> T_m -> S_m -> ... -> T_n -> S_n (checkpoint) -> ...,

we use previous and next to describe the order of these tasks and states. For instance, T1 is previous to T_m, and S2 is the next state of T2.

My scheme is based on the fact that given a task T_m and the first checkpoint S_n next to it, if all the tasks between T_m (included) and S_n are marked as GCed (i.e. all of there batches have ended), T_m would never be re-executed due to lineage reconstruction along this task execution chain, but it may be re-executed due to lineage reconstruction along task execution chain of another Actor.

My scheme is described below:

istoica commented 6 years ago

All, regarding the API, I had a longer discussion with @ericl and here is his proposal, I agree with.

User API (this is pretty much the one in the Batch API document):

with ray.batch() as b:
    do stuff
    b.return(obj_result)
    # All the state created by b in GCS and object store (except obj_result) 
    # is cleaned when b completes.
    # b.return(obj_result) transfers the ownership of obj_result to parent batch.

Developer API; this is to provide more flexibility to hard core developers such as library developers:

static def new_batch_id()
static def set_current_batch_id(batch_id)
static def get_current_batch_id()
static def destroy_batch(batch_id)
static def transfer_ownership(obj, batch_id)

The user API is syntactic sugar for:

try:
    prev_id = get_current_batch_id()
    new_id = new_batch_id()
    set_current_batch_id(new_id)
    do stuff
    transfer_ownership(obj_result, prev_id)
finally:
    set_current_batch_id(prev_id)
    destroy_batch(new_id)

Please let us know if you have any comments. If not I propose we update the Batch PRD accordingly. Thanks.

imzhenyu commented 6 years ago

Regarding to the case where interleaving occurs a lot, considering you are building your own scheduler atop of Ray, e.g., for large amount of concurrent small graph jobs and online mr s.

imzhenyu commented 6 years ago

The current API are mostly synchronous, which means we meed quite some threads for given concurrent small jobs (each as a batch). This is in particularly difficult without user level threading (which is challenging due to hybrid language runtime in the driver right now). One alternative is to use asynchronous API, which is however less easy for adoption. Probably need more discussion in the future for this.

istoica commented 6 years ago

@imzhenyu actually, I think the current API is asynchronous. The two batches below should execute in parallel. Or maybe you meant something else by synchronous?

with ray.batch() as b1:
    result1 = f.remote()
    b1.return(result1)

with ray.batch() as b2:
    result2 = f.remote()
    b2.return(result2)

results = ray.get(result1, result2)
istoica commented 6 years ago

@istoica @imzhenyu regarding the interleaved batches, maybe we could do this using the low level API, and by adding a new flag (e.g., checkpoint=False) to destroy_batch() indicating that no checkpoint should be done when batch's state is cleaned up.

Consider your example T1 -> S1 -> T2 -> S2 -> ... -> T_m -> S_m -> ... -> T_n -> S_n (checkpoint) -> ..., Then create a batch for each Ti and do the following:

@ray.remote
class foo():
    class declaration

A = foo.remote()    

batch_stack = []

t1 = new ray.batch()
batch_stack.append(t1)
do stuff on actor A

t2 = new ray.batch()
batch_stack.append(t2)
do stuff on actor A

...

tn = new ray.batch()
batch_stack.append(tn)
do stuff on actor A

# checkpoint and clean up
t = batch_stack.pop()
ray.destroy_batch(t) # clean & checkpoint state of actor A
while len(batch_stack) > 0:
    t = batch_stack.pop()
    ray.destroy_batch(t, checkpoint=False) # clean & don't checkpoint state of A

Would this satisfy your requirements?

imzhenyu commented 6 years ago

@istoica my bad for not explaining the concern about the sync/async APIs clearly. Indeed, the API itself is asynchronous, and we can get the results later as futures (illustrated in your example already). The concern is about the synchronous thinking style this API suggests, because we need to get the symbolic results of the batch immediately in the with block. This is reasonable for machine learning jobs in most cases I believe, as we use a controller to get things ready. However, considering the streaming scenarios, we may simply want to start the batch here and end the batch somewhere else (so called asynchronous:-)). Our current implementation in production adopts the later approach, where an endBatch API is given and developers can call endBatch in a worker that is not the driver. Then I just have a second look of your low level API, and probably the problem is already gone as we can have another syntactic sugar to cover the above case using these low API.

imzhenyu commented 6 years ago

@istoica regarding to your solution to the interleaved batch, I'll discuss with @yuyiming etc. in Ant tomorrow and get back to you asap. Thx.

yuyiming commented 6 years ago

@istoica Sorry for my obscure expression. I would give a pseudocode for the scenario:

@ray.remote
class Counter(object):
    def __init__(self):
        self.counter = 0

    def incAndGet(self):
        self.counter += 1
        return self.counter

@ray.remote
def other_func(r):
    # do stuff. For example:
    print(r)

@ray.remote
def batch_func(counter):
    b = ray.batch()
    r1 = counter.incAndGet.remote()
    other_func.remote(r1)
    time.sleep(10)
    r2 = counter.incAndGet.remote()
    other_func.remote(r2)
    time.sleep(10)
    r3 = counter.incAndGet.remote()
    other_func.remote(r3)
    time.sleep(10)
    b.return(None) # destroy_batch

g_counter = Counter.remote()

batch_func.remote(g_counter) # batch 1
time.sleep(5)
batch_func.remote(g_counter) # batch 2

This code might generate a task execution chain in Actor g_counter:

# T1.1 -> S1 -> T2.1 -> S2 -> T1.2 -> S3 -> T2.2 -> S4 -> T1.3 -> S5 -> T2.3 -> S6 -> ...
# |             |             |             |             |             |
# v             v             v             v             v             v
# R1.1          R2.1          R1.2          R2.2          R1.3          R2.3
# |             |             |             |             |             |
# v             v             v             v             v             v
# ...           ...           ...           ...           ...           ...

where Ti.j refers to the jth task in batch i, R refers to the return value of corresponding task, and S refers to some Actor states.

When batch 1 returns, if we want to delete task info of this batch in GCS, we must checkpoint S1, S3 and S5, because T2.1, T2.2 or T2.3 might be re-executed for the reconstruction of R2.1, R2.2 or R2.3 in the future. This is what @imzhenyu said: interleaves cause much checkpointing overhead.


In my scheme, Ray framework need not to checkpoint automatically when a batch ends. Checkpointing could be done:

  1. periodically as in Ray 0.4.0.
  2. manually by users (in Ant). Users could call an API ray.checkpoint() which tells Ray to checkpoint the Actor state after the caller returned. Checkpointing could be done in any Actor task.

So, in my example:

T1 -> S1 -> T2 -> S2 -> ... -> T_m -> S_m -> ... -> T_n -> S_n (checkpoint) -> ...

I mean the checkpointing on S_n is done by the checkpointing algorithm such as in Ray 0.4.0. Besides, Each T_i might belong to any batches.

Please think about my scheme under these explanation. Thanks a lot.

My Scheme

istoica commented 6 years ago

@yuyiming thanks for the example. Quick question: when does a batch end or is destroyed in your case, i.e., when are a batch resources cleaned up?

yuyiming commented 6 years ago

@istoica b.return(None) in my code implies that the batch ends

istoica commented 6 years ago

@yuyiming thanks, got it. So the key problem here is having two batches that invoke methods on the same actor, and having these invocations interleaving. I think I understand your solution now. Basically, this kind of interleave introduces dependencies between batches. You cannot just clean the state of one batch since the other batch might depend on the actor state updated by the first batch.

yuyiming commented 6 years ago

@istoica Yes, interleaving is a complicated problem.

My solution could be summarized as follows: When a batch ends, we do not delete the task and object info in GCS immediately, instead we wait for some specific conditions indicating that the batch info could be deleted safely.

guoyuhong commented 6 years ago

@yuyiming Hi, Yiming. In your case, you mentioned that we may need to checkpoint S1, S3 and S5, because T2.1, T2.2 or T2.3 might be re-executed for the reconstruction of R2.1, R2.2 or R2.3 in the future. By our API design, we could add R2.1, R2.2 and R2.3 to the GC whitelist which will not be cleaned. Will this whitelist help to avoid reconstruction of R2.1, R2.2 or R2.3 and hence avoid the checkpoint of S1, S3?

yuyiming commented 6 years ago

@guoyuhong Hi, yuhong. Regarding the highly interleaved batches, it is also too consuming to add so many return values to GC whitelist and to persistently store them.

Furthermore, it might be better to combine these two schemes. For the scenario in which interleaving rarely occurs, checkpointing in boundary has a virtue of simplicity, otherwise delayed GC (my solution) would save much checkpointing and data-persistence expenditure.

istoica commented 6 years ago

@yuyiming (@guoyuhong, @imzhenyu) just to make sure we are on the same page, here is one way to implement your scheme. Please let me know if this looks good to it.

First we say that batch b1 depends on batch b2 if a task of b1 uses a state or result produced by b2 before b2 completes.

We add to each batch has two flags:

In addition, each batch contains two lists:

Then the cleanup algorithm is something like this:

# called when batch is done or when a dependent batch is cleaned up
cleanup(b):
  if (status = done) && (dependency = clean):
     checkpoint every return result and the state of every actor that batch b had modified;
     clean up all GCS state and any other objects created by batch b;
     for  every batch b1 in depends_on_list:
         b1.dependent_list.remove(b) # we just cleaned b so b1 has no longer b as dependent
         clean(b1)

I'll add this pseudocode to the document later today so we can refine it.

imzhenyu commented 6 years ago

Seems good to me. While this is more general, one potential problem is whether there can be cycles in this batch level dependency graph, because batches are not atomic and the dependencies can be interleaved in the middle.

yuyiming commented 6 years ago

Looks good to me. The virtue is that it handles dependency in batch level. I have some comments:

istoica commented 6 years ago

@yuyiming and @imzhenyu, thanks for the comments. I added another cleanup proposal which should work for arbitrarily interleaved batches. See "Garbage collection for interleaving batches" here: https://docs.google.com/document/d/1sMgVMajomF5j-GDkzhuejrAwLcxXNKY9lTmlDo7GkqM/edit?ts=5b360317#.

In a nutshell here is the scheme:

  1. Create a dependency graph across batches.
  2. Identify connected components in this graph.
  3. When a batch finishes, checkpoint all its return values and the state of every external actor it has modified.
  4. Once all batches in a connected component have been checkpointed, clean up all of them.

I suggest that once we are comfortable with this or another scheme, we are starting the implementation. Thanks.

yuyiming commented 6 years ago

@istoica My concern is how to checkpoint the state of every Actor a finished batch has modified. For the below timeline:

# Actor 1 -----S1.1----------------S1.2------------ -> timeline
#           ^                   ^
#           | T1.1              | T1.3 (finish)
# Batch 1 ----------------------------------------- -> timeline
#               | T1.2
#               v
# Actor 2 ---------S2.1----S2.2-------------------- -> timeline
#                       ^         (Actor 2 is not on the boundary
#                       | T2.1     of Batch 1 when Batch 1 finishes)
# Batch 2 ----------------------------------------- -> timeline

When Batch 1 is done, it can NOT checkpoint the state of Actor 2 on its boundary (S2.1), but Actor 2 is on the state S2.2.

istoica commented 6 years ago

@yuyiming Not sure I see the problem here. First, in this case, we will do cleanup only after Batch 2 finishes as well. Before then we have all the GCS lineage so we can reconstruct all the objects and actor states, if needed.

Second, even if Batch 1 finishes and we clean up its state right away things should still work in this case. Say, Batch 1 finishes, its state is cleaned up (i.e., actor states S1.2 and S2.1 are checkpointed), and right after this we have a failure that kills Actor 2. In this case Batch 2 can reconstruct the state of Actor 2, S2.2., by starting from checkpoint S2.1 and then replaying T2.1.

Also, yesterday we did discuss a simple solution during a Ray meeting that only cleans up the object store. In a nutshell:

istoica commented 6 years ago

All, we have had some internal discussions at Berkeley of whether the batch API should be asynchronous or synchronous:

In the async case

with ray.batch() as mybatch:
  a = f1.remote()
  b = f2.remote()
  c = f3.remote(a, b)

fun4.remote()

from the user's viewpoint the batch "finishes" as soon as f3.remote(a, b') is submitted, and hence fun4.remote() can finish before f3.remote(a, b'). (From the system perspective however the batch always completes only after the last function in the batch finishes, i.e., after f3.remote(a, b') finishes.)

In the sync case, fun4.remote() is submitted only after f3.remote(a, b') finishes.

Also note that in the asynchronous case, one can still force synchronous execution by using ray.wait(), e.g.,

with ray.batch() as mybatch:
  a = f1.remote()
  b = f2.remote()
  c = ray.wait(f3.remote(a, b))

fun4.remote()

Any comments give your use cases?

imzhenyu commented 6 years ago

@istoica The problem with checkpointing actor state on batch completion is that you don't know a given task is the last actor task in a batch before hand, because other tasks may further emit more actor tasks, unless with certain restricted API design (e.g., static data flow as we can do the mark before hand). That's why @yuyiming said that it is difficult to do the checkpoint when a batch completes for the actors as their state are polluted by others already.

Regarding to the async/sync API design, I think both approaches you suggest are fine, and users should decide which one to use. However, my concern is about which task is considered the batch completion task in the API invocations. In a sync design, the last remote call in the with block is considered the one. While in an async design, the last remote call is merely considered a common task in the batch, and one further task (directly or indirectly) triggered in these tasks within the with block is the completion task. IMHO, the API design in the latter case fits better in the streaming case while the API design in the former case may work in many other scenarios where the driver totally commands almost everything.

istoica commented 6 years ago

@imzhenyu When you are saying "other tasks may further emit more actor tasks" are you referring to something like the following code, where parent() calls child() which invokes a method on actor a?

@ray.remote
fun child(a, b):
   a.method.remote(b)
   ...

@ray.remote
fun parent(a):
  child.remote(a, b)

a = Actor.remote()
width ray.batch():
  ...
  parent.remote(a)

2) Regarding async vs sync, just to make sure: you are saying asyn API works better for streaming, right? How hard would be to provide the same functionality with a sync API?

istoica commented 6 years ago

@imzhenyu and @yuyiming another quick question. Would it be ok to have only one add_return() statement, and for that statement to be the last one in the batch? If not, can you give an example of functionality you cannot implement assuming this limitation?

yuyiming commented 6 years ago

@istoica For this pattern:

# Actor 1 -----S1.1----------------S1.2---------|--- -> timeline
#           ^                   ^               |
#           | T1.1              | T1.3 (finish) |
# Batch 1 --------------------------------------|--- -> timeline
#               | T1.2                          |
#               v                               |
# Actor 2 ---------S2.1----S2.2-----------------|--- -> timeline
#                       ^                       |
#                       | T2.1                  |
# Batch 2 --------------------------------------|--- -> timeline
#                                               |
#                                    (cleanup moment of batch 1)

S2.1 can NOT be checkpointed when batch 1 finishes, because at this moment Actor 2 is on the state S2.2. Although we can reconstruct the Actor state S2.1 by re-executing tasks, I think this is too consuming: It needs to re-run almost all the tasks to checkpoint all the boundary Actor states under highly interleaved batches.

On the other words, when T1.2 finishes, we do not know it is the last task of batch 1 in Actor 2, thus we do not have enough information to decide whether to checkpoint this Actor at this moment.

If I misunderstand the scenario, please point it out. Thanks a lot.

imzhenyu commented 6 years ago

@istoica regarding to the previous 2 questions about sync/async. (1). "other tasks may further emit more actor tasks" Yes, it is correct. (2). Yes, async API is better for streaming, IMHO. Implementing that with sync API is doable I guess, but it will introduce unnecessary delays and complications, because streaming is more similar to one way RPC, where the upstream nodes do not care much about what the downstream nodes will do. With a add_return statement within the batch, we can implement the sync API but not the async ones. One possible way is to have a ray.end_batch API that can be invoked anywhere within the batch.

imzhenyu commented 6 years ago

@istoica My previous statement about unnecessary delay may be wrong, as it is possible to emit the result to the environment before we end a batch (though extra notifications are needed to notify the driver after emitting results). Then I guess we may limit the API design to this sync version, which is simpler to use (though less flexible but good enough already for many cases) and easier to implement.

istoica commented 6 years ago

@imzhenyu can you provide an example of why we need multiple return statements in the same batch? Can't we emulate the behavior by having one batch per return? If we allow multiple return statements and the batch is async, it could be difficult to figure out when we can clean up the object store. Also the semantics of a batch with multiple return statements seems more complex/confusing.

istoica commented 6 years ago

Basically, I'm thinking of something like this where batch.end takes as argument a list of objects to return:

x = fun1.remote()
batch = ray.batch()
y = fun2.remote(a)
z = fun3.remote(c)
batch.end([z])  # change ownership of `z` to parent batch

Upon finishing the batch, the above code will clean up y but not z (as it was returned by the group) and x (as it was declared outside group's scope).

And, here is the with version which is virtually identical:

x = fun1.remote()
with ray.batch() as batch:
  y = fun2.remote(a)
  z = fun3.remote(c)
  batch.return([z])
istoica commented 6 years ago

@imzhenyu BTW, in the above example garbage collection happens only after the batch ends. Nothing will be garbage collected before then. So as long as you add an object to the return list, you can safely use that object elsewhere as soon as it is created as you know it won't be garbage collected during or after the batch ends (no need to wait for return -- probably need to change the name again to indicate that you do not need to wait for return to use an object).

In the above example, say you do a lot of stuff after creating z. You can still use z safely before the batch ends, as none of the objects will be garbage collected before then.

Wouldn't this be enough for your cases?

istoica commented 6 years ago

@imzhenyu Maybe rename return to keep, i.e., batch.keep(object_list, actor_list) ?

istoica commented 6 years ago

@yuyiming and @zhijunfu Thanks for your example illustrating that it is hard to checkpoint actors on the batch frontier. However, the proposal I added to the document should still work correctly. Is the a problem you have in mind an efficiency or correctness problem?

Anyway, I was thinking about the following alternative that will make the above discussion moot: how about checkpointing the actors at the beginning of the batch instead of at the end? As long as the actors are checkpointed frequently enough it shouldn't matter whether we do it at the beginning or the end. In a streaming example, just think of the next batch checkpointing the state of the current batch. This would be much easier to implement and it will have the added benefit to enable rollback, e.g., in an SGD iteration you can rollback to the initial state of an iteration if an actor fails, which is what you want.

imzhenyu commented 6 years ago

@istoica Agree that each batch should only have one return statement (when I say anywhere for ray.end_bath I should say anywhere but only once:-)). That's also one subtle issue regarding the async design, as developers should guarantee this invariant.

As for the return(z) design, I think it is great, though we still need to figure out a way to garbage collect z in the end, esp. for online computing services. Currently in our production scenario, we don't have z returned but sent to remote services directly in the end batch task.

istoica commented 6 years ago

@imzhenyu can you please give an example showing this pattern, i.e., (1) needing to have a return statement that is not the last statement in the batch, and (2) having to garbage collect z at a latter point ? For (2) maybe we can use nested batches, so z is garbage collected in the outer batch. For instance consider the following code snippet:

with ray.batch() as outer:
  x = fun1.remote()
  with ray.batch() as inner:
    y = fun2.remote(a)
    z = fun3.remote(c)
    inner.return([z]) # move z to outer's scope
  # inner done; z is not garbage collected since it is in outer's scope
  do something more in inner's scope
# outer done; z is garbage collected
imzhenyu commented 6 years ago

@istoica I don't have an immediate example for both. But for (1), considering a streaming example where the initial points only calls a function without a return value (as it is the ingest nodes), and the batch ends in a later stage deployed in another actor. For (2), considering caching the results for the computation requests, then you need to collect z a later point.

istoica commented 6 years ago

@imzhenyu Thanks. Correct me if I'm wrong but for (1) I think that you are saying there is no return statement, right?

Also, for (2) you are basically saying that you want to garbage collect an object outside the initial with statement where it was created, e.g., imagine a queue where z is created by an actor method (e.g., enqueue()), and you want to garbage collect after another method, dequeue(), removes it. Correct?

yuyiming commented 6 years ago

@istoica I think one solution is to checkpoint Actor states on all batch transition points (batch boundaries). For instance, When T2.1 is received by Actor 2, S2.1 should be checkpointed before the execution of T2.1 because it is generated by another batch (T1.2). We say that S2.1 is a batch transition point from batch 1 (T1.2) to batch 2 (T2.1).

imzhenyu commented 6 years ago

@istoica yes, there is no returned value or the batch returns empty results. (2) is correct.