GQBBBB / GQBBBB.github.io

My Blog
3 stars 0 forks source link

状态管理 #1

Open GQBBBB opened 5 years ago

GQBBBB commented 5 years ago

state

用Data Stream API编写的程序通常以各种形式保存状态:

状态如何在内部表示,以及如何以及如何在检查点上持久化,取决于所选的状态后端。

状态后端

Flink捆绑了这些状态后端:

MemoryStateBackend

默认以java堆的对象形式保存数据,Key/value状态和窗口运算中的数据,用hash表来存储值、触发器等信息。基于checkpoints接口的方式,状态后端将对状态进行快照,并作为检查点的一部分,发送通知给JobManager (master),这些数据同样存储在java堆中。

FsStateBackend

可以通过一个文件系统的URL来配置(type, address, path), 例如 :"hdfs://namenode:40010/flink/checkpoints"或者 "file:///data/flink/checkpoints"。FsStateBackend将正在运行的数据保存TaskManager内存中。 在做检查点时, 它将包含状态的快照信息写入配置好的文件系统目录中,. 最小的元数据信息被存储到JobManager的内存中。

RocksDBStateBackend

可以通过一个文件系统的URL来配置(type, address, path), 例如 :"hdfs://namenode:40010/flink/checkpoints"或者 "file:///data/flink/checkpoints"。FsStateBackend将正在运行的数据保存TaskManager内存中,存储到RocksDB(http://rocksdb.org/) 数据库中. 在做检查点时, 所有RocksDB数据库的数据将通过检查点保存到文件系统目录中,. 最小的元数据信息被存储到JobManager的内存中。

Keyed State and Operator State:

键控state:

基于KeyedStream上的状态,与key绑定。 可以将Keyed State看成已经被分区的Operator State,每个key都有一个状态分区。每一个keyed-state都绑定了唯一的一个<parallel-operator-instance, key>。因为每个key都属于一个 keyed operator的并行实例,所以我们可以把它看作是<operator, key>。

算子state:

Operator State跟一个特定operator的一个并发实例绑定,整个operator只对应一个state。而一个operator上可能会有多个key,对应多个keyed state。 Kafka Connector就是在Flink中使用Operator State的一个很好的例子,每个Kafka consumer的并行实例保存着一个topic分区和偏移量的映射作为它的Operator state。

Raw and Managed State

键控state和算子state有两种存在形式:原始状态和托管状态。

Using Managed Keyed State

managed keyed state接口提供了对不同类型状态 (当前输入元素的key的) 的访问,这种类型的状态只能运用在KeyedStream (通过stream.keyBy(…)生成)。 几种不同类型的状态:

重要的是要记住,这些状态对象仅用于状态的接口。状态不一定存储在内部,但可能驻留在磁盘或其他位置。要记住的第二件事是,从状态获得的值取决于input元素的key。因此,如果所涉及的key不同,则在一次调用用户函数时获得的值可能与另一次调用中的值不同。

必须使用StateDescriptor获得状态句柄,它保存了状态的名称(您可以创建多个状态,并且它们必须具有唯一的名称以便您可以引用它们),状态所持有的值的类型。

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     *  ValueState 句柄. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // 访问 state value
        Tuple2<Long, Long> currentSum = sum.value();

        // 更新 count
        currentSum.f0 += 1;

        // 添加input值的第二个字段
        currentSum.f1 += input.f1;

        //更新 state
        sum.update(currentSum);

        //如果 count 达到 2, 输出平均值并清除 state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", //state 名称
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 类型信息
                        Tuple2.of(0L, 0L)); //如果没设置, state 默认值
        sum = getRuntimeContext().getState(descriptor);
    }
}

//可以像在 streaming 程序中使用 (假如我们有 StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// 输出为 (1,4) and (1,5)

该函数将计数和运行总和存储在ValueState中。一旦计数达到2,它将发出平均值并清除状态,以便我们重新开始0。

Using Managed Operator State

有状态函数可以实现更通用的CheckpointedFunction接口去使用Managed Operator State。

CheckpointedFunction

CheckpointedFunction接口通过不同的再分配方案提供对非键控状态的访问。它需要实现两种方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

每当必须执行检查点时,snapshotState()都会被调用。每次初始化用户定义的函数时或者当函数从早期检查点恢复时,都会调用initializeState()。鉴于此,initializeState()不仅是初始化不同类型的状态,而且还用于状态恢复逻辑。

目前,Managed Operator State支持list-style(列表样式)。状态被认为是可序列化对象的列表,彼此独立,因此有资格在重新分配时进行再分配。根据状态访问方法,定义了以下再分配方案: