sczyh30 / vertx-blueprint-microservice

Vert.x Blueprint Project - Micro-Shop microservice application
Apache License 2.0
770 stars 301 forks source link

Rxjava jdbc #4

Closed YuiFx closed 7 years ago

YuiFx commented 7 years ago

数据链接在官方的demo中,需要关闭链接,这里的演习程序没有关闭。是不需要吗?

如果使用传统的方式比如JDBI需要包装一层线程池运行吗

sczyh30 commented 7 years ago

在实际业务场景中需要手动关闭数据库连接以便资源能够及时被释放。Rx风格下可以这么做:

    return client.getConnectionObservable()
      .flatMap(conn -> {
        return conn.queryWithParamsObservable(STREAM_STATEMENT, params)
          .map(ResultSet::getRows)
          .flatMapIterable(item -> item) // list merge into observable
          .map(this::wrapCartEvent)
          .doOnNext(i -> conn.close());
      });

之后我会更新对应的示例 :-)

如果用传统的阻塞型JDBC的话需要包装线程池运行。可以通过 executeBlocking 方法包装阻塞过程。其实Vert.x JDBC底层并不是真正的异步,底层其实就是用Worker线程池包装运行的JDBC,因此无需自己包装。

YuiFx commented 7 years ago

.doOnNext(i -> conn.close());这个如果出现异常会不执行的 用doOnTerminate可以确保运行,但是需要用一个东西去保持connection 官方demo中,

jdbc.getConnectionObservable().subscribe(
        conn -> {

          // Now chain some statements using flatmap composition
          Observable<ResultSet> resa = conn.updateObservable("CREATE TABLE test(col VARCHAR(20))").
              flatMap(result -> conn.updateObservable("INSERT INTO test (col) VALUES ('val1')")).
              flatMap(result -> conn.updateObservable("INSERT INTO test (col) VALUES ('val2')")).
              flatMap(result -> conn.queryObservable("SELECT * FROM test"));

          // Subscribe to the final result
          resa.subscribe(resultSet -> {
            System.out.println("Results : " + resultSet.getRows());
          }, err -> {
            System.out.println("Database problem");
            err.printStackTrace();
          }, conn::close);//在这里关闭连接
        },

        // Could not connect
        err -> {
          err.printStackTrace();
        }

这个有点不太习惯,所以才想通过别的方式去确保数据链接的安全

sczyh30 commented 7 years ago

官方的写法不太reactive。 把doOnNext替换成doOnTerminate就好了:

    return client.getConnectionObservable()
      .flatMap(conn ->
        conn.queryWithParamsObservable(STREAM_STATEMENT, params)
          .map(ResultSet::getRows)
          .flatMapIterable(item -> item) // list merge into observable
          .map(this::wrapCartEvent)
          .doOnTerminate(conn::close)
      );

其中conn可以保持,只不过要嵌套一层flatMap

YuiFx commented 7 years ago

OK,THS