apache / incubator-heron

Apache Heron (Incubating) is a realtime, distributed, fault-tolerant stream processing engine from Twitter
https://heron.apache.org/
Apache License 2.0
3.64k stars 597 forks source link

New stateful operator supporting live scaling #1499

Open wangli1426 opened 8 years ago

wangli1426 commented 8 years ago

I am writing to propose a new operator, called ElasticStatefulOperator. The operator organizes user state as key-value pairs. Operator scaling, e.g., increasing/decreasing the number of tasks, can be achieved by re-partitioning the keys among the tasks and updating the routing in StreamManager accordingly. With this feature, the parallelism of ElasticStatefulOperators can be adjusted either by the user via heron update command or by the auto-scaling algorithm, without deactivating current topology. We have implemented this operator based on Apache storm (Source code). Our experiments show that operator scaling can be achieved by this operator in milliseconds with state consistency.

The assumption under this operator is that (1) each input tuple is associated with a key; and (2) an input tuple reads/updates the state on a key basis. This assumption hold for a large variety of stateful operators, including equal join, count. To manager the operator state in user code, the ElasticStatefulOperator models operator state as key-value pairs, where keys and values can be of any data types. The operator exposes getStateByKey() and setStateByKey() interfaces for state read/update in the user code. The implmentation of ElasticStatefulOperator looks like:

public abstract class ElasticStatefulOperator implements Serializable {

    private KeyValueState state = new KeyValueState();

    public abstract void prepare(Map stormConf, TopologyContext context);

    public abstract void execute(Tuple input);    

    public Serializable getValueByKey(Serializable key) {
        return state.getValueByKey(key);
    }

    public void setValueByKey(Serializable key, Serializable value) {
        state.setValueByKey(key, value);
    }

}

A WordCount implementation based on this new operator would be like this:

class ElasticWordCount extends ElasticStatefulOperator {

    void prepare(...) {...};

    void execute(Tuple tuple)
        String word = tuple.getStringByField("word");
        Long count = getStateByKey(word);
        if(count == null) {
            count = new Long(0);
        }
        setStateByKey(word, count + 1);
    }
}

Modifications to existing code:

  1. New functions will be added toStreamManager to support routing updates;
  2. ElasticStatefulOperator will be added;
  3. Scaling delegate will be added to coordinate the state migration and the routing updates.

Any feedback is highly appreciated.

kramasamy commented 8 years ago

@billonahill @maosongfu @avflor - when you get some time, can you provide feedback on the proposal?

avflor commented 8 years ago

@wangli1426 I think there are two things to consider: 1) Re-routing of keys in case we add/remove instances and 2) State migration of Bolts to newly started bolts.

I think your proposal refers to the 2nd one, right? I believe the 1st one we can be implemented in the grouping methods with a smart hash function and not in user code. If your proposal refers to the 2nd method, then do you assume that the state migration will be done by the stream manager? I see that you mention this component but I'm not sure If I understood correctly.

wangli1426 commented 8 years ago

Hi @avflor ,

Thanks for your response.

Actually, my proposal refers to both 1) and 2). The second one makes it possible for the system to migrate a subset of state among Bolt Instances. And the first makes sure the tuples are sent to the right Bolt Instances after state migration.

For 1), you are right. Re-routing can be done in the StreamManager by modifying the key to tasks mapping.

For 2), as data flows among HeronInstances are via stream mananger, it is reasonable to migrate the state by stream manager.

avflor commented 8 years ago

@wangli1426 I think it is not that simple to do state migration through the stream manager. I believe we need a central external unit to do the state migration as we will avoid many coordination problems between stream managers. There are multiple things that need to be synchronized: we need to make sure tuples are not routed to the original bolts while migrating the state of the bolts, we need to know when all state migrations have been completed so that processing is resumed, we need to handle failures (e.g, a new bolt failed while receiving state). I think a central external component that performs the coordination, failure handling etc might be a better solution that introducing this functionality to the stream manager. Moreover, I'm not sure how we will make the heron instance understand that it is not receiving regular data tuples through the stream manager but some other bolt's state. But apart from that I think a central state migration is a more robust solution. @billonahill @kramasamy @maosongfu may have more input on this.

wangli1426 commented 8 years ago

@avflor

Thanks for your comments.

I think it is not that simple to do state migration through the stream manager. I believe we need a central external unit to do the state migration as we will avoid many coordination problems between stream managers.

I agree with you. Stream manager alone is not sufficient. As stated in my proposal, we need a centralized scaling delegate to coordinate the state migration and the rerouting. For instance, before a key is migrated from a HeronInstance to another, we need to make sure all the tuples sent to the HeronInstance has been completed processed.

There are multiple things that need to be synchronized: we need to make sure tuples are not routed to the original bolts while migrating the state of the bolts, we need to know when all state migrations have been completed so that processing is resumed, we need to handle failures (e.g, a new bolt failed while receiving state).

Yes, to migrate a set of key, say k from HeronInstance A to HeronInstance B, we need the following steps. 1) stop sending tuples with keys in k to A; 2) make sure all tuples with keys in k sent to A has been processed; 3) migrate the involved state from A to B; 4) resume sending tuples with keys in key to B.

Moreover, I'm not sure how we will make the heron instance understand that it is not receiving regular data tuples through the stream manager but some other bolt's state.

We can add additional fields to HeronTuples.HeronDataTuple to indicate whether a tuple is a regular tuple or control tuple, and process the tuples properly at BoltInstance.java:216 like this:

      for (HeronTuples.HeronDataTuple dataTuple : tuples.getData().getTuplesList()) {
        if(dataTuple.type == ControlTuple) {
             //do the control work such as state migration, state merging.
        } else
            handleDataTuple(dataTuple, topologyContext, stream);
      }
billonahill commented 8 years ago

Agreed, we should assume some external state management. Stream manager should just be used to control routing logic.

Q1.) Regarding moving existing state when increasing parallelism, in the above flow how exactly do you determine which keys k need to move? Would we have to run all current keys in A through the updated hashing algorithm or is there a better way? Q2.) Related to Q1 but your example shows how to migrate state from A -> B, but when adding new instances we'd be going from Set_n -> Set_n + Set_m. Would we need to inspect keys on all Set_n to see what needs to move to Set_m? Q3.) Re implementation, could this be done with composition instead of inheritance? This would provide more flexibility to the topology author. So instead of extending the operator the bolt could implement StatefulComponent which could cause a configured implementation of a StateStore to be injected by the framework, or something like that.

Also we might want to avoid terminology specific to scaling (i.e., elastic) since this functionality wouldn't be limited to just scaling. It might be used when a faulty node gets restarted for example.

wangli1426 commented 8 years ago

@billonahill

Thanks for the comments.

Q1.) Regarding moving existing state when increasing parallelism, in the above flow how exactly do you determine which keys k need to move? Would we have to run all current keys in A through the updated hashing algorithm or is there a better way? Q2.) Related to Q1 but your example shows how to migrate state from A -> B, but when adding new instances we'd be going from Set_n -> Set_n + Set_m. Would we need to inspect keys on all Set_n to see what needs to move to Set_m?

This is a good question. A naive method is to change the hash function when new task is created. This method however results in a large number of state migration. For instance, if we increase the number of tasks from n to n+m, then for each task approximately (n+m-1)/(n+m) of the keys will be migrated to other tasks, which results in expensive state migration overhead and significant scaling delay. Another method is to maintain a key-to-task mapping in the routing table. This allows us to reassign a few keys to the new tasks when scaling. However, the maintaining overhead could be large and the lookup to the mapping will cause a large number of cache misses when there are many distinct keys. My idea is to use a two-tire routing table. We logically partition the key space of an operator into many, e.g., 512, mini-partitions, called shards. This can be done by a global hash function. And we maintain shard-to-task mapping and monitor the workload on each shard. When a new task is created, we can reassign some shards from existing tasks to the new one based on the workload of the shards. This problem is similar to Multi-way partition problem which divides a set of n integers into a given number k of subsets, minimizing the difference between the smallest and the largest subset sums. The difference is that we also want to minimize the shard reassignments. This problem can be solved by using a heuristic algorithm similar to First-Fit-Decreasing. The benefit of the two-tire routing is that it gives us great flexibility to minimize the shard reassignments while does not bring much additional overhead. The shard reassignments enabled by the two-tire routing not only supports operator scaling, but also can be used for inter-task load balancing when the workload is skewed among the tasks.

Q3.) Re implementation, could this be done with composition instead of inheritance? This would provide more flexibility to the topology author. So instead of extending the operator the bolt could implement StatefulComponent which could cause a configured implementation of a StateStore to be injected by the framework, or something like that.

The ElasticStatefulOperator also exposes some methods to the system for state management, like this:

public abstract class ElasticStatefulOperator implements Serializable {

    private KeyValueState state = new KeyValueState();
    ...

   // to extract a subset of state for migration
   KeyValueState getState(StateFilter filter){...};

   // to merge the new state migrated to the current instance
   void MergeState(KeyValueState newState){...};

}

If this is done with composition instead of inheritance, I am afraid the system cannot get access to the StatefulComponent member in their class.

Also we might want to avoid terminology specific to scaling (i.e., elastic) since this functionality wouldn't be limited to just scaling. It might be used when a faulty node gets restarted for example.

I agree. A better name might to be StatefulOperator, StatefulBolt or StatefulKeyValueOperator. We can choose a proper name after we have reached an agreement on the design.

Thanks.

wangli1426 commented 8 years ago

@billonahill @maosongfu @avflor Any further feedback on this proposal?

maosongfu commented 8 years ago

@wangli1426 We are also working on a stateful processing proposal internally and will discuss it this Friday. Will sync with u after that.

wangli1426 commented 8 years ago

@maosongfu Got it. Thank you!

wangli1426 commented 8 years ago

@billonahill @maosongfu @avflor Any further comment?

maosongfu commented 8 years ago

@wangli1426 The dicussion on Friday was productive and we got a lot of action items. For instance, we would come up with the proposal of stateful interfaces exposed to topology developers, which includes and not limits what you mentioned in this issue. Will public the doc once we finish the first proposal draft.

wangli1426 commented 8 years ago

@maosongfu Thanks for the update. I look forward to the doc.

wangli1426 commented 7 years ago

@maosongfu @billonahill @avflor @kramasamy Any update? I am more than eager to make contribution to new features regarding to operator state management and operator scaling.

billonahill commented 7 years ago

@wangli1426, @maosongfu is working on preparing a doc to share by the end of the week.

wangli1426 commented 7 years ago

@billonahill @maosongfu glad to know that. Look forward to the doc.

wangli1426 commented 7 years ago

@maosongfu May I ask if the doc is ready to share? :)

kramasamy commented 7 years ago

@wangli1426 - @maosongfu has gone on a urgent trip to China. He will be back soon. Meanwhile, I am looking at your source code and proposal.

kramasamy commented 7 years ago

@wangli1426 - I was looking at the source code. Why have you not defined a elastic spout?

wangli1426 commented 7 years ago

@kramasamy First, thanks for your interest to my code. As a spout is typically used to receive input stream from external system / source for the topology, it is usually not computationally intensive. Of cause, we can also implement elastic spout in a similar way, if needed.

wangli1426 commented 7 years ago

@kramasamy any comments?