apache / incubator-streampark

Make stream processing easier! Easy-to-use streaming application development framework and operation platform.
https://streampark.apache.org/
Apache License 2.0
3.89k stars 1.01k forks source link

[SPIP-1] Suggest add HA for StreamPark #3905

Open SbloodyS opened 3 months ago

SbloodyS commented 3 months ago

Search before asking

Motivation

Currently StreamPark only supports standalone deployment, not high-availability deployment, which is not friendly for large volume tasks

Design Detail

StreamPark HA Architecture

zHaDldprtG6uWBI.png

Service Architecture

master-server

  1. Proxy for the frontend UI
  2. API service
  3. Task distribution and fault tolerance

worker-server

  1. Task submission and execution
  2. Task status detection
  3. Receiving and sending alarms when alerting

Code Module

registry-center

  1. The SPI pattern is used to make the registry plug-in, which facilitates the coexistence of multiple registration types
  2. The first registered plug-in to be introduced is to be determined. The options are raft algorithm, third party module like zookeeper, etcd, jdbc, etc...
  3. The registry is responsible for the registration and discovery of services

remote

  1. Responsible for communication and interaction between master and worker. Implementation options are netty, rpc, grp, json, etc...

master-server

  1. Move the current UI and API modules under master
  2. Using the registry for service registration and discovery
  3. The interaction between the manster and worker mostly uses the remote module, except task submission
  4. For task submission, it is suggested to use the production and consumption model to create a table in the data as the task message queue, and the master, as the producer, writes the user's front-end task submission operations into the table. At this time, the master will elect qualified worker as the task executor to write into the table. As a consumer, the worker reads the task to be submitted in the table for actual submission operation, and deletes the table data after the task submission is completed

worker-server

The main purpose of worker is to ensure that the overall code is consistent with the current backend server, so as to reduce unnecessary workload. To acheive this goal, We will make the following changes.

  1. Move the current UI and API modules under master
  2. Ensure that the overall logic does not change from the application state. By modifying how WATCHING_APPS are generated for each task type, task consumers are created at the upper level and WATCHING_APPS are generated for application state consumption.
  3. Using the registry for service registration and discovery
  4. The interaction between the master and worker mostly uses the remote module

Note: Do remember to add the documentation at the end

Compatibility, Deprecation, and Migration Plan

Test Plan

Code of Conduct

1996fanrui commented 3 months ago

Thanks @SbloodyS for driving this proposal!

I have a question: is the master server or the worker server single process? Or are they virtual(logical) role?

I prefer they are virtual(logical) role, it means they can be deployed inside of single process. The single StreamPark Process includes master and worker role together. And worker is multiple thread to fetch and execute tasks periodically.

If master and worker are separate process, users need to start a series of process for HA mode(At least 2 workers and 2 masters). It will increase the deployment cost for users.

Also, I'd like to rename the master server to rest service for 2 reasons:

Looking forward to your feedback, thanks in advance.

SbloodyS commented 3 months ago

Thanks for reply. @1996fanrui

I have a question: is the master server or the worker server single process? Or are they virtual(logical) role?

From my persperctive, master server and worker server are two different process. This makes it easy for users to combine their number freely, such as 2 masters and 3 workers.

If master and worker are separate process, users need to start a series of process for HA mode(At least 2 workers and 2 masters). It will increase the deployment cost for users.

For the deployment issue you mentioned, we can build a start and stop script so that the user can choose how to start. For example, streampark-daemon.sh start/stop master-server/worker-server/all.

Also, I'd like to rename the master server to rest service for 2 reasons:

  • In general, the master only have one active instance.
  • Server means a process, but one process could has multiple services.

Since the master is not only responsible for API responsibilities, but also responsible for task distribution and task worker selection functions. I would prefer to use master server. And we may split the API module into a seprate process maybe call api-server or others if further optimization needed in the future.

WDYT? @1996fanrui

1996fanrui commented 3 months ago

Thanks for the quick feedback!

For the deployment issue you mentioned, we can build a start and stop script so that the user can choose how to start. For example, streampark-daemon.sh start/stop master-server/worker-server/all.

It's a suitable solution. Let's wait for more feedback from the community.

Since the master is not only responsible for API responsibilities, but also responsible for task distribution and task worker selection functions.

As I understand, responsible for task distribution and task worker selection functions. means workers call fetch task rest api periodically, right?

If so, task distribution and task worker selection is a part of API responsibilities.

Please correct me if any understanding is wrong, thanks~

SbloodyS commented 3 months ago

As I understand, responsible for task distribution and task worker selection functions. means workers call fetch task rest api periodically, right?

If so, task distribution and task worker selection is a part of API responsibilities.

Please correct me if any understanding is wrong, thanks~

The application submission context includes API submitted application information and WorkerLoadBalancer select worker's ip/hostname result.

API submitted application information is part of API responsibilities.

And WorkerLoadBalancer is part of Master responsibilities. The WorkerLoadBalancer is an interface in Master. It can be implemented by multiple load balance strategies, such as Round-Robin, Random, Dynamic Weight, etc... We will only choose one of these strategies to implement this time.

Please correct me if I'm wrong. @1996fanrui

HxpSerein commented 3 months ago

Thanks @SbloodyS for the detailed design and the significant effort.

I am looking forward to starting a discussion soon to break down the design into specific tasks. It will be important to understand the dependencies between these tasks so we can establish a clear and effective development plan.

Thanks again for your hard work and dedication.

HxpSerein commented 3 months ago

Further Design Proposal

Motivation

The further design proposal is to implement StreamPark HA in two stages.

The first stage will involve making the least changes to console and focusing on designing and implementing the necessary registry center, resource center and job distribution algorithm to achieve a basic deliverable high-availability version. This stage aims to quickly provide a functional high-availability system with minimal disruption to the current setup.

The second stage will further refactor and split the console into master and worker components to achieve a complete high-availability version. This stage will focus on optimizing performance and scalability, ensuring the system can handle larger workloads and more complex operations.

This proposal currently focuses only on the Basic StreamPark HA Architecture.

Basic StreamPark HA Architecture

image

Registry Center

The registry center is responsible for the registration and discovery of servers. It ensures that servers can work together efficiently and correctly and improves the overall reliability of the system.

Resource Center

The Resource Center provides storage for jar packages and necessary resources. The startup of a job no longer depends on a specific server but can occur on any server. The new server can obtain jar packages from the resource center to achieve high availability and disaster tolerance. This design ensures that jobs can be started and managed from any server, reducing single points of failure.

Job Distribution

A consistent hashing algorithm will be used for job distribution and job migration in case of cluster expansion and disaster. In the basic high-availability version, there is no additional communication between servers. Servers receive tasks by polling the job distribution table. This method ensures even distribution of jobs and efficient handling of server failures or additions.

Note: Remember to add the documentation at the end, including detailed descriptions of the registry center, resource center, job distribution algorithm, and overall architecture.

Compatibility, Deprecation, and Migration Plan

Add upgrade and deployment scripts for users. Ensure these scripts are well-documented and easy to follow, allowing users to transition smoothly to the new high-availability architecture. Include clear instructions for migrating existing jobs and resources.

Test Plan

Add end-to-end (e2e) tests to ensure stability. These tests should cover:

wolfboys commented 3 months ago

Thanks, SbloodyS and HxpSerein, for driving this proposal; I think HxpSerein's solution is more suitable for us. More implementation details and architecture design will need to be discussed later. Let's keep moving forward

Mrart commented 3 months ago

This is a very good work. As we discussed offline, we split it into two stages. In the first stage, @HxpSerein we completed the registration center and file center as proposed , In the second stage, @SbloodyS we split the master-worker

1996fanrui commented 3 months ago

Thanks @HxpSerein for the detailed design, and everyone for the discussion!

I have a few of questions, please take a look in your free time, thanks!

  1. In the current stage, each server includes master and worker, right?
  2. From your design, the registry center has 2 implementations: zookeeper and etcd. I'm curious could we use jdbc related database as the registry center? Such as MySQL, PostgreSQL.
    • Before this improvement, zookeeper or etcd is not necessary for StreamPark. And IIUC, StreamPark needs one DataBase generally.
    • If DataBase could as the registry center, we won't introduce extra services. And it's easy to depoly for users.
    • Of course, we could provide zookeeper or etcd registry center in the future if needed.
  3. Why do we need to introduce the Job Distribution or Consistent Hash?
    • What scenarios need Consistent Hash?
      • As I understand, Consistent Hash is used in database system for load balance.
      • Database needs Consistent Hash, because it must ensure all operations(read or write) of same key are forwarded to the fixed server, unless the old server is crashed.
      • If key1 is written to server1, and we read it from server2. It will be a bug(Cannot find key1 from server2).
    • I don't understand why StreamPark needs Consistent Hash?
      • IIUC, the current design expects each server to be responsible for a part of jobs.
      • It means, if jobA is assigned to server1, all operations of jobA will be executed by server1.
      • I don't know why all operations of jobA must be assigned to same server1.
      • For example, jobA is started by server1. After a while, user clicks stop jobA. As I understand, server2 could stop jobA as well even if server1 works well.
    • If all operations of jobA is executed by same server isn't necessary, I don't think we need the Consistent Hash. It will bring complexity to the system.

Feel free to correct me if my understanding is wrong, thanks :)

SbloodyS commented 3 months ago

Thanks @HxpSerein for the detailed design, and everyone for the discussion!

  1. From your design, the registry center has 2 implementations: zookeeper and etcd. I'm curious could we use jdbc related database as the registry center? Such as MySQL, PostgreSQL.
  2. Why do we need to introduce the Job Distribution or Consistent Hash?

Feel free to correct me if my understanding is wrong, thanks :)

From my perspertive, using a database for the second and third points is a much lighter implementation. Waiting for other's opinion.

HxpSerein commented 3 months ago

Thanks @1996fanrui for the comment.

1.In the current stage, each server includes master and worker, right?

Yes, we will not split the console at the first stage.

2.From your design, the registry center has 2 implementations: zookeeper and etcd. I'm curious could we use jdbc related database as the registry center? Such as MySQL, PostgreSQL.

The specific implementation plan is still under discussion. In my opinion, both solutions have their own advantages and disadvantages. Using a single database can simplify system architecture and management, while using ZooKeeper or etcd has distributed consistency and high availability.

3.Why do we need to introduce the Job Distribution or Consistent Hash?

The server needs to poll jobs to obtain their latest status. When the number of jobs is huge, monitoring can become a significant challenge. Therefore, in my opinion, consistent hashing is primarily used to ensure load balancing in monitoring. In cases of cluster expansion or disaster, the consistent hashing algorithm can help migrate job monitoring.

Please correct me if any understanding is wrong, thanks~

1996fanrui commented 3 months ago

2.From your design, the registry center has 2 implementations: zookeeper and etcd. I'm curious could we use jdbc related database as the registry center? Such as MySQL, PostgreSQL.

The specific implementation plan is still under discussion. In my opinion, both solutions have their own advantages and disadvantages. Using a single database can simplify system architecture and management, while using ZooKeeper or etcd has distributed consistency and high availability.

In general, database has distributed consistency and high availability as well.

As I understand, StreamPark relies heavily on the database. All user information and job information are stored in the database, so the StreamPark cannot work after database is crashed even if the registry center is using zookeeper.

If we introduced the zookeeper, StreamPark will be unavailable whenever either Zookeeper or database crashes. This increases the maintenance cost for users and makes StreamPark more likely to be unavailable.

3.Why do we need to introduce the Job Distribution or Consistent Hash?

The server needs to poll jobs to obtain their latest status. When the number of jobs is huge, monitoring can become a significant challenge. Therefore, in my opinion, consistent hashing is primarily used to ensure load balancing in monitoring. In cases of cluster expansion or disaster, the consistent hashing algorithm can help migrate job monitoring.

As my example mentioned before, all operations of same key should be forwarded to the same server for database system. But I still don't understand why same job should be monitored in the same server?

Or my question is: if job is monitored by one random server, does it works? If yes, there is no need to introduce complex consistent hashing.

I may have missed some background information, sorry for the question.

SbloodyS commented 3 months ago

As my example mentioned before, all operations of same key should be forwarded to the same server for database system. But I >still don't understand why same job should be monitored in the same server?

Or my question is: if job is monitored by one random server, does it works? If yes, there is no need to introduce complex >consistent hashing.

I may have missed some background information, sorry for the question.

From my understanding, we can allow the job to execute on randomly selected node. There is no need to restrict execution to a specific node.

wolfboys commented 3 months ago

3.Why do we need to introduce the Job Distribution or Consistent Hash?

To distribute apps to different servers, consider the scenario where three servers are active and 100 apps need to be managed. Ideally, server A would handle apps 1-33, server B would handle apps 34-66, and server C would handle apps 67-100. However, this is hard to achieve in reality. Consistent hashing is used to solve this problem. We only need to place three points representing server A, server B, and server C on a hash ring and then distribute the 100 apps onto the hash ring. Based on the distances between these apps and the three server nodes, we can determine which server should manage each task. If a server fails, there will be one less node on the hash ring, reducing it to two. The distribution will be recalculated

未命名绘图

Mrart commented 3 months ago

zookeeper and ectc are used as distributed locks in addition to service registration and discovery, and if you use a database, your database will be under a lot of pressure.

As @wolfboys consistent hash always rely zk/etcd

HxpSerein commented 3 months ago

In general, database has distributed consistency and high availability as well.

As I understand, StreamPark relies heavily on the database. All user information and job information are stored in the database, so the StreamPark cannot work after database is crashed even if the registry center is using zookeeper.

If we introduced the zookeeper, StreamPark will be unavailable whenever either Zookeeper or database crashes. This increases the maintenance cost for users and makes StreamPark more likely to be unavailable.

If the database has distributed consistency and high availability, it is indeed simpler to use a database.

As my example mentioned before, all operations of same key should be forwarded to the same server for database system. But I still don't understand why same job should be monitored in the same server?

Or my question is: if job is monitored by one random server, does it works? If yes, there is no need to introduce complex consistent hashing.

First, I agree with @SbloodyS 's viewpoint that only job monitoring needs to consider distribution among servers.

Second, I believe that consistent hashing does not introduce much additional complexity to the architecture. When job monitoring need to be migrated and allocated, we can simply invoke the algorithm to provide the allocation plan. This algorithm could be consistent hashing, greedy, or random. Regardless of the algorithm, our framework only needs to use a common interface for invocation.

Finally, I believe that the primary goal should be to implement the overall distributed framework. The registry center and allocation algorithms are options that can be modified within the framework. Considering the workload and complexity, it is feasible to first implement a registry center using a database. Once the framework is in place, implementing the allocation algorithms will be relatively straightforward, and both greedy and consistent hashing algorithms can be considered.

Please correct me if any understanding is wrong, thanks~

SbloodyS commented 3 months ago

Finally, a consensus was reached. I've broken this task down into the following subtasks.

  1. streampark-registry

    • This part will be completed by me, and I will list the full detailed design in #3934 @SbloodyS
  2. streampark-storage

  3. streampark-console

    • Task production and distribution will be added to this module #3960 @HxpSerein

Anyone who wants to take any part in it is welcome to reply. ^_^

HxpSerein commented 3 months ago

Thanks @SbloodyS for breaking down the tasks!

The streampark-console part will be completed by me.

SbloodyS commented 3 months ago

Thanks @SbloodyS for breaking down the tasks!

The streampark-console part will be completed by me.

Please create a subtask issue first. Thanks.