xingbofeng / xingbofeng.github.io

counterxing的博客
https://xingbofeng.github.io
175 stars 18 forks source link

批处理和流处理 #59

Open xingbofeng opened 8 months ago

xingbofeng commented 8 months ago

批处理模式

批处理是一种离线的数据处理方式,它将一批数据(通常是一组数据记录)作为输入,并在特定的时间或条件下进行处理。批处理通常用于对日志等数据进行分析、转换和汇总,以生成报告、进行数据挖掘、进行机器学习等。

在批处理中,数据被分为离散的块,批处理作业会按照一定的顺序和步骤对这些数据进行处理。这批处理作业可以定期运行,也可以在特定的触发条件下执行。批处理通常要求数据存储在一个集中的位置,以便进行批量处理。

传统的数据融合通常基于批模式。在批的模式下,我们会通过一些周期性运行的JOB,将数据从关系型数据库、文件存储向下游的目标数据库进行同步,中间可能有各种类型的转换。批处理通常用于处理大规模离线任务。“大规模”体现在:每次处理输入的数据量大;每次处理运行的时间长(可能几分钟~几天)。

MapReduce模式

常见的批处理模式是MapReduce模式,其主要思想是自动将一个大的计算拆解成 Map 和 Reduce ,MapReduce模式并行的读写分区,然后执行作业,再聚合。提供了两个纯函数,Map、Reduce。因为纯函数,保证了无副作用,所有流程和架构清晰,并且容易重试。如图所示:

MapReduce 可以在多台机器上并行执行计算,而无需编写代码来显示处理并行问题。Mapper 和 Reducer 一次只能处理一条记录;它们不需要知道它们的输入来自哪里,或者输出去往什么地方,所以框架可以处理在机器之间移动数据的复杂性:

Spark框架

Map 和 Reduce 中间夹杂着一步数据移动,也就是 shuffle。

Shuffle是MapReduce中的一个重要步骤,它是指将 Map 阶段输出的数据按照某种规则重新分配到 Reduce 阶段的各个节点上,以便 Reduce 节点能够对相同的key进行聚合操作。

具体来说,Shuffle过程包括三个步骤:

Shuffle过程是MapReduce中非常耗费时间和网络带宽的一个步骤,所以优化Shuffle过程对于提高MapReduce的性能非常重要。常见的Shuffle优化方法包括增加Map端的本地聚合、增加Reduce端的并行度、使用压缩和序列化等。由于 MapReduce 的框架限制,一个 MapReduce 任务只能包含一次 Map 和一次 Reduce,计算完成之后,MapReduce 会将运算结果写回到磁盘中(更准确地说是分布式存储系统)供下次计算使用。如果所做的运算涉及大量循环,那么整个计算过程会不断重复地往磁盘里读写中间结果。这样的读写数据会引起大量的网络传输以及磁盘读写,极其耗时,而且它们都是没什么实际价值的废操作。因为上一次循环的结果会立马被下一次使用,完全没必要将其写入磁盘。

Spark的内存计算模型还支持数据的高速缓存和重复使用,这也有助于提高计算效率和速度。同时,Spark还采用了一种基于DAG的执行引擎,可以在数据处理过程中自动优化计算流程,提高计算效率和速度。Spark 延续了MapReduce 的设计思路:对数据的计算也分为 Map 和Reduce 两类。但不同的是,一个Spark 任务并不止包含一个 Map 和一个Reduce,而是由一系列的Map、Reduce构成。这样,计算的中间结果可以高效地转给下一个计算步骤,提高算法性能。虽然 Spark 的改进看似很小,但实验结果显示,它的算法性能相比MapReduce 提高了很多。

流处理模式

流处理是一种实时的数据处理方式,它将数据流作为输入,并在数据流中不断地进行处理和分析。流处理通常用于对实时数据进行分析、监控和决策,以便快速响应业务需求。

在流处理中,数据是连续不断地产生和处理的,而不是像批处理那样一次性处理一批数据。流处理系统通常需要实时处理数据,因此需要快速响应和高效处理数据。流处理通常要求数据存储在分布式系统中,以便进行实时处理和分析。流处理系统可以处理无限量的数据。显然,同批处理一样,在流处理过程中,也都需要维持中间状态。

流处理系统通常采用事件驱动的方式进行处理,即当新的事件到达时,系统会立即对其进行处理并产生相应的输出。流处理系统通常支持窗口化处理,即将数据流分割为固定大小的窗口,并对每个窗口内的数据进行处理和分析。

与批处理相比,流处理具有更低的延迟和更高的实时性,能够更快地响应业务需求。

流分析一般需要在一个时间窗口内做聚合分析,例如一段时间内的计算,平均值。一般窗口类型可分为以下三类。

举个例子:

Flink

Flink是Apache软件基金会的一个顶级项目,是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架,并且可以同时支持实时计算和批量计算。

批处理是有界数据流处理的范例。在这种模式下,你可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。

流处理正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,因此程序必须持续不断地对到达的数据进行处理。

在 Flink 中,应用程序由用户自定义算子转换而来的流式 dataflows 所组成。这些流式 dataflows 形成了有向图,以一个或多个源(source)开始,并以一个或多个(sink)结束。

通常,程序代码中的 transformation 和 dataflow 中的算子(operator)之间是一一对应的。但有时也会出现一个 transformation 包含多个算子的情况,如上图所示。

Flink 应用程序可以消费来自消息队列或分布式日志这类流式数据源(例如 Apache Kafka 或 Kinesis)的实时数据,也可以从各种的数据源中消费有界的历史数据。同样,Flink 应用程序生成的结果流也可以发送到各种数据中。

Flink 程序本质上是分布式并行程序。在程序执行期间,一个流有一个或多个流分区(Stream Partition),每个算子有一个或多个算子子任务(Operator Subtask)。每个子任务彼此独立,并在不同的线程中运行,或在不同的计算机或容器中运行。

算子子任务数就是其对应算子的并行度。在同一程序中,不同算子也可能具有不同的并行度。

Flink 算子之间可以通过一对一(直传)模式或重新分发模式传输数据,如图所示:

一对一模式(例如上图中的 Source 和 map() 算子之间)可以保留元素的分区和顺序信息。这意味着 map() 算子的 subtask 输入的数据以及其顺序与 Source 算子的 subtask 输出的数据和顺序完全相同,即同一分区的数据只会进入到下游算子的同一分区。

重新分发模式(例如上图中的 map() 和 keyBy/window 之间,以及 keyBy/window 和 Sink 之间)则会更改数据所在的流分区。当你在程序中选择使用不同的 transformation,每个算子子任务也会根据不同的 transformation 将数据发送到不同的目标子任务。例如以下这几种 transformation 和其对应分发数据的模式:keyBy()(通过散列键重新分区)、broadcast()(广播)或 rebalance()(随机重新分发)。在重新分发数据的过程中,元素只有在每对输出和输入子任务之间才能保留其之间的顺序信息(例如,keyBy/window 的 subtask 接收到的 map() 的 subtask 中的元素都是有序的)。因此,上图所示的 keyBy/window 和 Sink 算子之间数据的重新分发时,不同键(key)的聚合结果到达 Sink 的顺序是不确定的。

Flink 应用程序的状态访问都在本地进行,因为这有助于其提高吞吐量和降低延迟。通常情况下 Flink 应用程序都是将状态存储在 JVM 堆上,但如果状态太大,我们也可以选择将其以结构化数据格式存储在高速磁盘中。

Flink的可靠性保证

Flink通过Checkpoint机制来保证消息的可靠性和一致性。Checkpoint机制是Flink中的一种容错机制,用于在任务执行过程中周期性地保存任务状态,并在任务发生故障或异常时恢复任务状态。通过Checkpoint机制,Flink可以保证消息的可靠性和一致性,从而避免数据丢失和处理不准确的问题。

状态(State):状态是对应的 操作者 严格在 Checkpoint 之前的所有事件,并且不包含在此 Checkpoint 后的任何事件后而生成的状态。

Flink通过Checkpoint机制实现消息的可靠性和一致性的步骤如下:

需要注意的是,Checkpoint机制需要消耗一定的计算和存储资源,因此需要根据具体场景合理设置Checkpoint的触发间隔和保存位置。同时,Checkpoint机制也需要考虑到任务的性能和可靠性,以确保任务的高效执行和数据处理的准确性。

Flink的 Checkpoint机制确保了恰好一次(Exactly once):事件既不会丢失也不会被重复传递。但这个Exactly once 只能保证Flink内部自身,对于Flink和外界的数据传输的可靠性,如Kafka、HDFS等,要保证其数据可靠性,需要另行设置:

最多一次(NONE):事件可能会丢失但不会被重复传递 至少一次(AT_LEAST_ONCE):事件不会丢失但可能会被重复传递 恰好一次(EXACTLY_ONCE):事件既不会丢失也不会被重复传递

示例

以下是一个从源Kafka读取数据,并经过一次Map处理后写入目标Kafka的示例程序。程序的主要逻辑如下:

  1. 创建StreamExecutionEnvironment对象,设置Checkpoint的间隔时间和模式,并设置Checkpoint的存储位置和删除策略。

  2. 使用KafkaSource读取Kafka数据,并通过map操作对数据进行处理。

  3. 使用KafkaSink将处理后的数据写出到Kafka中。在写入Kafka时,设置了精确一次(exactly-once)的数据可靠性保证,通过开启2PC(two-phase-commit)来实现数据的精确一次写入。具体来说,设置了以下参数:

需要注意的是,为了实现精确一次的数据可靠性保证,需要同时满足以下条件:

package app;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class THDataTransFlink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置 Checkpoint 点,而且保证恰好一次,保证数据可靠性
        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // 设置 Checkpoint storage 的位置
        checkpointConfig.setCheckpointStorage("/usr/local/flink_check_point/");
        // 在任务取消时保留已经完成的Checkpoint,不进行删除
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 读取kafka数据
        KafkaSource<Req> source = KafkaSource
                .<Req>builder()
                .setBootstrapServers("source.example.com:9092")
                .setProperty("enable.auto.commit", "true")
                .setProperty("auto.commit.interval.ms", "1000")
                .setTopics("topic")
                .setGroupId("kafka_group_id")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new ReqDeserializationSchema())
                .build();

        // 一次map
        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "topic")
                .map(new KafkaProcess());

        // 写出到Kafka
        // 精准一次写入Kafka,必须满足以下条件:
        // 1、开启Checkpoint
        // 2、sink 保证级别是 精准一次
        // 3、sink 设置事务前缀
        // 4、sink 设置事务超时时间:Checkpoint 间隔 < 事务超时时间
        KafkaRecordSerializationSchema<String> recordSerializer = KafkaRecordSerializationSchema
                .builder()
                .setTopic("topic_sink")
                .setValueSerializationSchema(new SimpleStringSchema()).build();
        Properties producerConfig = new Properties();
        // 设置kafka事务超时时间
        producerConfig.setProperty("transaction.max.timeout.ms", Integer.toString(10 * 60 * 1000));
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setKafkaProducerConfig(producerConfig)
                .setBootstrapServers("target.example.com:9092")
                .setRecordSerializer(recordSerializer)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 设置数据可靠性保证恰好一次,开启2PC
                .setTransactionalIdPrefix("th-data-trans") // 设置事务前置
                .build();

        stream.sinkTo(sink);

        env.execute("task");
    }
}

总结

本文介绍了批处理和流处理模式的概念,通过 批处理的 Map Reduce 模式引入,到 Spark 的简单介绍,Spark由于使用分布式内存系统维护 Map Reduce 的中间状态,提升批处理的性能。之后通过流处理的概念引入,并简单介绍了Flink的概念及其如何保证其数据可靠性。最后通过一个Kafka2Kafka的demo演示了如何通过流的方式来处理数据。

参考: Apache Flink 《数据密集型应用系统设计》 实时流计算框架Flink 批处理计算与流处理计算的区别是什么?