hashgraph / pbj

A performance optimized Google Protocol Buffers code generator, parser, and Gradle module.
Apache License 2.0
12 stars 6 forks source link

Generate service stubs #255

Open rbair23 opened 1 month ago

rbair23 commented 1 month ago

Problem

PBJ generates Java objects for each message it encounters in the protobuf schema definitions, but it does not generate anything for service definitions. Right now the only way to use PBJ as a library in a web server is to use a very low-level API provided by the gRPC libraries that basically deliver you a byte array from which you can parse. An example of how this can be done can be found in the Hashgraph consensus node repo.

Solution

Create two new files:

/**
 * Defines a common interface for all implementations of a gRPC {@code service}. PBJ will generate a subinterface
 * for each {@code service} in the protobuf schema definition files, with default implementations of each of the
 * given methods in this interface.
 *
 * <p>For example, suppose I have the following protobuf file:
 * <pre>
 * {@code
 * package example;
 *
 * service HelloService {
 *   rpc SayHello (HelloRequest) returns (HelloResponse);
 * }
 *
 * message HelloRequest {
 *   string greeting = 1;
 * }
 *
 * message HelloResponse {
 *   string reply = 1;
 * }
 * }
 * </pre>
 *
 * <p>From this file, PBJ will generate a {@code HelloService} interface that extends {@code ServiceInterface}:
 * <pre>
 * {@code
 * public interface HelloService extends ServiceInterface {
 *    // ...
 *
 *    @NonNull
 *    HelloResponse sayHello(final @NonNull HelloRequest request);
 *
 *    default String serviceName() { return "HelloService"; }
 *    default String fullName() { return "example.HelloService"; }
 *
 *    // ...
 * }
 * </pre>
 *
 * In the application code, you will simply create a new class implementing the {@code HelloService} interface, and
 * register it with your webserver in whatever way is appropriate for your webserver.
 */
public interface ServiceInterface {
    /** Defines a method on the {@code service} IDL definition */
    interface Method {
        /** Gets the name of the method */
        @NonNull
        String name();
    }

    /**
     * Through this interface the {@link ServiceInterface} implementation will send responses back to the client.
     * The {@link #start()} method is called before any responses are sent, and the {@link #close()} method
     * is called after all responses have been sent.
     *
     * <p>It is not common for an application to implement or use this interface. It is typically implemented by
     * a webserver to integrate PBJ into that server.
     */
    interface ResponseCallback {
        /**
         * Called by the {@link ServiceInterface} implementation to before any responses have been sent to the client.
         * This must be called before {@link #send(Bytes)} is called.
         */
        void start();

        /**
         * Called to send a single response message to the client. For unary methods, this will be called once. For
         * server-side streaming or bidi-streaming, this may be called many times.
         *
         * @param response A response message to send to the client.
         */
        void send(@NonNull Bytes response);

        /**
         * Called to close the connection with the client, signaling that no more responses will be sent.
         */
        void close();
    }

    /** Gets the simple name of the service. For example, "HelloService". */
    @NonNull String serviceName();
    /** Gets the full name of the service. For example, "example.HelloService". */
    @NonNull String fullName();
    /** Gets a list of each method in the service. This list may be empty but should never be null. */
    @NonNull List<Method> methods();

    /**
     * Called by the webserver to open a new connection between the client and the service. This method may be called
     * many times concurrently, once per connection. The implementation must therefore be thread-safe. A default
     * implementation is provided by the generated PBJ code, which will handle the dispatching of messages to the
     * appropriate methods in the correct way (unary, server-side streaming, etc.).
     *
     * @param method The method that was called by the client.
     * @param messages A blocking queue of messages sent by the client.
     * @param callback A callback to send responses back to the client.
     */
    void open(
            @NonNull Method method,
            @NonNull BlockingQueue<Bytes> messages,
            @NonNull ResponseCallback callback);
}

Alternatives

We could generate the same types of stubs used by the normal gRPC libraries. The stubs we create would be different from those created by protoc, but they should interoperate with the normal gRPC libraries. This would increase interoperability, because it would just work with any existing servers with gRPC support. And maybe this is something we should consider doing as well.

However, generating our own solution has benefits:

rbair23 commented 1 month ago

Here's an example implementation of ConsensusService. I had to do a little threading in open to support streaming.

public interface ConsensusService extends ServiceInterface {
    enum ConsensusMethod implements Method {
        createTopic,
        updateTopic,
        deleteTopic,
        submitMessage,
        getTopicInfo;
    }

    TransactionResponse createTopic(Transaction tx);
    TransactionResponse updateTopic(Transaction tx);
    TransactionResponse deleteTopic(Transaction tx);
    TransactionResponse submitMessage(Transaction tx);
    Response getTopicInfo(Query q);

    default String serviceName() {
        return "ConsensusService";
    }

    default String fullName() {
        return "proto.ConsensusService";
    }

    default List<Method> methods() {
        return List.of(
                ConsensusMethod.createTopic,
                ConsensusMethod.updateTopic,
                ConsensusMethod.deleteTopic,
                ConsensusMethod.submitMessage,
                ConsensusMethod.getTopicInfo);
    }

    @Override
    default void open(
            final @NonNull Method method,
            final @NonNull BlockingQueue<Bytes> messages,
            final @NonNull ResponseCallback callback) {

        final var m = (ConsensusMethod) method;
        Thread.ofVirtual().start(() -> {
            try {
                switch (m) {
                    case ConsensusMethod.createTopic -> {
                        // Unary method
                        final var message = messages.take();
                        callback.start();
                        final var messageBytes = Transaction.PROTOBUF.parse(message);
                        final var response = createTopic(messageBytes);
                        final var responseBytes = TransactionResponse.PROTOBUF.toBytes(response);
                        callback.send(responseBytes);
                        callback.close();
                    }
                    case ConsensusMethod.updateTopic -> {
                        // Unary method
                        final var message = messages.take();
                        callback.start();
                        final var messageBytes = Transaction.PROTOBUF.parse(message);
                        final var response = updateTopic(messageBytes);
                        final var responseBytes = TransactionResponse.PROTOBUF.toBytes(response);
                        callback.send(responseBytes);
                        callback.close();
                    }
                    case ConsensusMethod.deleteTopic -> {
                        // Unary method
                        final var message = messages.take();
                        callback.start();
                        final var messageBytes = Transaction.PROTOBUF.parse(message);
                        final var response = deleteTopic(messageBytes);
                        final var responseBytes = TransactionResponse.PROTOBUF.toBytes(response);
                        callback.send(responseBytes);
                        callback.close();
                    }
                    case ConsensusMethod.submitMessage -> {
                        // Unary method
                        final var message = messages.take();
                        callback.start();
                        final var messageBytes = Transaction.PROTOBUF.parse(message);
                        final var response = submitMessage(messageBytes);
                        final var responseBytes = TransactionResponse.PROTOBUF.toBytes(response);
                        callback.send(responseBytes);
                        callback.close();
                    }
                    case ConsensusMethod.getTopicInfo -> {
                        // Unary method
                        final var message = messages.take();
                        callback.start();
                        final var messageBytes = Query.PROTOBUF.parse(message);
                        final var response = getTopicInfo(messageBytes);
                        final var responseBytes = Response.PROTOBUF.toBytes(response);
                        callback.send(responseBytes);
                        callback.close();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                callback.close();
            }
        });
    }
}