IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License
11.56k stars 1.76k forks source link

How to change replication-factor of topic #1238

Closed DoubleLuck closed 4 years ago

DoubleLuck commented 5 years ago

Hi all, I try to use sarama to increase replication-factor of topic , but didn't success, I need your help.

Versions

Sarama Version: 97315fefd9d1a91fbc682c52c44fcb490fa5c6e7 Kafka Version: 1.0.0 (scala 2.12) Go Version: 1.11 (windows/amd64)

Configuration

Sarama Config: config.Version = sarama.V1_0_0_0 Kafka Config: 3 brokers on local CentOS, id : 0 , 1, 2 Topic Info: test444 partition 1 replication-factor 1

    Topic: test444  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test444  Partition: 0    Leader: 0   Replicas: 0 Isr: 0

I hava set sarama.Logger

    logger := log.New(os.Stderr, "[sarama] ", log.LstdFlags)
    sarama.Logger = logger
Step1
Step2

Maybe I pass wrong parameters or use incorrect configs, I didn't increase the replication-factor successfully. Please help me , THANK YOU!

varun06 commented 5 years ago

Looks like it should work. Are you still facing this issue, if not, can you please close this issue?

DoubleLuck commented 5 years ago

Looks like it should work. Are you still facing this issue, if not, can you please close this issue?

I'm not working on this recently. I'll try later again, thanks.

sladkoff commented 4 years ago

I'd like to re-open this.

Getting the same error "Unknown topic config name: ReplicationFactor".

Sarama version: 1.26.0 Kafka version: 2.3.1 Go version: 1.13.0

d1egoaz commented 4 years ago

is this even supported by the kafka admin API?

d1egoaz commented 4 years ago

as far I know in order to change the replication factor you'll need to:

  1. create a json file with the new replica assignments
  2. use the kafka-reassign-partitions tools to execute the new plan

the sarama admin example is misleading, there is no support for changing a topic's replication factor on the fly

d1egoaz commented 4 years ago

@DoubleLuck @sladkoff I also have tested the CreatePartitions method.

It's working for me with:

err = admin.CreatePartitions(TestSinglePartitionTopic, 7, nil, false)
    if err != nil {
kafkacat -Lb localhost:9092
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 1 brokers:
  broker 1001 at kafka:9092 (controller)
 3 topics:
  topic "test.1.nokey" with 7 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
    partition 1, leader 1001, replicas: 1001, isrs: 1001
    partition 2, leader 1001, replicas: 1001, isrs: 1001
    partition 3, leader 1001, replicas: 1001, isrs: 1001
    partition 4, leader 1001, replicas: 1001, isrs: 1001
    partition 5, leader 1001, replicas: 1001, isrs: 1001
    partition 6, leader 1001, replicas: 1001, isrs: 1001

or using and assignment plan:

number of brokers: 1 broker[0] id: 1001 current number of partitions: 1 current assignment: map[0:[1001]]

target: 4

You'll need to provide an assignment plan for the new partitions (3 new partitions)

    err = admin.CreatePartitions(TestSinglePartitionTopic, 4, [][]int32{{1001}, {1001}, {1001}}, false)
    if err != nil {
        fmt.Printf("   >>> kafkabridge_integration_test.go/: err: %T: %+v\n", err, err)
    }

current assignment after the change: map[0:[1001] 1:[1001] 2:[1001] 3:[1001]]

╰─○ kafkacat -Lb localhost:9092
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 1 brokers:
  broker 1001 at kafka:9092 (controller)
 3 topics:
  topic "test.1.nokey" with 4 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
    partition 1, leader 1001, replicas: 1001, isrs: 1001
    partition 2, leader 1001, replicas: 1001, isrs: 1001
    partition 3, leader 1001, replicas: 1001, isrs: 1001

I think this could be in the docs, it's not straightforward to infer this, took me a bit no understand it.

sladkoff commented 4 years ago

@d1egoaz How can we use this to change the replica assignment for existing partitions? In your comment you create new partitions for the new assignment - your initial partition replication assignment is unchanged, isn't it?

Assume I have three broker IDS [1, 2, 3] and I have 3 partitions with replica assignment [1].

I want to update the assignment for these partitions to [1, 2, 3].

If I call CreatePartitions without increasing the partition count:

// assignment: [[1,2,3], [1,2,3], [1,2,3]]
admin.CreatePartitions("test-topic", 3, assignment, false)

I get error Number of partitions is invalid. - Topic already has 3 partitions.

I tried adding a new partition while also changing the replica assignment for all (now 4) partitions. Then I get the following error:

Replica assignment is invalid. - Increasing the number of partitions by 1 but 4 assignments provided.

It seems to me that it's only possible to set the replication assignment for new partitions which isn't sufficient. Can you verify this?

sladkoff commented 4 years ago

I'm assuming that the AlterPartitionReassignments needs to be implemented in order to edit assignments for existing partitions.

@d1egoaz I'd like to try and contribute this if you agree that this is the way to continue.

d1egoaz commented 4 years ago

That'd be awesome! go for it!