vert-x / mod-lang-scala

Vert.x 2.x is deprecated - use instead
https://github.com/vert-x3/vertx-lang-scala
Apache License 2.0

NullPointerException in DefaultEventBus #156

Open angeloh opened 10 years ago

angeloh commented 10 years ago

I have a handler which accepts a client's connection and broadcasts the client's connected status to another event bus address for its subscribers. This is how I test this handler.

The error seems to originate in convertHandler, yet I don't use sendWithTimeout or replyWithTimeout anywhere in my code.

  @Test
  def testSockJSVerticleConnect() {
    import org.vertx.scala.core.FunctionConverters._
    container.deployVerticle("scala:verticles.SockJSVerticle", Json.obj(), 1, {
      case Success(deploymentId) => {
        lazy val hdl = (msg: Message[JsonObject]) => {
          assertEquals("connected", msg.body.getString("status"))
          testComplete()
        }
        vertx.eventBus.registerHandler("eb.pres.memberid1", hdl) // test if memberid1 connected

        vertx.createHttpClient.setHost("localhost").setPort(8081).connectWebsocket("/ws/websocket",
          (w: WebSocket) => {
            val msg = Json.obj("body" -> Json.obj("action" -> "connect", "memberId" -> "memberid1"),
              "type" -> "send", "address" -> "eb.conn")
            w.writeTextFrame(msg.encode()) // memberid1 connects
          })
      }
      case Failure(ex) => fail()
    }: Try[String] => Unit)
  }

But I always see this exception, even though the test case itself passes.

    SEVERE: Scala verticle threw exception
    java.lang.NullPointerException
        at org.vertx.java.core.eventbus.impl.DefaultEventBus$6.handle(DefaultEventBus.java:755)
        at org.vertx.java.core.eventbus.impl.DefaultEventBus$6.handle(DefaultEventBus.java:745)
        at org.vertx.java.core.eventbus.impl.DefaultEventBus$11.run(DefaultEventBus.java:943)
        at org.vertx.java.core.impl.DefaultContext$3.run(DefaultContext.java:175)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:370)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at java.lang.Thread.run(Thread.java:724)

angeloh commented 10 years ago

I'm guessing mod-scala forgets to create a handler somewhere? It's weird to get an NPE without any clues. Any suggestions?

angeloh commented 10 years ago

This is the receive method and the connect action.

override def receive = (msg: Message[JsonObject]) => {
  case "connect" => connect(msg.body)
}

protected def connect(json: JsonObject) = {
  Option(json.getString("memberId")) match {
    case None => Error("member id parameter missing!")
    case Some(memberId) => {
      AsyncReply {
        val p = Promise[BusModReply]
        val token = XUUID.noDashStr
        membersMap.put(token, Member(memberId))
        eb.send(redisAddress, Json.obj("command" -> "hset", "args" -> Json.arr(s"pres-${memberId}", "status", "connected")), // redis hset hash
          (msg: Message[JsonObject]) => {
            eb.publish(s"eb.pres.${memberId}", Json.obj("status"->"connected")) // publish to clients who sub 'eb.pres.memberId'
            p.success(Ok(Json.obj("status"->"ok", "token"->token)))
          })
        val timerId = createTimer(token)
        logger.info(s">>> client connects, set new timer: $timerId")
        timeoutMap.put(token, timerId)
        p.future
      } // -- AsyncReply
    }
  }
}

angeloh commented 10 years ago

OK, I found the issue. EventBusBridge.checkAndSend checks "replyAddress" and, if it is set, creates a replyHandler that is later passed to sendWithTimeout(). In my case I am using the HTTP client's connectWebsocket, so I need to include "replyAddress" in the message to avoid this NPE. But I wonder why checkAndSend() assigns null to replyHandler if that will definitely cause an NPE later. I think this should be fixed.

private void checkAndSend(boolean send, final String address, Object body,
                            final SockJSSocket sock,
                            final String replyAddress) {
    final SockInfo info = sockInfos.get(sock);
    if (replyAddress != null && !checkMaxHandlers(info)) {
      return;
    }
    final Handler<AsyncResult<Message<Object>>> replyHandler;
    if (replyAddress != null) {
      replyHandler = new Handler<AsyncResult<Message<Object>>>() {
        public void handle(AsyncResult<Message<Object>> result) {
          .......
        }
      };
    } else {
      replyHandler = null;
    }
    ......
 }
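
To illustrate the kind of guard suggested above, here is a minimal, self-contained sketch. It does not use Vert.x at all: `ReplyHandler`, `sendWithReply`, `sendNoReply`, and the `log` list are hypothetical stand-ins, and this is not the actual change made in Vert.x core. It only shows the idea of choosing a send path based on whether a reply address exists, so that a null handler never reaches code that will invoke it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the bridge's send path; none of these names come from Vert.x.
public class NullReplyGuardSketch {

    // Stand-in for a reply handler type.
    interface ReplyHandler {
        void handle(String reply);
    }

    // Records what happened so the behaviour can be inspected.
    static final List<String> log = new ArrayList<>();

    // Models a send path that always invokes the handler it is given,
    // so passing null here would throw a NullPointerException.
    static void sendWithReply(String address, String body, ReplyHandler handler) {
        log.add("send-with-reply:" + address);
        handler.handle("ack:" + body);
    }

    // Models a fire-and-forget send that needs no handler.
    static void sendNoReply(String address, String body) {
        log.add("send:" + address);
    }

    // The guard: only take the reply path when a reply address was supplied.
    static void checkAndSend(String address, String body, String replyAddress) {
        if (replyAddress != null) {
            sendWithReply(address, body,
                reply -> log.add("reply-to:" + replyAddress + ":" + reply));
        } else {
            sendNoReply(address, body);
        }
    }

    public static void main(String[] args) {
        checkAndSend("eb.conn", "connect", null);      // no replyAddress: no handler involved
        checkAndSend("eb.conn", "connect", "reply-1"); // replyAddress set: handler is created and called
        System.out.println(log);
    }
}
```

With this shape, a frame without "replyAddress" goes through the handler-free path instead of carrying a null handler forward.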
galderz commented 10 years ago

I haven't debugged this yet, but it looks like an issue in Vert.x core itself? See https://github.com/eclipse/vert.x/pull/810/files