davidmoten / rxjava2-jdbc

RxJava2 integration with JDBC including Non-blocking Connection Pools
Apache License 2.0
392 stars 42 forks source link

Feature Request: hook to get ResultSet before calling next #42

Open ahtokca opened 5 years ago

ahtokca commented 5 years ago

First things first - thanks for the amazing state of the art software.

I use it to stream large volume of data. It will be awesome to have a hook/callback called at the moment when ResultSet is just obtained. In particular I map ResultSet to CSV. It is all fine if result is not empty however I'd like return just a header (column names) when there the result is empty something like below.

public <T> Flowable<T> get(BiFunction<ResultSet, T> onResultSetIsReady, BiFunction<ResultSet, T> mapper) 

Or may be extends ResultSetMapper?

And use it something like below

sb.select(sql).get(::toHeaderLine, ::toCsvLine): Flowable<String>

Thanks one more time for the amazing software

davidmoten commented 5 years ago

Thanks @ahtokca, glad it's useful.

I think the functionality you are looking for sits better outside the library. The thing to remember is that nothing stops you mapping the ResultSet to itself in the get method (example below). The method is there to encourage users to use the ResultSet as soon as possible so it is not exposed to out of order processing (because the same ResultSet object is returned for every row).

Flowable<String> lines = 
  Flowable.defer(() -> {
     boolean[] isFirst = new boolean[] {true};
     return sb.select(sql)
        .get(rs -> rs)  
        .map(rs -> {
            String s = (isFirst[0] ? toHeaderLine(rs)  + "\n":"") +  toCsv(rs));
            isFirst[0] = false;
            return s;
        });
  });

The defer creation method is required to make the Flowable reusable (isFirst state per subscription).

ahtokca commented 5 years ago

Hi @davidmoten, thanks for the quick answer. Indeed it is how I used it however for a corner case when query is empty it produce not header. So I come up with overriding Select

private static <T> Flowable<T> create(Connection con, String sql, Flowable<List<Object>> parameterGroups, int fetchSize,
                                          Function<? super ResultSet, T> mapper, boolean eagerDispose,
                                          Function<ResultSetMetaData, T> headerMapper) {
        log.debug("Select.create called with con={}", con);
        Callable<NamedPreparedStatement> initialState = () -> Util.prepare(con, fetchSize, sql);
        Function<NamedPreparedStatement, Flowable<T>> observableFactory = (ps) -> parameterGroups.flatMap((parameters) -> {
            Flowable<T> rows = create(ps.ps, parameters, mapper, ps.names, sql, fetchSize);
            if (headerMapper != null) {
                Flowable<T> header = Flowable.fromCallable(()->headerMapper.apply(ps.ps.getMetaData()));
                rows = Flowable.concat(header, rows);
            }
            return rows;
        }, true, 1);
        Consumer<NamedPreparedStatement> disposer = Util::closePreparedStatementAndConnection;
        return Flowable.using(initialState, observableFactory, disposer, eagerDispose);
    }
davidmoten commented 5 years ago

@ahtokca Good point. I hadn't considered the empty ResultSet case. Some API addition might be good.