flomesh-io / pipy

Pipy is a programmable proxy for the cloud, edge and IoT.
https://flomesh.io/pipy
Other
743 stars 70 forks source link

the execution cycle of the main pipeline and fork pipeline ? #22

Closed sixinyiyu closed 3 years ago

sixinyiyu commented 3 years ago

Version : 0.0.0 Commit : d54c0aaae262756f35ebf8256bd4dae7617b8444 Commit Date : Wed, 9 Jun 2021 13:56:10 +0800 Host : Linux-5.10.18-amd64-desktop x86_64 OpenSSL : OpenSSL 1.1.1g 21 Apr 2020

pipy({
})

.listen(9000)
  .fork('invoke-A')
  .fork('invoke-B')
  .decodeHttpRequest()
  .replaceMessage(
    () => new Message({ status: 200 }, '成功!\n')
  )
  .encodeHttpResponse()
  .onMessage(
    msg => (
      console.log('main service response')
    )
  ).onSessionEnd(
    () => (
      console.log('main service session end')
    )
  )

  .pipeline('invoke-B')
    .decodeHttpRequest()
    .onMessageStart(
      () => console.log('invoke B service ...')
    )
    .replaceMessage(
      () => new Message(
        {
          method: 'POST',
          path: '/A',
          headers: {
            host: '127.0.0.1:8080',
            'Content-Type': 'application/json',
          },
        },
        JSON.encode({
          cmd: 'add',
          module: 'keyworld',
        })
      )
    )
    .encodeHttpRequest()
    .connect('127.0.0.1:8080')
    .print()
    .decodeHttpResponse()
    .print()
    .onMessage(
      msg => (
        console.log('Get B service response' + msg.body)
      )
    )

  .pipeline('invoke-A')
    .decodeHttpRequest()
    .onMessageStart(
      () => console.log('invoke A service ...')
    )
    .replaceMessage(
      () => new Message({
        method: 'GET',
        path: '/hello',
        headers: {
          host: '127.0.0.1:8080',
        },
      })
    )
    .encodeHttpRequest()
    .connect('127.0.0.1:8080')
    .print()
    .decodeHttpResponse()
    .print()
    .onMessage(
      msg => (
        console.log('Get A service response' + msg.body)
      )
    )
---------console------

2021-06-30 10:26:22 [info] [pjs] invoke A service ...
2021-06-30 10:26:22 [info] [pjs] invoke B service ...
2021-06-30 10:26:22 [info] [pjs] main service response
HTTP/1.1 200 
Content-Type: text/plain;charset=UTF-8
Content-Length: 5
Date: Wed, 30 Jun 2021 02:26:22 GMT

Hello
2021-06-30 10:26:22 [info] [pjs] Get A service responseHello
2021-06-30 10:26:22 [info] [pjs] main service session end

the /A endpoint sleep for 5 seconds until retrun  response.

does the main  session end when the B pipeline got the response ? 

I want to know when it will print B resposne ? 

use wait block the code until B got resposne?
pajama-coder commented 3 years ago

Every pipeline, when linked to (or forked to, merged to, muxed to, demuxed to...), will instantiate a new Session that is totally independent to other sessions from other pipelines. So there's no guarantee as to when, in your case, the B pipeline (or let's say session to be more clear) will close up compared to the main listening session. Sessions close only under one condition: that's when it receives a SessionEnd event from its input. In your example, when the client closes the connection, either actively or passively if pipy shuts it down first, session B gets a SessionEnd too because the main session is forking everthing to it, including the SessionEnd. So when the main session closes, A and B both close. To have a session live longer than the main session, link to it by using merge, mux, demux etc. These filters don't pass SessionEnds to the sessions they create.

Hope that could clarify everything. Feel free to follow up with more questions.

sixinyiyu commented 3 years ago

i see, tks!

sixinyiyu commented 3 years ago

just little tips: route could support dsl