apache / incubator-hugegraph

A graph database that supports more than 100+ billion data, high performance and scalability (Include OLTP Engine & REST-API & Backends)
https://hugegraph.apache.org
Apache License 2.0

[Question] call rest-api in multi-users env (concurrency) #2490

Closed SkrDrag closed 2 months ago

SkrDrag commented 3 months ago

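Feature Description

When multiple users access the server through the RESTful API at the same time, for example GET http://localhost:8080/graphs/{graph}/traversers/kout?source="1:marko"&max_depth=2, does the server create multiple threads and instances at once to handle the requests? Could there be a higher-level interface that receives API requests from multiple users, manages them, and creates task instances to process them? That would allow better resource management.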

dosubot[bot] commented 3 months ago

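Hey @SkrDrag, good to see you again! I hope we can find a better solution together this time. 🚀

When multiple users access the RESTful API concurrently, the HugeGraph server does create multiple threads and instances to handle the requests. This is managed by the TaskManager class, which uses ExecutorService to execute tasks asynchronously and a PausableScheduledThreadPool to schedule them. The ExecutorService instances taskExecutor, taskDbExecutor, and serverInfoDbExecutor run different kinds of tasks concurrently. In particular, taskExecutor executes tasks and is initialized with a fixed thread pool size determined by the THREADS constant. This allows tasks to run concurrently and multiple requests to be handled at once.

In addition, the TaskManager class provides a higher-level interface for managing these requests and creating task instances for better resource management. It includes methods for adding a scheduler (addScheduler), closing a scheduler (closeScheduler), and managing task execution and scheduling (scheduleOrExecuteJob, scheduleOrExecuteJobForGraph). The class also handles role election (enableRoleElection, onAsRoleMaster, onAsRoleWorker), so the server's behavior can be adjusted dynamically according to its role in the cluster.

Overall, the TaskManager class in the HugeGraph server codebase is designed to serve multiple users accessing the RESTful API concurrently by creating multiple threads and instances. It also provides a structured way to manage these tasks and resources.

As for how TaskManager handles multiple concurrent requests: it uses thread pools rather than creating a new thread per request. This is done with ExecutorService instances backed by fixed thread pools. Specifically, taskExecutor, taskDbExecutor, and serverInfoDbExecutor are each initialized through ExecutorUtil.newFixedThreadPool, which shows that a thread pool mechanism is in use. The pool sizes are determined by the arguments passed at initialization. For example, taskExecutor is initialized with a pool size equal to the pool variable, which is set to THREADS (4 threads by default).

In addition, a PausableScheduledThreadPool named schedulerExecutor is used to schedule tasks. It is also a thread pool, initialized with a single thread: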

this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(1, TASK_SCHEDULER);

These thread pools execute and manage tasks without creating a new thread for each incoming request, so a fixed set of threads handles many tasks.
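To make the thread-reuse point concrete, here is a minimal sketch using only the standard java.util.concurrent API. It does not reproduce HugeGraph's ExecutorUtil or TaskManager; the class name FixedPoolDemo and the method distinctWorkers are hypothetical. It submits many short jobs to a fixed pool and counts how many distinct worker threads ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it does not reproduce HugeGraph's TaskManager.
class FixedPoolDemo {

    // Runs `tasks` short jobs on a pool of `threads` workers and returns
    // how many distinct worker threads actually executed them.
    static int distinctWorkers(int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    // Record which pooled thread picked up this job
                    names.add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return names.size();
    }

    public static void main(String[] args) {
        // 100 jobs, but never more than 4 worker threads
        System.out.println("distinct workers: " + distinctWorkers(4, 100));
    }
}
```

However many jobs are submitted, the count never exceeds the pool size, because jobs wait in the pool's queue until a worker is free.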

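As for how TaskManager manages the lifecycle of the threads in its pools, and what happens when a task completes or a thread hits an error: TaskManager relies mainly on Java's ExecutorService plus its own logic for scheduling and executing tasks. Task completion is handled implicitly by the pool's execution mechanism. The ExecutorService runs the tasks submitted to it, and once a task finishes, its thread becomes available for other tasks. TaskManager itself contains no explicit handling of what happens after a task completes.

If a thread encounters an error while executing a task, TaskManager does not spell out exception handling for the execution phase. However, methods such as closeTaskTx and closeSchedulerTx do include exception handling: they catch the exception, log it, and throw a HugeException. This suggests a pattern of catching, logging, and rethrowing as a custom exception type, which could be applied in a similar way to errors during task execution.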
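The catch, log, and rethrow pattern described above can be sketched with plain java.util.concurrent. This is an illustration under stated assumptions, not HugeGraph code: TaskFailedException is a hypothetical stand-in for a project-specific exception such as HugeException, and runAndWrap is an invented helper. The one library fact it relies on is that an exception thrown inside a submitted task surfaces from Future.get() wrapped in an ExecutionException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a project-specific exception type.
class TaskFailedException extends RuntimeException {
    TaskFailedException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical demo class.
class TaskErrorDemo {

    // Submits the job, waits for it, and converts any failure into
    // TaskFailedException after logging it.
    static <V> V runAndWrap(ExecutorService pool, Callable<V> job) {
        Future<V> future = pool.submit(job);
        try {
            return future.get();
        } catch (ExecutionException e) {
            // The exception thrown inside the task is available as the cause
            System.err.println("Task failed: " + e.getCause());
            throw new TaskFailedException("Task failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TaskFailedException("Interrupted while waiting", e);
        }
    }

    // Runs a task that always throws and returns the original failure message.
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            runAndWrap(pool, () -> {
                throw new IllegalStateException("boom");
            });
            return "no error";
        } catch (TaskFailedException e) {
            return e.getCause().getMessage();
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that a failing task does not kill the pool: the worker thread stays available, and the failure is only visible to whoever inspects the Future.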

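The shutdown method is responsible for gracefully closing all thread pools managed by TaskManager. It attempts to shut down each executor and waits for currently running tasks to finish within a specified timeout. If the tasks do not finish within the timeout, it logs an error.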
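A graceful shutdown of this kind is commonly written with ExecutorService.shutdown() followed by awaitTermination(). The sketch below shows that general idiom only; it is an assumption that TaskManager's shutdown method is structured similarly, and the class and method names here are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; shows the generic idiom, not HugeGraph's code.
class GracefulShutdownDemo {

    // Stops accepting new work, waits up to timeoutSeconds for running tasks,
    // and reports whether the pool terminated in time.
    static boolean shutdown(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown();
        try {
            boolean terminated = pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            if (!terminated) {
                System.err.println("Pool did not terminate within " + timeoutSeconds + "s");
            }
            return terminated;
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Starts a small pool, runs one trivial job, then shuts the pool down.
    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> { });
        return shutdown(pool, 5);
    }
}
```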

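The scheduleOrExecuteJob method is called periodically to schedule or execute jobs. It iterates over all TaskScheduler instances and synchronizes on each one, so that task scheduling and execution are managed in a thread-safe way. This method, together with scheduleOrExecuteJobForGraph, encapsulates the logic that decides whether to schedule new tasks, execute queued tasks, or cancel tasks, depending on the server's role (master or worker) and other conditions.

Overall, TaskManager manages threads through ExecutorService pools, with specific pools dedicated to task execution, database operations, and task scheduling. Task completion and error handling are managed implicitly by the thread pool mechanism and explicitly by custom logic in some methods. The task lifecycle, including scheduling, execution, and cancellation, is managed through periodic execution and synchronization on the scheduler objects.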

#### About Dosu

This response is meant to be useful and save you time. It is **not meant to be a precise solution**, but rather a starting point for your own research.

Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

If you want to continue the conversation, start your reply with @dosu-bot.

SkrDrag commented 3 months ago

Could you explain what TaskManager is specifically used for, and where it is called at runtime?

dosubot[bot] commented 3 months ago

The TaskManager class in HugeGraph's server is indeed a central component for managing and executing background tasks. It is designed to handle various types of tasks, including those related to graph operations and server information management. Here are the specific uses and functionalities provided by the TaskManager class:

  1. Task Scheduling and Execution: The TaskManager initializes different types of executors for handling tasks (taskExecutor, taskDbExecutor, serverInfoDbExecutor, and schedulerExecutor). It schedules tasks for execution and manages their lifecycle. This is evident from the method scheduleOrExecuteJob which iterates over all TaskScheduler instances and calls scheduleOrExecuteJobForGraph for each, indicating that tasks are scheduled or executed periodically.

  2. Task Scheduler Management: It manages TaskScheduler instances for each HugeGraphParams (which likely represents a graph instance). Methods like addScheduler and closeScheduler are used to add and close schedulers, respectively. This indicates that TaskManager is responsible for managing task execution specific to each graph instance within the server.

  3. Server Role Management: The TaskManager supports server role management (master or worker roles) through methods like onAsRoleMaster and onAsRoleWorker. This functionality is important for distributed environments where tasks might be scheduled or executed differently based on the role of the server.

  4. Thread Pool Management: It manages several thread pools for different purposes (taskExecutor, taskDbExecutor, serverInfoDbExecutor, and schedulerExecutor). These cover executing tasks, handling database operations related to tasks, managing server information, and scheduling tasks. The class provides a method to shut these executors down gracefully (shutdown) and methods to query their status (e.g., workerPoolSize, pendingTasks).

  5. Transaction Management: The TaskManager includes methods for closing transactions (closeTaskTx, closeSchedulerTx) related to tasks and schedulers. This is crucial for ensuring data consistency and releasing resources properly.

  6. Pause and Resume Functionality: It provides the ability to pause and resume the scheduled task execution (pauseScheduledThreadPool, resumeScheduledThreadPool). This can be useful for maintenance operations or when the server is under heavy load.

  7. Context Management: The class includes a mechanism for managing execution contexts (CONTEXTS ThreadLocal variable) with methods to set, reset, and get the current context. This is likely used to maintain context information across different tasks and threads.
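As a rough illustration of the context mechanism in item 7, the sketch below keeps a per-thread context in a ThreadLocal with set, reset, and get helpers. It is an assumption-laden sketch, not TaskManager's implementation: ContextDemo and its method names are hypothetical, and the context is a plain String. It also shows one caveat of ThreadLocal with thread pools: the value set on the calling thread is not visible in a worker thread unless it is captured and installed there explicitly.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; names are illustrative only.
class ContextDemo {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    static void setContext(String context) { CONTEXT.set(context); }
    static void resetContext() { CONTEXT.remove(); }
    static String getContext() { return CONTEXT.get(); }

    // Captures the caller's context, installs it in the worker thread for the
    // duration of the job, and returns what the worker saw.
    static String runWithCallerContext() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        String captured = getContext();
        try {
            Future<String> seen = pool.submit(() -> {
                setContext(captured);
                try {
                    return getContext();
                } finally {
                    // Clear it so the pooled thread does not leak the value
                    resetContext();
                }
            });
            return seen.get();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```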

In terms of where the TaskManager class is called during the running process, it would be primarily invoked whenever a new task needs to be scheduled or executed, or when there is a change in server roles. The exact points of invocation would depend on the specific use case and the operations being performed on the graph database.

For more details, you can refer to the source code of the TaskManager class in the HugeGraph server repository: hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java

I hope this provides a clearer understanding of the TaskManager class. If you have any more questions or need further clarification, feel free to ask.

SkrDrag commented 3 months ago

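Could you explain how, when multiple users access the RESTful API concurrently (for example, several kout API requests sent to the server at the same time), the server uses TaskManager to monitor incoming requests, create instances, and submit them to the thread pool? Which part of the code does this? I want to build a business system for my company on top of HugeGraph. So far I have been modifying the API to call my own algorithms. How should I develop my own task management module, so that when multiple users call that interface at the same time, the server can monitor the requests and create instances to run them?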

dosubot[bot] commented 3 months ago

The TaskManager in the incubator-hugegraph repository manages the execution of tasks within a thread pool by encapsulating tasks as instances of HugeTask<V>, which extends FutureTask<V>.

When a RESTful API request that requires asynchronous processing or long-running operations is received, an instance of HugeTask<V> is created. This task encapsulates the operation to be performed. The task is then submitted to a TaskScheduler, which manages a thread pool for executing these tasks. The TaskScheduler controls the concurrency through a thread pool. It schedules tasks for execution based on available threads and task priorities. This ensures that multiple concurrent requests are handled efficiently, with each task running in its own thread within the pool. Tasks can have dependencies (dependencies field in HugeTask<V>), meaning a task can be scheduled to run only after its dependencies have successfully completed. This feature is crucial for maintaining the correct execution order and for tasks that rely on the results of other tasks.

To develop a similar module for handling multiple user interface accesses simultaneously, you can follow these steps:

  1. Create a task class similar to HugeTask<V> that extends FutureTask<V> or implements Runnable or Callable<V>. This class should encapsulate the operation to be performed asynchronously.

  2. Implement a task scheduler similar to TaskScheduler in the provided code. This scheduler should manage a thread pool (e.g., using ExecutorService from Java's concurrency package) for executing tasks. It should also handle task dependencies and ensure tasks are executed in the correct order.

  3. Provide a mechanism for submitting tasks to the scheduler. This could be through a RESTful API or any other interface method. The scheduler should pick up submitted tasks, check for dependencies, and execute them in the thread pool once dependencies are satisfied and threads are available.

  4. Ensure proper management of threads and resources. This includes setting an appropriate size for the thread pool based on your system's capabilities and the expected load.

  5. Implement robust error handling and provide a way to track the status of tasks. This includes handling failed tasks, retrying tasks if necessary, and providing feedback on task progress and results.

Here are some simplified code snippets based on the provided context:

Task Representation:

import java.util.concurrent.Callable;

// A unit of work that returns a String when it finishes
public class MyTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        // Task logic here
        return "Task Result";
    }
}

Task Submission:

ExecutorService executor = Executors.newFixedThreadPool(10); // Thread pool
Future<String> future = executor.submit(new MyTask());

Task Execution and Management:

// Wait for the task to complete and retrieve the result
String result = future.get();
System.out.println("Task completed with result: " + result);

// Shutdown the executor
executor.shutdown();

This approach provides a foundation for developing a task management module capable of handling multiple concurrent user interface accesses by leveraging Java's concurrency utilities.
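Building on the snippets above, steps 2 to 5 could be combined into one small module along the following lines. This is a minimal sketch under stated assumptions, not HugeGraph's TaskManager or TaskScheduler: the class SimpleTaskManager, its Status enum, and all method names are hypothetical, task dependencies and retries are left out, and status is kept in memory only.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical module: one shared pool, one status entry per submitted task.
class SimpleTaskManager {
    enum Status { QUEUED, RUNNING, SUCCESS, FAILED }

    private final ExecutorService pool;
    private final Map<Long, Status> statuses = new ConcurrentHashMap<>();
    private final Map<Long, Future<?>> futures = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong(1);

    SimpleTaskManager(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    // Queues the job and returns an id the caller can poll later.
    <V> long submit(Callable<V> job) {
        long id = nextId.getAndIncrement();
        statuses.put(id, Status.QUEUED);
        Future<V> future = pool.submit(() -> {
            statuses.put(id, Status.RUNNING);
            try {
                V result = job.call();
                statuses.put(id, Status.SUCCESS);
                return result;
            } catch (Exception e) {
                statuses.put(id, Status.FAILED);
                throw e;
            }
        });
        futures.put(id, future);
        return id;
    }

    Status status(long id) {
        return statuses.get(id);
    }

    // Blocks until the task ends; returns its result, or null if it failed
    // (the failure itself is reported through status()).
    Object await(long id) {
        Future<?> future = futures.get(id);
        if (future == null) {
            throw new IllegalArgumentException("Unknown task id: " + id);
        }
        try {
            return future.get();
        } catch (ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

A REST handler would then call submit() and return the id immediately, while a separate status endpoint calls status(id). Because all handlers share one fixed pool, the number of algorithm runs executing at once is capped by the pool size regardless of how many users call the interface.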

The TaskManager in the incubator-hugegraph repository creates instances of HugeTask<V> for each incoming request by directly invoking one of the HugeTask constructors with the necessary parameters. There are two constructors in the HugeTask class that can be used for instantiation:

  1. HugeTask(Id id, Id parent, String callable, String input): This constructor is used when the task is initialized with a specific id for the task, a parent task ID (if any), the class name of the callable to be executed, and an input string. Inside this constructor, it calls another constructor this(id, parent, TaskCallable.fromClass(callable)) and then sets the input with this.input(input).

  2. HugeTask(Id id, Id parent, TaskCallable<V> callable): This constructor is used when the task is initialized with a specific id for the task, a parent task ID (if any), and a TaskCallable<V> instance. It sets up the task with the provided parameters and initializes other properties with default values or nulls. This includes setting the task status to NEW, initializing the creation date, and setting the load to 1, among other initializations.

The choice between these constructors depends on whether the caller has a TaskCallable<V> instance ready or prefers to specify the callable by its class name as a string along with any input required for the task.

In both cases, the HugeTask instance is initialized with a status of NEW, and various other properties are set to their default values or initialized as empty, such as dependencies, description, context, etc. The actual execution logic of the task is encapsulated within the TaskCallable<V> instance provided during the task's creation.
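The idea of a task class that extends FutureTask<V> and starts in status NEW can be sketched as follows. This is not the HugeTask source: StatusTask, its Status enum, and its fields are hypothetical and much simpler than what is described above. It relies only on the documented FutureTask hooks set and setException, which run() calls when the callable finishes or throws.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Hypothetical task type; a simplified stand-in, not HugeTask itself.
class StatusTask<V> extends FutureTask<V> {
    enum Status { NEW, RUNNING, SUCCESS, FAILED }

    private final String name;
    private volatile Status status = Status.NEW;

    StatusTask(String name, Callable<V> callable) {
        super(callable);
        this.name = name;
    }

    String name() { return this.name; }
    Status status() { return this.status; }

    @Override
    public void run() {
        this.status = Status.RUNNING;
        super.run();
    }

    @Override
    protected void set(V value) {
        // Called by FutureTask.run() when the callable returns normally
        this.status = Status.SUCCESS;
        super.set(value);
    }

    @Override
    protected void setException(Throwable t) {
        // Called by FutureTask.run() when the callable throws
        this.status = Status.FAILED;
        super.setException(t);
    }

    // Runs one trivial task on a pool and returns its final status.
    static Status demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        StatusTask<Integer> task = new StatusTask<>("sum", () -> 1 + 2);
        try {
            pool.execute(task); // a FutureTask is also a Runnable
            task.get();
            return task.status();
        } catch (ExecutionException e) {
            return task.status();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return task.status();
        } finally {
            pool.shutdown();
        }
    }
}
```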

You can find more details in the HugeTask.java file in the repository.

github-actions[bot] commented 2 months ago

Due to the lack of activity, the current issue is marked as stale and will be closed after 20 days, any update will remove the stale label