wechaty / puppet-service

Wechaty Puppet Provider for providing/consuming the Wechaty Puppet Service
https://paka.dev/npm/wechaty-puppet-service
Apache License 2.0

Use RxJS to deal with the grpcStart/grpcStop/grpcEvent/grpcReset etc. #20

Open huan opened 4 years ago

huan commented 4 years ago

The goal is to handle reconnection with a backoff retry strategy.

See: https://www.learnrxjs.io/learn-rxjs/operators/error_handling/retrywhen
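To make "backoff" concrete, here is a minimal sketch of an exponential backoff schedule in plain TypeScript. The names `backoffDelayMs` and `backoffSchedule`, the 1 second base, and the 60 second cap are assumptions for illustration and do not come from this repo. Delays like these could drive the timer inside a `retryWhen` notifier, as described in the link above.

```typescript
// Hypothetical helper: delay before retry number `attempt` (0-based).
// The delay doubles on every attempt and is capped at `maxMs`.
function backoffDelayMs (attempt: number, baseMs = 1000, maxMs = 60 * 1000): number {
  const exponential = baseMs * Math.pow(2, attempt)
  return Math.min(exponential, maxMs)
}

// Hypothetical helper: the full list of delays for `attempts` retries.
function backoffSchedule (attempts: number, baseMs = 1000, maxMs = 60 * 1000): number[] {
  const schedule: number[] = []
  for (let attempt = 0; attempt < attempts; attempt++) {
    schedule.push(backoffDelayMs(attempt, baseMs, maxMs))
  }
  return schedule
}

// Delays for the first eight retries: they grow quickly, then stay at the cap.
const firstEightDelays = backoffSchedule(8)
```

The cap matters here: without it, a server that stays down for hours would push the next retry further and further away, and the puppet would take a long time to notice that the server is back.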

First Version of the RxJS Solution

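// Imports assumed for this snippet (RxJS 6 style); they were not part of the original paste
import { fromEvent, interval, merge } from 'rxjs'
import { debounce, filter, startWith, switchMap, takeUntil, tap } from 'rxjs/operators'
import { Puppet, log } from 'wechaty-puppet'

/**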
 * Filters
 */
export const switchSuccess = (status: true | 'pending') => status === true

/**
 * Actions
 */
export const resetPuppet   = (puppet: Puppet) => () => puppet.emit('reset', { data: 'RxJS recover$' })
export const dingHeartbeat = (puppet: Puppet) => () => puppet.ding(`recover$() AED`)  // AED: Automated External Defibrillator

/**
 * Observables
 */
export const switchOn$  = (puppet: Puppet) => fromEvent(puppet.state, 'on')
export const switchOff$ = (puppet: Puppet) => fromEvent(puppet.state, 'off')
export const heartbeat$ = (puppet: Puppet) => fromEvent<{}>(puppet, 'heartbeat')

/**
 * Streams
 */

// Heartbeat stream is like ECG (ElectroCardioGraphy)
export const switchOnHeartbeat$ = (puppet: Puppet) => switchOn$(puppet).pipe(
  filter(switchSuccess),
  tap(_ => log.verbose('Puppet', 'recover$() switchOn$ fired')),
  switchMap(_ => heartbeat$(puppet).pipe(
    startWith(undefined), // initial beat
    tap(payload => log.verbose('Puppet', 'recover$() heartbeat: %s', JSON.stringify(payload))),
  ))
)

// Ding is like CPR (Cardio Pulmonary Resuscitation)
export const heartbeatDing = (puppet: Puppet) => switchOnHeartbeat$(puppet).pipe(
  debounce(() => interval(15 * 1000)),
  tap(_ => log.verbose('Puppet', 'recover$() heartbeatDing()')),
  tap(dingHeartbeat(puppet)),
)

// Reset is like AED (Automated External Defibrillator)
export const heartbeatReset = (puppet: Puppet) => switchOnHeartbeat$(puppet).pipe(
  debounce(_ => interval(60 * 1000)),
  tap(_ => log.verbose('Puppet', 'recover$() heartbeatReset()')),
  switchMap(_ => interval(60 * 1000).pipe(
    tap(resetPuppet(puppet)),
    takeUntil(heartbeat$(puppet)),
  )),
)

/**
 * Main stream
 */
export const recover$ = (puppet: Puppet) => merge(
  heartbeatDing(puppet),
  heartbeatReset(puppet),
)
huan commented 4 years ago

The current behavior has a bug: after the server dies, the reconnect is retried only once.

00:30:21 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed"
00:30:21 VERB Puppet reset(startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed)
00:30:21 SILL StateSwitch <PuppetHostie> off() is false
00:30:21 VERB PuppetHostie stop()
00:30:21 SILL StateSwitch <PuppetHostie> off() is false
00:30:21 SILL StateSwitch <PuppetHostie> off() is false
00:30:21 VERB StateSwitch <PuppetHostie> off(pending) <- (false)
00:30:21 VERB Puppet selfId()
00:30:21 SILL Contact ready() @ Puppet#0<PuppetHostie>(Mike BO) with id="wxid_a8d806dzznm822"
00:30:21 SILL Contact ready() isReady() true
00:30:21 VERB PuppetHostie stopGrpcStream()
00:30:21 INFO Chatops roomMessage(17376996519@chatroom, 😪 Mike BO)
00:30:21 ERR Chatops roomMessage() this.bot is offline
00:30:21 VERB ContactSelf name()
00:30:21 ERR Wechaty addListenerFunction(logout) listener exception: TypeError: Cannot read property 'name' of undefined
00:30:21 SILL HotImport initProxyModule() proxyModule()
00:30:21 SILL HotImport initProxyModule() proxyModule() using default export
00:30:21 ERR on-error onError(TypeError: Cannot read property 'name' of undefined)
00:30:21 INFO Finis startFinis() bot Contact<Huan LI (李卓桓)> logout
00:30:21 SILL HotImport initProxyModule() proxyModule()
00:30:21 SILL HotImport initProxyModule() proxyModule() using default export
onLogout() enter
00:30:21 INFO on-logout onLogout(Contact<Huan LI (李卓桓)>)
onLogout() exit
00:30:41 ERR PuppetHostie stop() this.grpcClient.stop() rejection: 14 UNAVAILABLE: failed to connect to all addresses
00:30:41 VERB PuppetHostie stopGrpcClient()
00:30:41 SILL StateSwitch <PuppetHostie> off() is pending
00:30:41 VERB StateSwitch <PuppetHostie> off(true) <- (pending)
00:30:41 VERB PuppetHostie start()
00:30:41 SILL StateSwitch <PuppetHostie> on() is false
00:30:41 SILL StateSwitch <PuppetHostie> on() is false
00:30:41 VERB StateSwitch <PuppetHostie> on(pending) <- (false)
00:30:41 VERB PuppetHostie startGrpcClient()
00:30:41 VERB PuppetHostie discoverHostieIp(windows)
00:30:42 VERB PuppetHostie discoverHostieIp() 0.0.0.0
00:30:42 ERR PuppetHostie start() rejection: no endpoint
00:30:42 SILL StateSwitch <PuppetHostie> off() is false
00:30:42 VERB StateSwitch <PuppetHostie> off(true) <- (false)
00:30:42 WARN Puppet reset() exception: Error: no endpoint
00:30:42 SILL HotImport initProxyModule() proxyModule()
00:30:42 SILL HotImport initProxyModule() proxyModule() using default export
00:30:42 ERR on-error onError(Error)
00:31:10 WARN Puppet constructor() watchdog.on(reset) reason: {"data":"onGrpcStreamEvent(EVENT_TYPE_MESSAGE)","timeout":60000}
00:31:10 SILL Puppet constructor() this.on(reset) payload: "{"data":"onGrpcStreamEvent(EVENT_TYPE_MESSAGE)","timeout":60000}"
00:31:10 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "onGrpcStreamEvent(EVENT_TYPE_MESSAGE)"
00:31:10 VERB Puppet reset(onGrpcStreamEvent(EVENT_TYPE_MESSAGE))
00:31:10 SILL StateSwitch <PuppetHostie> off() is true
00:31:10 VERB Puppet reset(onGrpcStreamEvent(EVENT_TYPE_MESSAGE)) state is off(), make the watchdog to sleep
01:20:56 INFO Chatops roomMessage(17376996519@chatroom, 💖)
01:20:56 ERR Chatops roomMessage() this.bot is offline
01:20:59 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
02:20:56 INFO Chatops roomMessage(17376996519@chatroom, 💖)
02:20:56 ERR Chatops roomMessage() this.bot is offline
02:20:59 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
huan commented 4 years ago
22:34:33 SILL Wechaty memoryCheck() free: 1208 MB, require: 4 MB
22:34:48 SILL PuppetHostie ding(no heartbeat for 15 seconds?)
22:34:56 VERB PuppetHostie startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed
22:34:56 SILL Puppet constructor() this.on(reset) payload: "{"data":"startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed"}"
22:34:56 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed"
22:34:56 VERB Puppet reset(startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed)
22:34:56 SILL StateSwitch <PuppetHostie> off() is false
22:34:56 VERB PuppetHostie stop()
22:34:56 SILL StateSwitch <PuppetHostie> off() is false
22:34:56 SILL StateSwitch <PuppetHostie> off() is false
22:34:56 VERB StateSwitch <PuppetHostie> off(pending) <- (false)
22:34:56 VERB Puppet selfId()
22:34:56 SILL Contact ready() @ Puppet#0<PuppetHostie>(Mike BO) with id="wxid_a8d806dzznm822"
22:34:56 SILL Contact ready() isReady() true
22:34:56 VERB PuppetHostie stopGrpcStream()
22:34:56 INFO Chatops roomMessage(17376996519@chatroom, 😪 Mike BO)
22:34:56 ERR Chatops roomMessage() this.bot is offline
22:34:56 VERB ContactSelf name()
22:34:56 ERR Wechaty addListenerFunction(logout) listener exception: TypeError: Cannot read property 'name' of undefined
22:34:56 SILL HotImport initProxyModule() proxyModule()
22:34:56 SILL HotImport initProxyModule() proxyModule() using default export
22:34:56 ERR on-error onError(TypeError: Cannot read property 'name' of undefined)
22:34:56 INFO Finis startFinis() bot Contact<Huan LI (李卓桓)> logout
22:34:56 SILL HotImport initProxyModule() proxyModule()
22:34:56 SILL HotImport initProxyModule() proxyModule() using default export
onLogout() enter
22:34:56 INFO on-logout onLogout(Contact<Huan LI (李卓桓)>)
onLogout() exit
22:34:56 ERR PuppetHostie ding() rejection: Error: 14 UNAVAILABLE: TCP Read failed
22:35:16 ERR PuppetHostie stop() this.grpcClient.stop() rejection: 14 UNAVAILABLE: failed to connect to all addresses
22:35:16 VERB PuppetHostie stopGrpcClient()
22:35:16 SILL StateSwitch <PuppetHostie> off() is pending
22:35:16 VERB StateSwitch <PuppetHostie> off(true) <- (pending)
22:35:16 VERB PuppetHostie start()
22:35:16 SILL StateSwitch <PuppetHostie> on() is false
22:35:16 SILL StateSwitch <PuppetHostie> on() is false
22:35:16 VERB StateSwitch <PuppetHostie> on(pending) <- (false)
22:35:16 VERB PuppetHostie startGrpcClient()
22:35:16 VERB PuppetHostie discoverHostieIp(windows)
22:35:17 VERB PuppetHostie discoverHostieIp() 0.0.0.0
22:35:17 ERR PuppetHostie start() rejection: no endpoint
22:35:17 SILL StateSwitch <PuppetHostie> off() is false
22:35:17 VERB StateSwitch <PuppetHostie> off(true) <- (false)
22:35:17 WARN Puppet reset() exception: Error: no endpoint
22:35:17 SILL HotImport initProxyModule() proxyModule()
22:35:17 SILL HotImport initProxyModule() proxyModule() using default export
22:35:17 ERR on-error onError(Error)
22:35:33 WARN Puppet constructor() watchdog.on(reset) reason: {"data":"onGrpcStreamEvent(EVENT_TYPE_DONG)","timeout":60000}
22:35:33 SILL Puppet constructor() this.on(reset) payload: "{"data":"onGrpcStreamEvent(EVENT_TYPE_DONG)","timeout":60000}"
22:35:33 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "onGrpcStreamEvent(EVENT_TYPE_DONG)"
22:35:33 VERB Puppet reset(onGrpcStreamEvent(EVENT_TYPE_DONG))
22:35:33 SILL StateSwitch <PuppetHostie> off() is true
22:35:33 VERB Puppet reset(onGrpcStreamEvent(EVENT_TYPE_DONG)) state is off(), make the watchdog to sleep
23:28:09 INFO Chatops roomMessage(17376996519@chatroom, 💖)
23:28:09 ERR Chatops roomMessage() this.bot is offline
23:28:12 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
00:28:09 INFO Chatops roomMessage(17376996519@chatroom, 💖)
00:28:09 ERR Chatops roomMessage() this.bot is offline
00:28:12 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
01:28:09 INFO Chatops roomMessage(17376996519@chatroom, 💖)
01:28:09 ERR Chatops roomMessage() this.bot is offline
huan commented 4 years ago
04:15:13 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed"
04:15:13 VERB Puppet reset(startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed)
04:15:13 SILL StateSwitch <PuppetHostie> off() is false
04:15:13 VERB PuppetHostie stop()
04:15:13 SILL StateSwitch <PuppetHostie> off() is false
04:15:13 SILL StateSwitch <PuppetHostie> off() is false
04:15:13 VERB StateSwitch <PuppetHostie> off(pending) <- (false)
04:15:13 VERB Puppet selfId()
04:15:13 SILL Contact ready() @ Puppet#0<PuppetHostie>(Mike BO) with id="wxid_a8d806dzznm822"
04:15:13 SILL Contact ready() isReady() true
04:15:13 VERB PuppetHostie stopGrpcStream()
04:15:13 INFO Chatops roomMessage(17376996519@chatroom, 😪 Mike BO)
04:15:13 ERR Chatops roomMessage() this.bot is offline
04:15:13 VERB ContactSelf name()
04:15:13 ERR Wechaty addListenerFunction(logout) listener exception: TypeError: Cannot read property 'name' of undefined
04:15:13 SILL HotImport initProxyModule() proxyModule()
04:15:13 SILL HotImport initProxyModule() proxyModule() using default export
04:15:13 ERR on-error onError(TypeError: Cannot read property 'name' of undefined)
04:15:13 INFO Finis startFinis() bot Contact<Huan LI (李卓桓)> logout
04:15:13 SILL HotImport initProxyModule() proxyModule()
04:15:13 SILL HotImport initProxyModule() proxyModule() using default export
onLogout() enter
04:15:13 INFO on-logout onLogout(Contact<Huan LI (李卓桓)>)
onLogout() exit
04:15:33 ERR PuppetHostie stop() this.grpcClient.stop() rejection: 14 UNAVAILABLE: failed to connect to all addresses
04:15:33 VERB PuppetHostie stopGrpcClient()
04:15:33 SILL StateSwitch <PuppetHostie> off() is pending
04:15:33 VERB StateSwitch <PuppetHostie> off(true) <- (pending)
04:15:33 VERB PuppetHostie start()
04:15:33 SILL StateSwitch <PuppetHostie> on() is false
04:15:33 SILL StateSwitch <PuppetHostie> on() is false
04:15:33 VERB StateSwitch <PuppetHostie> on(pending) <- (false)
04:15:33 VERB PuppetHostie startGrpcClient()
04:15:33 VERB PuppetHostie discoverHostieIp(windows)
04:15:34 VERB PuppetHostie discoverHostieIp() 0.0.0.0
04:15:34 ERR PuppetHostie start() rejection: no endpoint
04:15:34 SILL StateSwitch <PuppetHostie> off() is false
04:15:34 VERB StateSwitch <PuppetHostie> off(true) <- (false)
04:15:34 WARN Puppet reset() exception: Error: no endpoint
04:15:34 SILL HotImport initProxyModule() proxyModule()
04:15:34 SILL HotImport initProxyModule() proxyModule() using default export
04:15:34 ERR on-error onError(Error)

04:16:11 WARN Puppet constructor() watchdog.on(reset) reason: {"data":"onGrpcStreamEvent(EVENT_TYPE_DONG)","timeout":60000}
04:16:11 SILL Puppet constructor() this.on(reset) payload: "{"data":"onGrpcStreamEvent(EVENT_TYPE_DONG)","timeout":60000}"
04:16:11 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "onGrpcStreamEvent(EVENT_TYPE_DONG)"
04:16:11 VERB Puppet reset(onGrpcStreamEvent(EVENT_TYPE_DONG))
04:16:11 SILL StateSwitch <PuppetHostie> off() is true
04:16:11 VERB Puppet reset(onGrpcStreamEvent(EVENT_TYPE_DONG)) state is off(), make the watchdog to sleep
huan commented 4 years ago
00:25:31 VERB StateSwitch <PuppetHostie> off(pending) <- (false)
00:25:31 VERB Puppet selfId()
00:25:31 SILL Contact ready() @ Puppet#0<PuppetHostie>(Mike BO) with id="wxid_a8d806dzznm822"
00:25:31 SILL Contact ready() isReady() true
00:25:31 VERB PuppetHostie stopGrpcStream()
00:25:31 INFO Chatops roomMessage(17376996519@chatroom, 😪 Mike BO)
00:25:31 ERR Chatops roomMessage() this.bot is offline
00:25:31 VERB ContactSelf name()
00:25:31 ERR Wechaty addListenerFunction(logout) listener exception: TypeError: Cannot read property 'name' of undefined
00:25:31 SILL HotImport initProxyModule() proxyModule()
00:25:31 SILL HotImport initProxyModule() proxyModule() using default export
00:25:31 ERR on-error onError(TypeError: Cannot read property 'name' of undefined)
00:25:31 INFO Finis startFinis() bot Contact<Huan LI (李卓桓)> logout
00:25:31 SILL HotImport initProxyModule() proxyModule()
00:25:31 SILL HotImport initProxyModule() proxyModule() using default export
onLogout() enter
00:25:31 INFO on-logout onLogout(Contact<Huan LI (李卓桓)>)
onLogout() exit
00:25:51 ERR PuppetHostie stop() this.grpcClient.stop() rejection: 14 UNAVAILABLE: failed to connect to all addresses
00:25:51 VERB PuppetHostie stopGrpcClient()
00:25:51 SILL StateSwitch <PuppetHostie> off() is pending
00:25:51 VERB StateSwitch <PuppetHostie> off(true) <- (pending)
00:25:51 VERB PuppetHostie start()
00:25:51 SILL StateSwitch <PuppetHostie> on() is false
00:25:51 SILL StateSwitch <PuppetHostie> on() is false
00:25:51 VERB StateSwitch <PuppetHostie> on(pending) <- (false)
00:25:51 VERB PuppetHostie startGrpcClient()
00:25:51 VERB PuppetHostie discoverHostieIp(windows)
00:25:52 VERB PuppetHostie discoverHostieIp() 0.0.0.0
00:25:52 ERR PuppetHostie start() rejection: no endpoint
00:25:52 SILL StateSwitch <PuppetHostie> off() is false
00:25:52 VERB StateSwitch <PuppetHostie> off(true) <- (false)
00:25:52 WARN Puppet reset() exception: Error: no endpoint
00:25:52 SILL HotImport initProxyModule() proxyModule()
00:25:52 SILL HotImport initProxyModule() proxyModule() using default export
00:25:52 ERR on-error onError(Error)
00:26:11 INFO Chatops roomMessage(17376996519@chatroom, 💖)
00:26:11 ERR Chatops roomMessage() this.bot is offline
00:26:15 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
00:26:26 WARN Puppet constructor() watchdog.on(reset) reason: {"data":"onGrpcStreamEvent(EVENT_TYPE_DONG)","timeout":60000}
00:26:26 SILL Puppet constructor() this.on(reset) payload: "{"data":"onGrpcStreamEvent(EVENT_TYPE_DONG)","timeout":60000}"
00:26:26 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "onGrpcStreamEvent(EVENT_TYPE_DONG)"
00:26:26 VERB Puppet reset(onGrpcStreamEvent(EVENT_TYPE_DONG))
00:26:26 SILL StateSwitch <PuppetHostie> off() is true
00:26:26 VERB Puppet reset(onGrpcStreamEvent(EVENT_TYPE_DONG)) state is off(), make the watchdog to sleep
01:26:11 INFO Chatops roomMessage(17376996519@chatroom, 💖)
01:26:11 ERR Chatops roomMessage() this.bot is offline
01:26:15 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
02:26:12 INFO Chatops roomMessage(17376996519@chatroom, 💖)
02:26:12 ERR Chatops roomMessage() this.bot is offline
02:26:15 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
huan commented 4 years ago
05:37:59 INFO on-message onMessage(Message#Text[🗣Contact<OssChat>@👥Room<ChatOps - Heartbeat 💖>]       💖)
05:37:59 VERB Room topic()
05:37:59 SILL VoteManager validVote() not a managed room
05:37:59 VERB Puppet selfId()
05:37:59 VERB Message mentionList()
05:37:59 VERB Puppet selfId()
05:37:59 INFO on-message dingDong()
05:37:59 VERB Puppet selfId()
05:37:59 VERB Message mentionList()
05:37:59 VERB Puppet selfId()
05:38:08 VERB PuppetHostie startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed
05:38:08 SILL Puppet constructor() this.on(reset) payload: "{"data":"startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed"}"
05:38:08 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed"
05:38:08 VERB Puppet reset(startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed)
05:38:08 SILL StateSwitch <PuppetHostie> off() is false
05:38:08 VERB PuppetHostie stop()
05:38:08 SILL StateSwitch <PuppetHostie> off() is false
05:38:08 SILL StateSwitch <PuppetHostie> off() is false
05:38:08 VERB StateSwitch <PuppetHostie> off(pending) <- (false)
05:38:08 VERB Puppet selfId()
05:38:08 SILL Contact ready() @ Puppet#0<PuppetHostie>(Mike BO) with id="wxid_a8d806dzznm822"
05:38:08 SILL Contact ready() isReady() true
05:38:08 VERB PuppetHostie stopGrpcStream()
05:38:08 INFO Chatops roomMessage(17376996519@chatroom, 😪 Mike BO)
05:38:08 ERR Chatops roomMessage() this.bot is offline
05:38:08 VERB ContactSelf name()
05:38:08 ERR Wechaty addListenerFunction(logout) listener exception: TypeError: Cannot read property 'name' of undefined
05:38:08 SILL HotImport initProxyModule() proxyModule()
05:38:08 SILL HotImport initProxyModule() proxyModule() using default export
05:38:08 ERR on-error onError(TypeError: Cannot read property 'name' of undefined)
05:38:08 INFO Finis startFinis() bot Contact<Huan LI (李卓桓)> logout
05:38:08 SILL HotImport initProxyModule() proxyModule()
05:38:08 SILL HotImport initProxyModule() proxyModule() using default export
onLogout() enter
05:38:08 INFO on-logout onLogout(Contact<Huan LI (李卓桓)>)
onLogout() exit
05:38:28 ERR PuppetHostie stop() this.grpcClient.stop() rejection: 14 UNAVAILABLE: failed to connect to all addresses
05:38:28 VERB PuppetHostie stopGrpcClient()
05:38:28 SILL StateSwitch <PuppetHostie> off() is pending
05:38:28 VERB StateSwitch <PuppetHostie> off(true) <- (pending)
05:38:28 VERB PuppetHostie start()
05:38:28 SILL StateSwitch <PuppetHostie> on() is false
05:38:28 SILL StateSwitch <PuppetHostie> on() is false
05:38:28 VERB StateSwitch <PuppetHostie> on(pending) <- (false)
05:38:28 VERB PuppetHostie startGrpcClient()
05:38:28 VERB PuppetHostie discoverHostieIp(windows)
05:38:29 VERB PuppetHostie discoverHostieIp() 0.0.0.0
05:38:29 ERR PuppetHostie start() rejection: no endpoint
05:38:29 SILL StateSwitch <PuppetHostie> off() is false
05:38:29 VERB StateSwitch <PuppetHostie> off(true) <- (false)
05:38:29 WARN Puppet reset() exception: Error: no endpoint
05:38:29 SILL HotImport initProxyModule() proxyModule()
05:38:29 SILL HotImport initProxyModule() proxyModule() using default export
05:38:29 ERR on-error onError(Error)
05:38:59 WARN Puppet constructor() watchdog.on(reset) reason: {"data":"onGrpcStreamEvent(EVENT_TYPE_MESSAGE)","timeout":60000}
05:38:59 SILL Puppet constructor() this.on(reset) payload: "{"data":"onGrpcStreamEvent(EVENT_TYPE_MESSAGE)","timeout":60000}"
05:38:59 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "onGrpcStreamEvent(EVENT_TYPE_MESSAGE)"
05:38:59 VERB Puppet reset(onGrpcStreamEvent(EVENT_TYPE_MESSAGE))
05:38:59 SILL StateSwitch <PuppetHostie> off() is true
05:38:59 VERB Puppet reset(onGrpcStreamEvent(EVENT_TYPE_MESSAGE)) state is off(), make the watchdog to sleep
06:25:58 INFO Chatops roomMessage(17376996519@chatroom, 💖)
06:25:58 ERR Chatops roomMessage() this.bot is offline
06:26:01 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
07:25:58 INFO Chatops roomMessage(17376996519@chatroom, 💖)
07:25:58 ERR Chatops roomMessage() this.bot is offline
07:26:01 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
08:25:58 INFO Chatops roomMessage(17376996519@chatroom, 💖)
08:25:58 ERR Chatops roomMessage() this.bot is offline
08:26:01 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
09:25:59 INFO Chatops roomMessage(17376996519@chatroom, 💖)
09:25:59 ERR Chatops roomMessage() this.bot is offline
09:26:01 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
10:25:59 INFO Chatops roomMessage(17376996519@chatroom, 💖)
10:25:59 ERR Chatops roomMessage() this.bot is offline
10:26:01 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
11:25:59 INFO Chatops roomMessage(17376996519@chatroom, 💖)
11:25:59 ERR Chatops roomMessage() this.bot is offline
11:26:02 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
12:25:59 INFO Chatops roomMessage(17376996519@chatroom, 💖)
12:25:59 ERR Chatops roomMessage() this.bot is offline
12:26:02 SILL Wechaty start() setInterval() this timer is to keep Wechaty running...
huan commented 4 years ago
11:56:25 VERB Puppet selfId()
11:56:25 VERB Message mentionList()
11:56:25 VERB Puppet selfId()
11:56:39 VERB PuppetHostie startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed
11:56:39 SILL Puppet constructor() this.on(reset) payload: "{"data":"startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed"}"
11:56:39 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed"
11:56:39 VERB Puppet reset(startGrpcStream() eventStream.on(error) Error: 14 UNAVAILABLE: TCP Read failed)
11:56:39 SILL StateSwitch <PuppetHostie> off() is false
11:56:39 VERB PuppetHostie stop()
11:56:39 SILL StateSwitch <PuppetHostie> off() is false
11:56:39 SILL StateSwitch <PuppetHostie> off() is false
11:56:39 VERB StateSwitch <PuppetHostie> off(pending) <- (false)
11:56:39 VERB Puppet selfId()
11:56:39 SILL Contact ready() @ Puppet#0<PuppetHostie>(Mike BO) with id="wxid_a8d806dzznm822"
11:56:39 SILL Contact ready() isReady() true
11:56:39 VERB PuppetHostie stopGrpcStream()
11:56:39 INFO Chatops roomMessage(17376996519@chatroom, 😪 Mike BO)
11:56:39 ERR Chatops roomMessage() this.bot is offline
11:56:39 VERB ContactSelf name()
11:56:39 ERR Wechaty addListenerFunction(logout) listener exception: TypeError: Cannot read property 'name' of undefined
11:56:39 SILL HotImport initProxyModule() proxyModule()
11:56:39 SILL HotImport initProxyModule() proxyModule() using default export
11:56:39 ERR on-error onError(TypeError: Cannot read property 'name' of undefined)
11:56:39 INFO Finis startFinis() bot Contact<Huan LI (李卓桓)> logout
11:56:39 SILL HotImport initProxyModule() proxyModule()
11:56:39 SILL HotImport initProxyModule() proxyModule() using default export
onLogout() enter
11:56:39 INFO on-logout onLogout(Contact<Huan LI (李卓桓)>)
onLogout() exit
11:56:59 ERR PuppetHostie stop() this.grpcClient.stop() rejection: 14 UNAVAILABLE: failed to connect to all addresses
11:56:59 VERB PuppetHostie stopGrpcClient()
11:56:59 SILL StateSwitch <PuppetHostie> off() is pending
11:56:59 VERB StateSwitch <PuppetHostie> off(true) <- (pending)
11:56:59 VERB PuppetHostie start()
11:56:59 SILL StateSwitch <PuppetHostie> on() is false
11:56:59 SILL StateSwitch <PuppetHostie> on() is false
11:56:59 VERB StateSwitch <PuppetHostie> on(pending) <- (false)
11:56:59 VERB PuppetHostie startGrpcClient()
11:56:59 VERB PuppetHostie discoverHostieIp(windows)
11:57:01 VERB PuppetHostie discoverHostieIp() 0.0.0.0
11:57:01 ERR PuppetHostie start() rejection: no endpoint
11:57:01 SILL StateSwitch <PuppetHostie> off() is false
11:57:01 VERB StateSwitch <PuppetHostie> off(true) <- (false)
11:57:01 WARN Puppet reset() exception: Error: no endpoint
11:57:01 SILL HotImport initProxyModule() proxyModule()
11:57:01 SILL HotImport initProxyModule() proxyModule() using default export
11:57:01 ERR on-error onError(Error)
11:57:25 WARN Puppet constructor() watchdog.on(reset) reason: {"data":"onGrpcStreamEvent(EVENT_TYPE_MESSAGE)","timeout":60000}
11:57:25 SILL Puppet constructor() this.on(reset) payload: "{"data":"onGrpcStreamEvent(EVENT_TYPE_MESSAGE)","timeout":60000}"
11:57:25 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "onGrpcStreamEvent(EVENT_TYPE_MESSAGE)"
11:57:25 VERB Puppet reset(onGrpcStreamEvent(EVENT_TYPE_MESSAGE))
11:57:25 SILL StateSwitch <PuppetHostie> off() is true
11:57:25 VERB Puppet reset(onGrpcStreamEvent(EVENT_TYPE_MESSAGE)) state is off(), make the watchdog to sleep
lhr0909 commented 4 years ago

https://github.com/wechaty/wechaty-puppet-hostie/blob/master/src/client/recover%24.ts#L62 For these, it feels to me that throttle might make more sense: debounce delays the firing, while throttle fires at a more fixed rate, which feels more reliable.

https://rxjs-dev.firebaseapp.com/api/operators/throttle

https://rxjs-dev.firebaseapp.com/api/operators/debounce

But I think I will need to play with the lib to be able to tell what exactly we are trying to achieve.

huan commented 4 years ago

Thanks for your suggestion!

What we are trying to achieve is a reconnect & recovery function that resets our puppet when there is no heartbeat anymore.

I use debounce on purpose: the downstream only needs to react when there has been no heartbeat for more than a period of time. I don't care whether there is any heartbeat at the start; I only care when there has been no heartbeat for a while.
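To illustrate the difference, here is a small dependency-free simulation (not RxJS itself) of when a debounce and a leading-edge throttle would fire for the same list of heartbeat timestamps. The function names and the sample timestamps are made up for this sketch, and the timestamps are assumed to be sorted in ascending order.

```typescript
// Times (ms) at which a debounce with a quiet window of `windowMs` would fire.
// A beat only gets through if no other beat follows it within the window,
// so every fire marks the end of a silent period.
// (A gap of exactly `windowMs` is treated as silence in this sketch.)
function debounceFireTimes (beats: number[], windowMs: number): number[] {
  const fires: number[] = []
  beats.forEach((beat, i) => {
    const next = beats[i + 1]
    if (next === undefined || next - beat >= windowMs) {
      fires.push(beat + windowMs)
    }
  })
  return fires
}

// Times (ms) at which a leading-edge throttle would emit:
// the first beat passes, then beats are ignored until the window is over.
function throttleEmitTimes (beats: number[], windowMs: number): number[] {
  const emits: number[] = []
  let openAt = -Infinity
  beats.forEach(beat => {
    if (beat >= openAt) {
      emits.push(beat)
      openAt = beat + windowMs
    }
  })
  return emits
}

// Hypothetical heartbeats: regular at first, then a 30 second gap, then nothing more.
const beats = [0, 5000, 10000, 40000]
const debounced = debounceFireTimes(beats, 15 * 1000) // [25000, 55000]
const throttled = throttleEmitTimes(beats, 15 * 1000) // [0, 40000]
```

In this simulation the debounce fires 15 seconds into each silent period, which is the moment a ding or reset is wanted. The throttle only fires while heartbeats are still arriving, so it says nothing once they stop.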

On the other hand, I have had a headache for a few days over the following code:

https://github.com/wechaty/wechaty-puppet-hostie/blob/05e2d9e9d73a8a957ff8479e3bca3f996531c34f/src/client/recover%24.ts#L81-L84

In the above code, we create two subscriptions to the original heartbeat observable; I feel it would be better to subscribe to the upstream observable only once.

However, I cannot find a way to achieve that, so I have to create two separate subscriptions and then merge them.

Is there any way to achieve that? Any suggestions would be appreciated!
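One possible direction is multicasting: subscribe to the upstream once and fan each value out to every downstream consumer. In RxJS the `share()` operator provides this. The sketch below is a hand-rolled, dependency-free illustration of the idea only, not the RxJS implementation; `Source`, `shared`, and the fake `heartbeat` source are hypothetical names invented for this example.

```typescript
type Listener<T> = (value: T) => void
type Unsubscribe = () => void
type Source<T> = (listener: Listener<T>) => Unsubscribe

// Wraps a source so that all downstream listeners share ONE upstream subscription.
// The upstream is connected on the first listener and released after the last one leaves.
function shared<T> (source: Source<T>): Source<T> {
  let listeners: Listener<T>[] = []
  let disconnect: Unsubscribe | undefined

  return (listener) => {
    listeners = [...listeners, listener]
    if (listeners.length === 1) {
      disconnect = source(value => listeners.forEach(l => l(value)))
    }
    return () => {
      listeners = listeners.filter(l => l !== listener)
      if (listeners.length === 0 && disconnect) {
        disconnect()
        disconnect = undefined
      }
    }
  }
}

// Fake heartbeat source that counts how often it gets subscribed to.
let upstreamSubscriptions = 0
let emit: Listener<string> = () => {}
const heartbeat: Source<string> = (listener) => {
  upstreamSubscriptions++
  emit = listener
  return () => { emit = () => {} }
}

// Two consumers (think: the ding stream and the reset stream) on one upstream.
const sharedHeartbeat = shared(heartbeat)
const dingLog: string[] = []
const resetLog: string[] = []
const stopDing = sharedHeartbeat(value => { dingLog.push(value) })
const stopReset = sharedHeartbeat(value => { resetLog.push(value) })

emit('beat')
```

Both consumers see the beat while the fake source is subscribed only once. Calling `stopDing()` and `stopReset()` afterwards releases the upstream subscription.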

lhr0909 commented 4 years ago

merge is fine here: either signal is valid for recover$, so merging the two works.

huan commented 4 years ago
06:45:35 VERB PuppetHostie constructor() recover$().subscribe() next(2939)
06:45:35 VERB PuppetHostie stop()
06:45:35 SILL StateSwitch <PuppetHostie> off() is pending
06:45:35 WARN PuppetHostie stop() is called on a OFF puppet. await ready(off) and return.
06:45:35 VERB StateSwitch <PuppetHostie> ready(off, false)
06:46:35 SILL Puppet constructor() this.on(reset) payload: "{"data":"RxJS recover$"}"
06:46:35 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "RxJS recover$"
06:46:35 VERB Puppet reset(RxJS recover$)
06:46:35 VERB PuppetHostie constructor() recover$().subscribe() next(2940)
06:46:35 VERB PuppetHostie stop()
06:46:35 SILL StateSwitch <PuppetHostie> off() is pending
06:46:35 WARN PuppetHostie stop() is called on a OFF puppet. await ready(off) and return.
06:46:35 VERB StateSwitch <PuppetHostie> ready(off, false)
06:47:35 SILL Puppet constructor() this.on(reset) payload: "{"data":"RxJS recover$"}"
06:47:35 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "RxJS recover$"
06:47:35 VERB Puppet reset(RxJS recover$)
06:47:35 VERB PuppetHostie constructor() recover$().subscribe() next(2941)
06:47:35 VERB PuppetHostie stop()
06:47:35 SILL StateSwitch <PuppetHostie> off() is pending
06:47:35 WARN PuppetHostie stop() is called on a OFF puppet. await ready(off) and return.
06:47:35 VERB StateSwitch <PuppetHostie> ready(off, false)
06:48:35 SILL Puppet constructor() this.on(reset) payload: "{"data":"RxJS recover$"}"
06:48:35 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "RxJS recover$"
06:48:35 VERB Puppet reset(RxJS recover$)
06:48:35 VERB PuppetHostie constructor() recover$().subscribe() next(2942)
06:48:35 VERB PuppetHostie stop()
06:48:35 SILL StateSwitch <PuppetHostie> off() is pending
06:48:35 WARN PuppetHostie stop() is called on a OFF puppet. await ready(off) and return.
06:48:35 VERB StateSwitch <PuppetHostie> ready(off, false)
06:49:35 SILL Puppet constructor() this.on(reset) payload: "{"data":"RxJS recover$"}"
06:49:35 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "RxJS recover$"
06:49:35 VERB Puppet reset(RxJS recover$)
06:49:35 VERB PuppetHostie constructor() recover$().subscribe() next(2943)
06:49:35 VERB PuppetHostie stop()
06:49:35 SILL StateSwitch <PuppetHostie> off() is pending
06:49:35 WARN PuppetHostie stop() is called on a OFF puppet. await ready(off) and return.
06:49:35 VERB StateSwitch <PuppetHostie> ready(off, false)
06:50:35 SILL Puppet constructor() this.on(reset) payload: "{"data":"RxJS recover$"}"
06:50:35 SILL Puppet constructor() resetThrottleQueue.subscribe() reason: "RxJS recover$"
06:50:35 VERB Puppet reset(RxJS recover$)
06:50:35 VERB PuppetHostie constructor() recover$().subscribe() next(2944)
06:50:35 VERB PuppetHostie stop()
06:50:35 SILL StateSwitch <PuppetHostie> off() is pending
06:50:35 WARN PuppetHostie stop() is called on a OFF puppet. await ready(off) and return.
06:50:35 VERB StateSwitch <PuppetHostie> ready(off, false)