apache / openwhisk

Apache OpenWhisk is an open source serverless cloud platform
https://openwhisk.apache.org/
Apache License 2.0
6.54k stars 1.17k forks source link

Watcher does not handle an error. #5307

Open style95 opened 2 years ago

style95 commented 2 years ago

Environment details:

Steps to reproduce the issue:

The watcher service uses watchAllKeys method to initialize the watcher.

  private val watcher = etcdClient.watchAllKeys { res: WatchUpdate =>
    res.getEvents.asScala.foreach { event =>
      event.getType match {
        case EventType.DELETE =>
          val key = ByteStringToString(event.getPrevKv.getKey)
          val value = ByteStringToString(event.getPrevKv.getValue)
          val watchEvent = WatchEndpointRemoved(key, key, value, false)
          deleteWatchers
            .foreach { watcher =>
              if (watcher._1.watchKey == key) {
                watcher._2 ! watchEvent
              }
            }
          prefixDeleteWatchers
            .foreach { watcher =>
              if (key.startsWith(watcher._1.watchKey)) {
                watcher._2 ! WatchEndpointRemoved(watcher._1.watchKey, key, value, true)
              }
            }
        case EventType.PUT =>
          val key = ByteStringToString(event.getKv.getKey)
          val value = ByteStringToString(event.getKv.getValue)
          val watchEvent = WatchEndpointInserted(key, key, value, false)
          putWatchers
            .foreach { watcher =>
              if (watcher._1.watchKey == key) {
                watcher._2 ! watchEvent
              }
            }
          prefixPutWatchers
            .foreach { watcher =>
              if (key.startsWith(watcher._1.watchKey)) {
                watcher._2 ! WatchEndpointInserted(watcher._1.watchKey, key, value, true)
              }
            }
        case msg =>
          logging.debug(this, s"watch event received: $msg.")
      }
    }

  }

The watchAllKeys method has an error handler in the argument, but no error handler is provided.

  def watchAllKeys(next: WatchUpdate => Unit = (_: WatchUpdate) => {},
                   error: Throwable => Unit = (_: Throwable) => {},
                   completed: () => Unit = () => {}): Watch = {
    client.getKvClient
      .watch(KvClient.ALL_KEYS)
      .prevKv()
      .executor(threadpool)
      .start(new StreamObserver[WatchUpdate]() {
        override def onNext(value: WatchUpdate): Unit = {
          next(value)
        }

        override def onError(t: Throwable): Unit = {
          error(t)
        }

        override def onCompleted(): Unit = {
          completed()
        }
      })
  }

So the default error handler((_: Throwable) => {}) is being used and it does nothing.

Provide the expected results and outputs:

The error handler should print an error logs and reconnect to ETCD in case of any connection error to restore watcher.

This might be related to https://github.com/apache/openwhisk/issues/5286

bdoyle0182 commented 2 years ago

cool, I am still having some problems from the invoker / scheduler with etcd connection and re-establishing it seems. Though the issues are less impactful right now.