apache / doris

Apache Doris is an easy-to-use, high performance and unified analytics database.
https://doris.apache.org
Apache License 2.0
11.82k stars 3.12k forks

[Feature] kafka routine load support rack aware consume #31531

Open flashmouse opened 4 months ago

flashmouse commented 4 months ago

Description

For better data durability and system availability, a Kafka cluster on AWS may deploy its brokers across multiple regions, with each replica of a partition placed in a different region. Because cross-region traffic on AWS incurs data transfer charges (often far more expensive than the EC2 instances themselves), Kafka implements "Rack-aware Partition Assignment for Kafka Consumers" and "Rack aware replica assignment" so that a Kafka consumer can fetch partition data from a broker in the same rack as the consumer.

We are now trying to load log data from Kafka into Doris using routine load. Since both Kafka and Doris are deployed on AWS EC2 instances across 3 regions, we want to reduce cross-region traffic to a minimum. So we need the two features described below:

  1. BE needs a new config (e.g. called region), and a BE should then only fetch partitions from Kafka brokers in the same region, so there is no cross-region traffic between Kafka and BE.
  2. When a table is bucketed by random and load_to_single_tablet=true, all data fetched from Kafka should be stored in a tablet held by this BE itself, so there is no cross-region traffic between BEs.
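
The second request can be sketched as follows. This is only an illustration of the idea, not Doris code: the function names, the `tablet_hosts` mapping and the fallback behaviour are all assumptions made for the example.

```python
from itertools import count


def pick_tablet_round_robin(tablets, counter):
    """Pick the next tablet in round-robin order (hypothetical helper)."""
    return tablets[next(counter) % len(tablets)]


def pick_tablet_prefer_local(tablets, tablet_hosts, local_be, counter):
    """Prefer tablets hosted on the BE that fetched the data.

    tablet_hosts maps tablet id -> set of BE names holding a replica.
    Falls back to plain round-robin when no tablet is local (assumption).
    """
    local = [t for t in tablets if local_be in tablet_hosts[t]]
    return pick_tablet_round_robin(local or tablets, counter)


tablets = ["t1", "t2", "t3"]
tablet_hosts = {"t1": {"BEa"}, "t2": {"BEb"}, "t3": {"BEc"}}
local_choice = pick_tablet_prefer_local(tablets, tablet_hosts, "BEb", count())
```

With this layout, data fetched by BEb always lands in t2, so nothing has to be forwarded to another BE.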

I searched the issues and it seems nobody has discussed this feature yet. The BE configuration does not seem to have a setting like "region", and in the source code, when load_to_single_tablet=true, BE chooses the tablet by round-robin. So for now Kafka-to-BE traffic crosses regions, and the BE that fetches the data may not be the BE that stores it, so BE-to-BE traffic crosses regions too. Am I right?

We would like to confirm whether the features above could be implemented. Thanks!

Use case

No response

Related issues

No response

sollhui commented 4 months ago

I searched the issues and it seems nobody has discussed this feature yet. The BE configuration does not seem to have a setting like "region", and in the source code, when load_to_single_tablet=true, BE chooses the tablet by round-robin. So for now Kafka-to-BE traffic crosses regions, and the BE that fetches the data may not be the BE that stores it, so BE-to-BE traffic crosses regions too. Am I right?

Yes, you are right. Currently, Doris does not support choosing a region when consuming from brokers. Additionally, the BE that receives the data (referred to as the coordinator in Doris) is not necessarily the BE that actually stores it, so there is cross-region traffic here.

BE needs a new config (e.g. called region), and a BE should then only fetch partitions from Kafka brokers in the same region, so there is no cross-region traffic between Kafka and BE.

I think this is a very good proposal, as it can help Doris users reduce costs. According to https://github.com/confluentinc/librdkafka/pull/4252, we can support it.

When a table is bucketed by random and load_to_single_tablet=true, all data fetched from Kafka should be stored in a tablet held by this BE itself, so there is no cross-region traffic between BEs.

With multiple BEs, the BE receiving the data is only the coordinator, so costs may be incurred here.

sollhui commented 4 months ago

Hi @flashmouse, would you like to submit this PR? :)

flashmouse commented 4 months ago

Generally, Kafka rack awareness has 2 parts:

  1. When a client sends a fetch request to broker A, and A knows that this partition has a replica held by broker B (B and the client are in the same rack), A tells the client to redirect the fetch request to broker B (Fetch From Follower).
  2. During a Kafka consumer rebalance, the assignor only assigns a partition to a client when one of its replicas is in the same rack as the client (this is what confluentinc/librdkafka#4252 implements).
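
Part 1 can be modelled with a small sketch. It is a simplification written for this discussion, not Kafka's actual selector: the function name and data layout are assumptions, and unlike the real broker logic it ignores how caught-up each follower is.

```python
def select_read_replica(client_rack, leader, replica_racks):
    """Return the broker a client should fetch one partition from.

    replica_racks maps broker id -> rack for every replica of the partition.
    A replica in the client's rack is preferred; otherwise the leader is used.
    """
    if client_rack is None:
        return leader
    same_rack = sorted(b for b, rack in replica_racks.items() if rack == client_rack)
    return same_rack[0] if same_rack else leader


# Hypothetical partition: leader on broker A (rack r1), follower on broker B (rack r2).
replicas = {"A": "r1", "B": "r2"}
redirected = select_read_replica("r2", "A", replicas)
```

A client in rack r2 is redirected to follower B, while a client in a rack with no replica keeps reading from leader A.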

To implement part 1, we need to add a new parameter ("region") to the BE configuration, and the rdkafka client sets client.rack={region_value}. I think this would be relatively simple. Implementing part 2 may be more complex since Doris assigns partitions itself: the function KafkaDataConsumerGroup::assign_topic_partitions needs a refactor (e.g. to use the assignment logic from rdkafka_assignor), and I guess FE needs some modification to implement this.

I can try implementing part 1 first to get familiar with the PR flow. If you think this is viable, I will try to do this.

sollhui commented 4 months ago

Breaking this down into two steps is a very clear idea, but I have some suggestions on the first point. Adding a configuration to BE may not be a good idea. In my opinion, it would be better to add a Kafka attribute to the routine load job, for example:

FROM KAFKA
(
"Region" = "XXX"
);

The reasons are:

  1. Jobs are scheduled by FE, so if the region is configured on BE, FE still has to learn about it, which requires additional implementation.
  2. A global configuration gives no per-job flexibility, and dynamic configuration changes would have to be considered; with a job attribute these can be achieved through schema change.

Perhaps you have a better idea, but before that, I recommend reading https://doris.apache.org/docs/data-operate/import/import-way/routine-load-manual

flashmouse commented 4 months ago

In Kafka, when the rack parameters of a broker (broker.rack) and a consumer (client.rack) are equal, they are considered to be "in the same region". With routine load, BE is the consumer, so it must have a way to specify client.rack. I therefore think adding a new configuration to BE is inevitable; it is the precondition for rack-aware consumption. A routine load job can be treated as a Kafka consumer group, which holds all consumers across different instances and regions, so I think adding the configuration to the job makes no sense.
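
A minimal sketch of how a BE-level region setting might be mapped onto the consumer configuration. `bootstrap.servers`, `group.id` and `client.rack` are standard Kafka/librdkafka consumer properties; the function name `build_consumer_conf` and the `be_region` argument are hypothetical and only illustrate the idea, since the real BE code is C++.

```python
def build_consumer_conf(bootstrap_servers, group_id, be_region=None):
    """Build a librdkafka-style consumer config dict for one BE."""
    conf = {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
    }
    # Only advertise a rack when the BE has a region configured (assumption).
    if be_region:
        conf["client.rack"] = be_region
    return conf


conf_with_region = build_consumer_conf("ba:9092,bb:9092,bc:9092", "routine_load_job", "ra")
conf_without_region = build_consumer_conf("ba:9092,bb:9092,bc:9092", "routine_load_job")
```

For fetch-from-follower to take effect, the brokers also need broker.rack set and replica.selector.class set to org.apache.kafka.common.replica.RackAwareReplicaSelector.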

sollhui commented 4 months ago

so I think add new configuration into BE is inevitable

In fact, the consumption tasks of a routine load job are scheduled by FE, and the tasks belonging to one job may run on different BEs; what each task consumes is also determined by FE, not BE. So I think it is better to decide which region a routine load task should consume from when the job is created; this could be a job property or an FE config.

flashmouse commented 4 months ago

FE assigns topic partitions to BEs, but BE is the one that sends fetch requests to Kafka. Even if FE wants to assign partitions to BEs without cross-region traffic, it still needs to know each BE's region, so a parameter specifying the BE's region is required. Maybe I still don't understand what you mean; could you provide a more detailed description?

sollhui commented 4 months ago

So FE needs to know the region of each BE, since different BEs may be in different regions, and then determines which Kafka partitions a BE should consume based on its region. Is this what you mean?

flashmouse commented 4 months ago

Yes, that's what I mean.

sollhui commented 4 months ago

We have no disagreement on this point. My earlier concern was that the job may send consumption tasks for different partitions to different BEs, and the region of a BE may differ from the region of its partition. Is this issue possible? That is why I previously suggested configuring the region that the job needs to consume from.

flashmouse commented 4 months ago

Say we have 3 brokers (ba, bb, bc) in regions ra, rb, rc respectively, each holding one partition's leader and another partition's follower, and 3 BEs (BEa, BEb, BEc) in regions ra, rb, rc respectively. The whole deployment looks like this:

| Region | Kafka broker | Partition replicas | BE |
| --- | --- | --- | --- |
| ra | ba | p1-leader, p2-follower | BEa |
| rb | bb | p1-follower, p3-leader | BEb |
| rc | bc | p2-leader, p3-follower | BEc |

Consider the different assignment results below:

  1. FE keeps its assignment logic unchanged, so the result is: BEa-p1, BEb-p2, BEc-p3. Since the p1 leader and BEa are both in region ra, and, thanks to Kafka's fetch-from-follower feature, BEc can fetch p3 from the p3 follower, consuming p1 and p3 generates no cross-region traffic. In this scenario, BEb consuming p2 does cross regions.
  2. FE is modified to be aware of the BEs' regions and the brokers' regions, and only gives a BE partitions that have a replica in the same region as the BE, so the result may be: BEa-p1, BEb-p3, BEc-p2. In this scenario, no cross-region traffic is generated.
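
The two outcomes can be reproduced with a small sketch over the example deployment above. It is not FE's actual scheduling code: the function names and the greedy least-loaded strategy are assumptions chosen to keep the example short.

```python
from collections import defaultdict

# Regions hosting a replica of each partition, and the region of each BE,
# taken from the example deployment above.
REPLICA_REGIONS = {"p1": {"ra", "rb"}, "p2": {"ra", "rc"}, "p3": {"rb", "rc"}}
BE_REGION = {"BEa": "ra", "BEb": "rb", "BEc": "rc"}


def round_robin_assign(partitions, backends):
    """Region-unaware assignment: partition i goes to backend i mod n."""
    return {p: backends[i % len(backends)] for i, p in enumerate(partitions)}


def rack_aware_assign(replica_regions, be_region):
    """Greedy assignment preferring the least-loaded BE that shares a region
    with some replica; falls back to any BE when none is local."""
    load = defaultdict(int)
    assignment = {}
    for p in sorted(replica_regions):
        local = [be for be, r in be_region.items() if r in replica_regions[p]]
        candidates = local or list(be_region)
        chosen = min(candidates, key=lambda be: (load[be], be))
        assignment[p] = chosen
        load[chosen] += 1
    return assignment


def cross_region_partitions(assignment, replica_regions, be_region):
    """Partitions whose assigned BE has no replica in its own region."""
    return sorted(p for p, be in assignment.items()
                  if be_region[be] not in replica_regions[p])


naive = round_robin_assign(sorted(REPLICA_REGIONS), sorted(BE_REGION))
aware = rack_aware_assign(REPLICA_REGIONS, BE_REGION)
naive_cross = cross_region_partitions(naive, REPLICA_REGIONS, BE_REGION)
aware_cross = cross_region_partitions(aware, REPLICA_REGIONS, BE_REGION)
```

Here the region-unaware assignment leaves p2 crossing regions, while the rack-aware one yields BEa-p1, BEb-p3, BEc-p2 with no cross-region partition. A greedy pass is not guaranteed to find a fully local assignment in every layout; a bipartite matching would be needed for that.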

I still don't see the reason to add a "region" parameter to routine load. If you mean that a routine load job with a region set only consumes partitions that have a replica in that region, then we must create a routine load job for each region, which is hard to maintain. What's more, the partition assignment within a Kafka cluster is not immutable and may change at any time: a partition may be held in regionA and move to regionB the next minute, which makes this even harder.

sollhui commented 4 months ago

FE is modified to be aware of the BEs' regions and the brokers' regions, and only gives a BE partitions that have a replica in the same region as the BE, so the result may be: BEa-p1, BEb-p3, BEc-p2. In this scenario, no cross-region traffic is generated.

That is why I think the job should know the regions of the Kafka partitions.

flashmouse commented 4 months ago

ok, I know...