asdf2014 / gitment

Gitment for My Blog
https://yuzhouwan.com
1 stars 0 forks source link

Presto:分布式 SQL 查询引擎 | 宇宙湾 #119

Open asdf2014 opened 3 years ago

asdf2014 commented 3 years ago

https://yuzhouwan.com/posts/200906/

sjx782392329 commented 3 years ago

博主你好,我对博客里面 4.4 时序交互图里面虚线框的发现服务部分存在一些疑惑。不清楚 Coordinator 和 worker 是怎么发送心跳给发现服务的。 我通过查看 prestosql 版本的源码,PrestoServer 类是入口,发现在 Server 类中的 doStart() 方法加载了 ServerMainModule(),这个里面配置了 CoordinatorModule() 启动单机服务的时候,Coordinator 集成了 Discovery 服务,在 failure detector 注释下,我理解是对节点的失败探测,在 FailureDetectorModule() 中存在一个 HeartbeatFailureDetector 类,里面的 updateMonitoredServices() 方法拿到了所有的节点,并且通过调用注释 4 的 MonitoringTask 的 enable 方法对节点进行 ping() 操作,并且 updateState() 。 上述过程,我理解是 Discovery 服务主动发起的操作,不像是 Coordinator 和 worker 主动发心跳,其次我不知道这个检测结果对后续选 worker 是否有影响,希望博主能够解答一下

asdf2014 commented 3 years ago

@sjx782392329 博主你好,我对博客里面 4.4 时序交互图里面虚线框的发现服务部分存在一些疑惑。不清楚 Coordinator 和 worker 是怎么发送心跳给发现服务的。 我通过查看 prestosql 版本的源码,PrestoServer 类是入口,发现在 Server 类中的 doStart() 方法加载了 ServerMainModule(),这个里面配置了 CoordinatorModule() 启动单机服务的时候,Coordinator 集成了 Discovery 服务,在 failure detector 注释下,我理解是对节点的失败探测,在 FailureDetectorModule() 中存在一个 HeartbeatFailureDetector 类,里面的 updateMonitoredServices() 方法拿到了所有的节点,并且通过调用注释 4 的 MonitoringTask 的 enable 方法对节点进行 ping() 操作,并且 updateState() 。 上述过程,我理解是 Discovery 服务主动发起的操作,不像是 Coordinator 和 worker 主动发心跳,其次我不知道这个检测结果对后续选 worker 是否有影响,希望博主能够解答一下

是的,并不是 Worker 主动发送心跳,而是 Discovery 定时监测节点是否存活。其实,Presto 是基于 Airlift 框架来实现服务发现的,通过 HTTP 协议进行集群通讯。我们在 etc/config.properties 文件中,配置的 discovery.uri 参数就是透传给 Airlift 框架的。假设我们将 discovery.uri 参数设置为 http://127.0.0.1:9999,则可以通过调用 http://127.0.0.1:9999/v1/service 链接,获取到所有注册的服务(类型、ID、通讯地址、服务所在的节点等信息)。完整的信息如下(其中 IP 地址已脱敏):

{
  "environment": "presto",
  "services": [
    {
      "id": "1f538ad9-e4b0-40a2-88a7-8e901b6d8ce6",
      "nodeId": "presto_node_1",
      "type": "presto-coordinator",
      "pool": "general",
      "location": "/presto_node_1",
      "properties": {
        "http": "http://127.0.0.1:9999",
        "http-external": "http://127.0.0.1:9999"
      }
    },
    {
      "id": "89a0bf46-a949-4d62-8c65-c6fb9afe1bb2",
      "nodeId": "presto_node_1",
      "type": "discovery",
      "pool": "general",
      "location": "/presto_node_1",
      "properties": {
        "http": "http://127.0.0.1:9999",
        "http-external": "http://127.0.0.1:9999"
      }
    },
    {
      "id": "253a93c3-ebe9-46cf-a0ee-41e7de56c620",
      "nodeId": "presto_node_1",
      "type": "presto",
      "pool": "general",
      "location": "/presto_node_1",
      "properties": {
        "node_version": "344",
        "coordinator": "true",
        "http": "http://127.0.0.1:9999",
        "http-external": "http://127.0.0.1:9999",
        "connectorIds": "system,druid"
      }
    },
    {
      "id": "0a9970f3-6717-4979-8f28-7f12c7bd7c75",
      "nodeId": "presto_node_1",
      "type": "jmx-http",
      "pool": "general",
      "location": "/presto_node_1",
      "properties": {
        "http": "http://127.0.0.1:9999",
        "http-external": "http://127.0.0.1:9999"
      }
    }
  ]
}

具体的,是在 HeartbeatFailureDetector 类中启动一个执行周期为 5s 的定时任务,不断地调用 updateMonitoredServices 方法,来更新集群的服务状态。另外,DiscoveryNodeManager 类中也会启动一个执行周期为 5s 的定时任务,不断地调用 pollWorkers 方法,来检查各个节点的状态。我们可以手动访问 http://localhost:9999/v1/info/state 链接,来查看节点是否健康。如果节点存活,则会返回 "ACTIVE" 字符串。

asdf2014 commented 3 years ago

@sjx782392329 其次我不知道这个检测结果对后续选 worker 是否有影响

Node 的状态主要分为 active、inactive、shuttingDown 三种,以集合的形式保存在了 AllNodes 类中。后续再选择 Worker 的时候会判断是否存活,并通过 AllNodes#getActiveNodes 方法获取到 active 状态的 Node 集合。

sjx782392329 commented 3 years ago

@sjx782392329 博主你好,我对博客里面 4.4 时序交互图里面虚线框的发现服务部分存在一些疑惑。不清楚 Coordinator 和 worker 是怎么发送心跳给发现服务的。 我通过查看 prestosql 版本的源码,PrestoServer 类是入口,发现在 Server 类中的 doStart() 方法加载了 ServerMainModule(),这个里面配置了 CoordinatorModule() 启动单机服务的时候,Coordinator 集成了 Discovery 服务,在 failure detector 注释下,我理解是对节点的失败探测,在 FailureDetectorModule() 中存在一个 HeartbeatFailureDetector 类,里面的 updateMonitoredServices() 方法拿到了所有的节点,并且通过调用注释 4 的 MonitoringTask 的 enable 方法对节点进行 ping() 操作,并且 updateState() 。 上述过程,我理解是 Discovery 服务主动发起的操作,不像是 Coordinator 和 worker 主动发心跳,其次我不知道这个检测结果对后续选 worker 是否有影响,希望博主能够解答一下

是的,并不是 Worker 主动发送心跳,而是 Discovery 定时监测节点是否存活。其实,Presto 是基于 Airlift 框架来实现服务发现的,通过 HTTP 协议进行集群通讯。我们在 etc/config.properties 文件中,配置的 discovery.uri 参数就是透传给 Airlift 框架的。假设我们将 discovery.uri 参数设置为 http://127.0.0.1:9999,则可以通过调用 http://127.0.0.1:9999/v1/service 链接,获取到所有注册的服务(类型、ID、通讯地址、服务所在的节点等信息)。完整的信息如下(其中 IP 地址已脱敏):

{
  "environment": "presto",
  "services": [
    {
      "id": "1f538ad9-e4b0-40a2-88a7-8e901b6d8ce6",
      "nodeId": "presto_node_1",
      "type": "presto-coordinator",
      "pool": "general",
      "location": "/presto_node_1",
      "properties": {
        "http": "http://127.0.0.1:9999",
        "http-external": "http://127.0.0.1:9999"
      }
    },
    {
      "id": "89a0bf46-a949-4d62-8c65-c6fb9afe1bb2",
      "nodeId": "presto_node_1",
      "type": "discovery",
      "pool": "general",
      "location": "/presto_node_1",
      "properties": {
        "http": "http://127.0.0.1:9999",
        "http-external": "http://127.0.0.1:9999"
      }
    },
    {
      "id": "253a93c3-ebe9-46cf-a0ee-41e7de56c620",
      "nodeId": "presto_node_1",
      "type": "presto",
      "pool": "general",
      "location": "/presto_node_1",
      "properties": {
        "node_version": "344",
        "coordinator": "true",
        "http": "http://127.0.0.1:9999",
        "http-external": "http://127.0.0.1:9999",
        "connectorIds": "system,druid"
      }
    },
    {
      "id": "0a9970f3-6717-4979-8f28-7f12c7bd7c75",
      "nodeId": "presto_node_1",
      "type": "jmx-http",
      "pool": "general",
      "location": "/presto_node_1",
      "properties": {
        "http": "http://127.0.0.1:9999",
        "http-external": "http://127.0.0.1:9999"
      }
    }
  ]
}

具体的,是在 HeartbeatFailureDetector 类中启动一个执行周期为 5s 的定时任务,不断地调用 updateMonitoredServices 方法,来更新集群的服务状态。另外,DiscoveryNodeManager 类中也会启动一个执行周期为 5s 的定时任务,不断地调用 pollWorkers 方法,来检查各个节点的状态。我们可以手动访问 http://localhost:9999/v1/info/state 链接,来查看节点是否健康。如果节点存活,则会返回 "ACTIVE" 字符串。

我看这个调用 updateMonitoredServices 方法,来更新集群的服务状态时,只记录了 successTransitionTimestamp 这个属性。我想知道这个属性和后面调用 DiscoveryNodeManager 类中的 pollWorkers中获取到的状态Active 是否相关。

sjx782392329 commented 3 years ago

我已经找到了,在 refreshNodesInternal 方法里面的 getFailed 方法用到了 successTransitionTimestamp 这个属性,我查看源码的能力还有待提高

asdf2014 commented 3 years ago

@sjx782392329 :+1:

asdf2014 commented 3 years ago

@sjx782392329 多谢提了一个很好的问题,刚我梳理了一下,添加到正文啦,详见:https://yuzhouwan.com/posts/200906/#Service-Discovery

zhangxiading1982 commented 3 years ago

博主你好,"Presto:分布式 SQL 查询引擎" 中 "查询模型"中写了 Stage、Task、Split的关系,其中Stage在状态机 SEDA(stage event driver architecture)的定义中理解起来很清晰,F1 设计中更多提到Fragment这个基础概念,想听大神 如何在看待 Fragment 和 Stage之间的关系。

“Stage包含了一系列Task“,感觉有些模糊,博主是否可以把Task类型、 Stage和Task的包含关系 在文中进一步说明下。

asdf2014 commented 3 years ago

@zhangxiading1982 博主你好,"Presto:分布式 SQL 查询引擎" 中 "查询模型"中写了 Stage、Task、Split的关系,其中Stage在状态机 SEDA(stage event driver architecture)的定义中理解起来很清晰,F1 设计中更多提到Fragment这个基础概念,想听大神 如何在看待 Fragment 和 Stage之间的关系。

“Stage包含了一系列Task“,感觉有些模糊,博主是否可以把Task类型、 Stage和Task的包含关系 在文中进一步说明下。

你好,这个问题很赞呀 :+1: 不是大神哈,我简单说一下个人的理解吧。是这样的,类似 SEDA 的 Stage 和 F1 的 Fragment 概念,其实在很多引擎中都是存在的,本质上还是为了方便在分布式集群中做执行调度。在 Presto 中,同样也存在着 SubPlan、Fragment、Stage、Task 这些概念。总的来说,它们的对应关系是 SubPlan : Stage : Fragment : Task = 1 : 1 : 1 : n,即前三个均是一一对应的关系,而 Stage 则会包含多个 Task。其中,SubPlan 是根据 SQL 语义分析而拆分出来的逻辑查询计划。进一步依据算子,可以将 SubPlan 拆分一系列 PlanNode。再把这一系列的 PlanNode 封装起来就是 Fragment。然后,将 Fragment 调度到真正执行的 Worker 时,就形成了 Stage。这里的 Stage 则包含了一个或者多个 Task,每一个 Task 处理整个数据集的一小部分数据。另外,Task 的类型有很多种,可以看下 DataDefinitionTask 这个接口的具体实现,这里就不展开了。