为了解决这个问题,我们必须使用非常类似于我们在Chapter3-Asynchronous Control Flow Patterns with Callbacks看到的异步并行执行的模式来追溯_combine()方法的所有正在运行的实例。 当_combine()方法的所有实例都已经完成运行时,触发end事件,通知任何监听器,进程需要做的所有动作都已经完成。
对于最终子集求和算法的重构版本。首先,我们需要将_combine()方法中的递归步骤替换为异步:
_combine(set, subset) {
for(let i = 0; i < set.length; i++) {
let newSubset = subset.concat(set[i]);
this._combineInterleaved(set.slice(i + 1), newSubset);
this._processSubset(newSubset);
}
}
本文配有demo,猛戳此处访问demo
大量占用
CPU
计算资源的任务称为CPU-bound
的任务。它的主要特点是
CPU
利用率较高,而不是I/O
操作繁重。 让我们立即举一个例子上看看这些类型的任务在Node.js
中的具体行为。暴力求解子集求和问题
问题:类似于Leetcode-40. Combination Sum II
Example:
暴力算法就是一个递归算法,这里主要探究
Node.js
上的解决方案,就不去探究算法本身。构建Demo本身
我们使用
EventEmitter
来构建这个demo:为了说明
CPU-bound
任务造成的问题,创建一个HTTP
服务器,对于网络请求作出响应:由于
SubsetSum
实例使用事件返回结果,所以我们可以在算法生成后立即对匹配的结果使用Stream
进行处理。另一个需要注意的细节是,每次我们的服务器都会返回I'm alive!
,这样我们每次发送一个不同于/subsetSum
的请求的时候。可以用来检查我们服务器是否挂掉了,这在稍后将会看到。开始运行:
一旦服务器启动,我们准备发送我们的第一个请求;让我们尝试发送一组17个随机数,服务器将会一直处于阻塞状态的计算中,那么将会导致服务器处理一段时间:
这是如果我们在第一个请求仍在运行的时候在另一个终端中尝试输入以下命令,我们将发现一个巨大的问题:
方案一:使用setImmediate
现在我们来看看这个模式如何应用于子集求和算法。 我们所要做的只是稍微修改一下
subsetSum.js
模块。 为方便起见,我们将创建一个名为subsetSumDefer.js
的新模块,将原始的subsetSum
类的代码作为起点。 我们要做的第一个改变是添加一个名为_combineInterleaved()
的新方法,它是我们正在实现的模式的核心:正如我们所看到的,我们所要做的只是使用
setImmediate()
调用原始的同步的_combine()
方法。然而,现在的问题是因为该算法不再是同步的,我们更难以知道何时已经完成了所有的组合的计算。为了解决这个问题,我们必须使用非常类似于我们在
Chapter3-Asynchronous Control Flow Patterns with Callbacks
看到的异步并行执行的模式来追溯_combine()
方法的所有正在运行的实例。 当_combine()
方法的所有实例都已经完成运行时,触发end
事件,通知任何监听器,进程需要做的所有动作都已经完成。对于最终子集求和算法的重构版本。首先,我们需要将
_combine()
方法中的递归步骤替换为异步:通过上面的更改,我们确保算法的每个步骤都将使用
setImmediate()
在事件循环中排队,在事件循环队列中I / O
请求之后执行,而不是同步运行造成阻塞。另一个小调整是对于
start()
方法:在前面的代码中,我们将
_combine()
方法的运行实例的数量初始化为0
.我们还通过调用_combineInterleaved()
来将调用替换为_combine()
,并移除了end
的触发,因为现在_combineInterleaved()
是异步处理的。 通过这个最后的改变,我们的子集求和算法现在应该能够通过事件循环可以运行的时间间隔交替地运行其可能大量占用CPU
的代码,并且不会再造成阻塞。最后更新
app.js
模块,以便它可以使用新版本的SubsetSum
:方案二:实现一个进程池
先从构建
processPool.js
模块开始:在模块的第一部分,引入我们将用来创建新进程的
child_process.fork()
函数。 然后,我们定义ProcessPool
的构造函数,该构造函数接受表示要运行的Node.js
程序的文件参数以及池中运行的最大实例数poolMax
作为参数。然后我们定义三个实例变量:pool
表示的是准备运行的进程active
表示的是当前正在运行的进程列表waiting
包含所有这些请求的任务队列,保存由于缺少可用的资源而无法立即实现的任务看
ProcessPool
类的acquire()
方法,它负责取出一个准备好被使用的进程:函数逻辑如下:
active
数组中,然后通过异步的方式调用其回调函数。waiting
数组。child_process.fork()
创建一个新的进程,将其添加到active
列表中,然后调用其回调。ProcessPool
类的最后一个方法是release()
,其目的是将一个进程放回进程池中:前面的代码也很简单,其解释如下:
waiting
任务队列里面有任务需要被执行,我们只需为这个任务分配一个进程worker
执行。waiting
任务队列中都没有需要被执行的任务,我们则把active
的进程列表中的进程放回进程池中。正如我们所看到的,进程从来没有中断,只在为其不断地重新分配任务,使我们可以通过在每个请求不重新启动一个进程达到节省时间和空间的目的。然而,重要的是要注意,这可能并不总是最好的选择,这很大程度上取决于我们的应用程序的要求。为减少进程池长期占用内存,可能的调整如下:
父子进程通信
现在我们的
ProcessPool
类已经准备就绪,我们可以使用它来实现SubsetSumFork
模块,SubsetSumFork
的作用是与子进程进行通信得到子集求和的结果。前面曾说到,用child_process.fork()
启动一个进程也给了我们创建了一个简单的基于消息的管道,通过实现subsetSumFork.js
模块来看看它是如何工作的:首先注意,我们在
subsetSumWorker.js
调用ProcessPool
的构造函数创建ProcessPool
实例。 我们还将进程池的最大容量设置为2
。另外,我们试图维持原来的
SubsetSum
类相同的公共API。实际上,SubsetSumFork
是EventEmitter
的子类,它的构造函数接受sum
和set
,而start()
方法则触发算法的执行,而这个SubsetSumFork
实例运行在一个单独的进程上。调用start()
方法时会发生的情况:sum
和set
。send()
方法是Node.js
自动提供给child_process.fork()
创建的所有进程,这实际上与父子进程之间的通信管道有关。on()
方法附加一个新的事件监听器(这也是所有以child_process.fork()
创建的进程提供的通信通道的一部分)。end
事件,这意味着SubsetSum
所有任务已经完成,在这种情况下,我们删除onMessage
监听器并释放worker
,并将其放回进程池中,不再让其占用内存资源和CPU
资源。worker
以{event,data}
格式生成消息,使得任何时候一旦子进程处理完毕任务,我们在外部都能接收到这一消息。这就是
SubsetSumFork
模块现在我们来实现这个worker
应用程序。与父进程进行通信
现在我们来创建
subsetSumWorker.js
模块,我们的应用程序,这个模块的全部内容将在一个单独的进程中运行:由于我们的
handler
处于一个单独的进程中,我们不必担心这类CPU-bound
任务阻塞事件循环,所有的HTTP
请求将继续由主应用程序的事件循环处理,而不会中断。当子进程开始启动时,父进程:
process.on()
函数轻松实现。我们期望从父进程中唯一的消息是为新的SubsetSum
任务提供输入的消息。只要收到这样的消息,我们创建一个SubsetSum
类的新实例,并注册match
和end
事件监听器。最后,我们用subsetSum.start()
开始计算。{event,data}
的对象中,并将其发送给父进程。这些消息然后在subsetSumFork.js
模块中处理,就像我们在前面的章节中看到的那样。