smallrye / smallrye-mutiny-vertx-bindings

Smallrye Mutiny bindings for Eclipse Vert.x
https://smallrye.io/smallrye-mutiny-vertx-bindings
Apache License 2.0
83 stars 26 forks source link

Support of Future<ReadStream<T>> #954

Open jponge opened 3 months ago

jponge commented 3 months ago

The generator does not properly support the case of Future<ReadStream<T>> methods (e.g., a Vert.x HTTP client response that is eventually wrapped by a JSON stream parser).

The idea is to translate those to Uni<ReadStream<T>> where the ReadStream is a Mutiny bindings shim, so we can call .toMulti() on it.

Given the following interface:

@VertxGen(concrete = false)
public interface AsyncStreams {

    Future<ReadStream<String>> asyncStream();
}

then the generated code is:

@io.smallrye.mutiny.vertx.MutinyGen(io.vertx.core.streams.AsyncStreams.class)
public interface AsyncStreams {

  static final io.smallrye.mutiny.vertx.TypeArg<io.vertx.mutiny.core.streams.ReadStream<java.lang.String>> TYPE_ARG_0 = new TypeArg<io.vertx.mutiny.core.streams.ReadStream<java.lang.String>>(o1 -> io.vertx.mutiny.core.streams.ReadStream.newInstance((io.vertx.core.streams.ReadStream)o1, TypeArg.unknown()), o1 -> o1.getDelegate());
  io.vertx.core.streams.AsyncStreams getDelegate();

  public io.smallrye.mutiny.Uni<io.vertx.mutiny.core.streams.ReadStream<String>> asyncStream();

  public static  AsyncStreams newInstance(io.vertx.core.streams.AsyncStreams arg) {
    return arg != null ? new AsyncStreamsImpl(arg) : null;
  }

}

class AsyncStreamsImpl implements AsyncStreams {
  private final io.vertx.core.streams.AsyncStreams delegate;

  public io.vertx.core.streams.AsyncStreams getDelegate() {
    return delegate;
  }

  /**
   * Empty constructor used by CDI, do not use this constructor directly.
   **/
  AsyncStreamsImpl() {
    this.delegate = null;
  }

  public AsyncStreamsImpl(io.vertx.core.streams.AsyncStreams delegate) {
    this.delegate = delegate;
  }

  @CheckReturnValue
  public io.smallrye.mutiny.Uni<io.vertx.mutiny.core.streams.ReadStream<String>> asyncStream() { 
    return io.smallrye.mutiny.vertx.UniHelper.toUni(delegate.asyncStream().map(x -> ReadStream<String>.newInstance(x)));}

  public io.vertx.mutiny.core.streams.ReadStream<String> asyncStreamAndAwait() { 
    return asyncStream().await().indefinitely();
  }

  public void asyncStreamAndForget() { 
    asyncStream().subscribe().with(io.smallrye.mutiny.vertx.UniHelper.NOOP);
  }

}

where the following method:

@CheckReturnValue
  public io.smallrye.mutiny.Uni<io.vertx.mutiny.core.streams.ReadStream<String>> asyncStream() { 
    return io.smallrye.mutiny.vertx.UniHelper.toUni(delegate.asyncStream().map(x -> ReadStream<String>.newInstance(x)));}

does not compile:

AsyncStreams.java:[46,115] illegal start of type
AsyncStreams.java:[46,116] not a statement
AsyncStreams.java:[46,117] ';' expected
image
tsegismont commented 3 weeks ago

As a workaround, you can create a Vert.x API object that extends ReadStream, like the Cassandra client does:

  /**
   * Executes the given SQL <code>SELECT</code> statement which returns the results of the query as a read stream.
   *
   * @param sql              the SQL to execute. For example <code>SELECT * FROM table ...</code>.
   * @return a future of the result
   */
  Future<CassandraRowStream> queryStream(String sql);
jponge commented 3 weeks ago

Nice trick