tisonkun / kafka-api

Generic abstractions for Kafka API.
https://crates.io/crates/kafka-api
Apache License 2.0

Should only sync if the SyncGroup request comes from the leader member #8

Open aovestdipaperino opened 1 month ago

aovestdipaperino commented 1 month ago

I know that group management is at an early stage, but with this change I can easily create multiple consumers belonging to the same group:

    fn is_leader(&self, group_id: &str, member_id: &str) -> bool {
        self.group_coordinator
            .groups
            .get(group_id)
            .map(|group| group.leader_id.as_deref() == Some(member_id))
            .unwrap_or(false)
    }

    fn receive_sync_group(&mut self, request: SyncGroupRequest) -> SyncGroupResponse {
        let assignments = request
            .assignments
            .iter()
            .cloned()
            .map(|assign| (assign.member_id, assign.assignment))
            .collect::<BTreeMap<String, ByteBuffer>>();

        let assignment = if self.is_leader(&request.group_id, &request.member_id) {
            warn!("leader sync group");
            let group = self
                .group_coordinator
                .groups
                .get_mut(&request.group_id)
                .unwrap();
            group.sync(assignments);
            let member = group.members.get(&request.member_id).unwrap();

            member.assignment.clone()
        } else {
            let group = self
                .group_coordinator
                .groups
                .get(&request.group_id)
                .unwrap();
            let leader = group.leader_id.as_deref().unwrap();
            let leader = group.members.get(leader).unwrap();
            leader.assignment.clone()
        };

        SyncGroupResponse {
            protocol_type: request.protocol_type.clone(),
            protocol_name: request.protocol_name.clone(),
            assignment,
            ..Default::default()
        }
    }
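
One thing that may be worth double-checking in the follower branch above: as far as I understand the Kafka protocol, each SyncGroup response carries the requesting member's own assignment, whereas that branch returns the leader's. Below is a minimal sketch of a per-member lookup. The `Group`, `Member`, and `ByteBuffer` types here are simplified stand-ins, `assignment_for` is a hypothetical helper, and the behavior of `sync` is an assumption about what the crate's `group.sync` does.

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified stand-ins for the crate's types.
type ByteBuffer = Vec<u8>;

#[derive(Default)]
struct Member {
    assignment: ByteBuffer,
}

#[derive(Default)]
struct Group {
    members: BTreeMap<String, Member>,
}

impl Group {
    // Assumed behavior of `sync`: store the leader-computed assignment
    // on every member that is already known to the group.
    fn sync(&mut self, assignments: BTreeMap<String, ByteBuffer>) {
        for (member_id, assignment) in assignments {
            if let Some(member) = self.members.get_mut(&member_id) {
                member.assignment = assignment;
            }
        }
    }

    // Hypothetical helper: return the caller's own assignment,
    // or None if the member is not part of the group.
    fn assignment_for(&self, member_id: &str) -> Option<ByteBuffer> {
        self.members.get(member_id).map(|m| m.assignment.clone())
    }
}

fn main() {
    let mut group = Group::default();
    group.members.insert("m1".to_string(), Member::default());
    group.members.insert("m2".to_string(), Member::default());

    // The leader (say "m1") submits assignments for every member.
    let mut assignments = BTreeMap::new();
    assignments.insert("m1".to_string(), vec![1u8]);
    assignments.insert("m2".to_string(), vec![2u8]);
    group.sync(assignments);

    // A follower looks up its own entry rather than the leader's.
    assert_eq!(group.assignment_for("m2"), Some(vec![2u8]));
    assert_eq!(group.assignment_for("unknown"), None);
}
```

This sketch ignores the case where a follower's request arrives before the leader has synced; a fuller version would probably need to hold or retry such requests.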

aovestdipaperino commented 1 month ago

    fn is_leader(&self, group_id: &str, member_id: &str) -> bool {
        self.group_coordinator
            .groups
            .get(group_id)
            .map(|group| group.leader_id.as_deref() == Some(member_id) || group.leader_id.is_none())
            .unwrap_or(false)
    }

tisonkun commented 1 month ago

@aovestdipaperino Thanks for your interest in this simple server. I've sent you an invitation to the to-be-open-sourced implementation of kafka-rust. Please check your mailbox, try out that software, and leave suggestions if anything is less than awesome.