AutoMQ / automq

AutoMQ is a cloud-first alternative to Kafka by decoupling durability to S3 and EBS. 10x cost-effective. Autoscale in seconds. Single-digit ms latency.
https://www.automq.com/docs
Other
3.51k stars 172 forks source link

[consult] Abnormal Downtime #966

Closed funky-eyes closed 5 months ago

funky-eyes commented 5 months ago

Version & Environment

https://docs.automq.com/zh/docs/automq-s3kafka/KRMqwQBysionzukazS6cnP2Hnmh 针对异常宕机,通过 Multi-Attach 机制去触发将宕机节点的Delta WAL进行上传,然后进行分区重分配. 开源版似乎没有看到相关的实现对接,请问automq 开源版是否有统一的标准接口,可以让社区的用户自行适配异常宕机的delta wal的上传

For abnormal downtime, the Multi-Attach mechanism is used to trigger the Delta WAL of the down node to be uploaded, and then partition reallocation is performed. The open-source version does not seem to see the relevant implementation docking. May I ask if the automq open-source version has a unified standard interface that allows users in the community to adapt to the upload of delta wal that is abnormally down?

What went wrong?

What should have happened instead?

How to reproduce the issue?

1. 2. 3.

Additional information

Please attach any relevant logs, backtraces, or metric charts.

superhx commented 5 months ago

Currently, https://github.com/AutoMQ/automq/blob/main/s3stream/src/main/java/com/automq/stream/api/Client.java provides a #failover method for failover on specified path disks

Plan: wrap this method into a command-line tool

funky-eyes commented 5 months ago

Currently, https://github.com/AutoMQ/automq/blob/main/s3stream/src/main/java/com/automq/stream/api/Client.java provides a #failover method for failover on specified path disks

Plan: wrap this method into a command-line tool

这似乎只能在故障的机器上去执行failover,我没看到它被controller节点所调用,我认为当某个节点宕机后,应该由controller节点去统一托管这个failover的行为,其中的行为就应该包含对云盘的Multi-Attach 机制进行提交对应的Delta WAL后再进行分区重分配,以保证高可用,但是似乎目前failover的行为并不是这样的流程。

It seems that the failover can only be performed on the failed machine, I don't see it being called by the controller node, I think when a node is down, the controller node should be the one to host the failover behavior, which should include submitting the corresponding Delta WALs to the cloud disk's Multi-Attach mechanism and then performing partition reallocation to ensure high availability. Delta WAL and then partition reallocation to ensure high availability, but it seems that the current failover behavior is not such a process.

superhx commented 5 months ago

Currently, https://github.com/AutoMQ/automq/blob/main/s3stream/src/main/java/com/automq/stream/api/Client.java provides a #failover method for failover on specified path disks Plan: wrap this method into a command-line tool

这似乎只能在故障的机器上去执行failover,我没看到它被controller节点所调用,我认为当某个节点宕机后,应该由controller节点去统一托管这个failover的行为,其中的行为就应该包含对云盘的Multi-Attach 机制进行提交对应的Delta WAL后再进行分区重分配,以保证高可用,但是似乎目前failover的行为并不是这样的流程。

It seems that the failover can only be performed on the failed machine, I don't see it being called by the controller node, I think when a node is down, the controller node should be the one to host the failover behavior, which should include submitting the corresponding Delta WALs to the cloud disk's Multi-Attach mechanism and then performing partition reallocation to ensure high availability. Delta WAL and then partition reallocation to ensure high availability, but it seems that the current failover behavior is not such a process.

It can be executed on any Broker. Considering the different necessities of businesses, the timing and infrastructure for Broker Failover vary

Therefore, currently, only the most basic Failover capability is provided.

funky-eyes commented 5 months ago

That means this needs to be executed on the broker's node, but manually, not in a way similar to some of the standard api's provided by apachekafka, where you just fill in the classname and it is automatically loaded? For example, kafka-connect. so if the partition reallocation has been triggered automatically, and that data has not been uploaded to s3, is it useless for me to go and execute that failover after the fact?

也就是说这个需要在broker的节点上执行,但是是人工操作,而不是类似apachekafka提供的一些标准api,只需要将classname填写后,自动加载的方式?比如kafka-connect。那如果已经自动触发了分区重分配,该数据并没有被上传到s3,我事后去执行该failover是否属于无用功?

superhx commented 5 months ago

That means this needs to be executed on the broker's node, but manually, not in a way similar to some of the standard api's provided by apachekafka, where you just fill in the classname and it is automatically loaded? For example, kafka-connect. so if the partition reallocation has been triggered automatically, and that data has not been uploaded to s3, is it useless for me to go and execute that failover after the fact?

也就是说这个需要在broker的节点上执行,但是是人工操作,而不是类似apachekafka提供的一些标准api,只需要将classname填写后,自动加载的方式?比如kafka-connect。那如果已经自动触发了分区重分配,该数据并没有被上传到s3,我事后去执行该failover是否属于无用功?

funky-eyes commented 5 months ago

That means this needs to be executed on the broker's node, but manually, not in a way similar to some of the standard api's provided by apachekafka, where you just fill in the classname and it is automatically loaded? For example, kafka-connect. so if the partition reallocation has been triggered automatically, and that data has not been uploaded to s3, is it useless for me to go and execute that failover after the fact? 也就是说这个需要在broker的节点上执行,但是是人工操作,而不是类似apachekafka提供的一些标准api,只需要将classname填写后,自动加载的方式?比如kafka-connect。那如果已经自动触发了分区重分配,该数据并没有被上传到s3,我事后去执行该failover是否属于无用功?

  • The option provided by Kafka through the SPI is an excellent solution. Should we think about creating an Issue to discuss the relevant details more thoroughly?
  • The sequence where the partition reallocation happens before uploading data to S3 will not cause issues. This is because the stream linked to a partition is only closed gracefully after a failover. Consequently, the partition can then be opened as usual.
  1. 第一点我认为是必要的,应该类似spi这样的加载机制,可以让社区或者其他公司或人员可以对接实现,由controller去触发,传入一些宕机的metadata信息之类的。
  2. 第二点我认为当我在实时消费的时候,分区已经进行重分配了,也有新的消息投递进来了,这个时候宕机节点的delta wal中的数据即便提交了,能对业务没有影响吗?这段消息已经被跳过了吧?

the first point I think is necessary, should be similar to spi such as loading mechanism, can let the community or other companies or people can butt to realize, by controller to trigger, pass some downtime metadata information and so on.

the second point I think when I am consuming in real time, the partition has been reallocated, there are also new messages dropped in, this time the data in the delta wal of the down node even if it is submitted, can it have no impact on the business? This message has already been skipped right?

superhx commented 5 months ago

That means this needs to be executed on the broker's node, but manually, not in a way similar to some of the standard api's provided by apachekafka, where you just fill in the classname and it is automatically loaded? For example, kafka-connect. so if the partition reallocation has been triggered automatically, and that data has not been uploaded to s3, is it useless for me to go and execute that failover after the fact? 也就是说这个需要在broker的节点上执行,但是是人工操作,而不是类似apachekafka提供的一些标准api,只需要将classname填写后,自动加载的方式?比如kafka-connect。那如果已经自动触发了分区重分配,该数据并没有被上传到s3,我事后去执行该failover是否属于无用功?

  • The option provided by Kafka through the SPI is an excellent solution. Should we think about creating an Issue to discuss the relevant details more thoroughly?
  • The sequence where the partition reallocation happens before uploading data to S3 will not cause issues. This is because the stream linked to a partition is only closed gracefully after a failover. Consequently, the partition can then be opened as usual.
  1. 第一点我认为是必要的,应该类似spi这样的加载机制,可以让社区或者其他公司或人员可以对接实现,由controller去触发,传入一些宕机的metadata信息之类的。
  2. 第二点我认为当我在实时消费的时候,分区已经进行重分配了,也有新的消息投递进来了,这个时候宕机节点的delta wal中的数据即便提交了,能对业务没有影响吗?这段消息已经被跳过了吧?

the first point I think is necessary, should be similar to spi such as loading mechanism, can let the community or other companies or people can butt to realize, by controller to trigger, pass some downtime metadata information and so on.

the second point I think when I am consuming in real time, the partition has been reallocated, there are also new messages dropped in, this time the data in the delta wal of the down node even if it is submitted, can it have no impact on the business? This message has already been skipped right?

  1. Broker1 crashes;
  2. Broker2 attempts to open the partition but is blocked while opening the stream because the stream status is OPEN;
  3. To facilitate failover for Broker1, some brokers update Broker1's Write-Ahead Log (WAL) data to S3 and close the streams managed by Broker1;
  4. Broker2 successfully opens the partition because the stream status is CLOSED;
funky-eyes commented 5 months ago

image @superhx 我发现kill -9 pid后,通过./kafka-topics.sh --topic automq-test --bootstrap-server xxxx:xxx --describe 可以观察到leader很快就在几秒钟后重分配了,但是通过压测的进程依旧输出 Got error produce response with correlation id 178349 on topic-partition automq-test-0, retrying (95 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender) 直到将宕机的节点拉起才可用,但是在拉起宕机节点前,其实分区已经被重分配了,leader也已经变了,但是还是写不进去消息,具体可以看我发的截图,其中右上角的机器进行查看分区的详情,左边的机器是被kill-9的节点,中下方的机器是在进行压测的机器,是不是意味着开源版上,非优雅下线后,将直接不可用?

I found that after killing -9 pid, through ./kafka-topics.sh --topic automq-test --bootstrap-server xxxx: xxx --describe I can observe that the leader is quickly reassigned after a few seconds, but the process through torture testing still outputs Got error produce response with correlation id 178349 on topic-partition automq-test-0, retrying (95 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals. Sender) It is not available until the crashed node is pulled up, but before pulling up the crashed node, the partition has actually been reassigned, and the leader has changed, but the message still cannot be written. For details, you can see the screenshot I sent. The machine in the upper right corner views the details of the partition. The machine on the left is the node that was killed-9. The machine at the bottom of the middle is the machine that is undergoing torture testing. Does it mean that on the open-source version, after non-elegant offline, it will be directly unavailable?

daniel-y commented 5 months ago

@funky-eyes 当 Broker 被 Kill 退出时,分区会重新调度到其他节点,在你的测试场景中,Broker 是非优雅退出,所以会导致分区并没有被正确关闭,所以及时分区被分配了新的 Leader,但因为无法打开所以没办法提供服务。我们还是期望进程非优雅退出后,能快速将进程在恢复出来,比如通过 Systemd 自动拉起。当然如果要走下线节点,可以走优雅下线节点。因为云服务 ECS 具备较高的可用性,以及能够容忍物理机级别的故障,所以 ECS 及时在宕机后,也能快速恢复。

另外,如前面讨论的,极端场景下,更多的 Failover 手段似乎是必要的,AutoMQ 社区版当前仅提供了函数级的 API,扩展有一些门槛,后续我们会考虑在 API 稳定后提供 SPI 的机制,这块在我们的规划当中。

When a Broker forcefully exits, its partitions are redistributed but may not close properly. This can prevent a newly assigned Leader from serving requests, despite the partition's reallocation. To address abrupt shutdowns, we aim for rapid system recovery, possibly through automatic restarts with Systemd. Decommissioning should be graceful to ensure smooth transitions. Cloud services like ECS, designed for high availability and resilience to physical failures, facilitate quick recovery after outages.

Additionally, given the potential need for more robust failover mechanisms in extreme cases, the current function-level API in AutoMQ's community edition presents extension challenges. We're exploring the addition of an SPI mechanism post-API stabilization, aligning with our strategic planning.