krickert / search_indexer

RAG search engine based on Wikipedia
GNU General Public License v3.0

Having trouble getting dynamic service clients from Micronaut #10

Closed: krickert closed this issue 10 months ago

krickert commented 11 months ago

Problem statement

I'm trying to create a pipeline processor. The idea is this: given a gRPC interface, I can create a gRPC server that implements it and then register that server in Consul. Once it is registered, it becomes available in the pipeline for document processing.

Below is an architecture diagram that describes this workflow.

[image: architecture diagram]

Key point: all gRPC services use the same interface, but each gRPC service box in the diagram represents a different implementation of that interface.

Takeaway: each gRPC service implementation maps 1:1 to a service registered in Consul.
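To make the 1:1 mapping concrete, here is a minimal in-process sketch. It assumes a hypothetical `DocumentProcessor` interface standing in for the shared gRPC interface, with lambdas standing in for the separate gRPC services; in the real system the name lookup would go through Consul rather than a local map. The point is that the pipeline only knows step names, so registering a new name needs no change to the pipeline code.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PipelineSketch {
    // Hypothetical stand-in for the single gRPC interface every service implements.
    public interface DocumentProcessor {
        String process(String document);
    }

    // Service name -> implementation; in the real system this lookup would hit Consul.
    private static final Map<String, DocumentProcessor> REGISTRY = new ConcurrentHashMap<>();

    public static void register(String serviceName, DocumentProcessor impl) {
        REGISTRY.put(serviceName, impl);
    }

    // Runs the document through each named step in order, resolving steps by name only.
    public static String runPipeline(List<String> steps, String document) {
        String current = document;
        for (String step : steps) {
            DocumentProcessor processor = REGISTRY.get(step);
            if (processor == null) {
                throw new IllegalArgumentException("No service registered as: " + step);
            }
            current = processor.process(current);
        }
        return current;
    }

    public static void main(String[] args) {
        register("lowercase", doc -> doc.toLowerCase());
        register("trim", String::trim);
        System.out.println(runPipeline(List.of("trim", "lowercase"), "  Hello World  "));
    }
}
```

In this sketch, a data scientist's new service is just one more `register` call; in the real system it would be one more Consul registration.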

I suspect this is entirely possible, but I'm not sure how to expose this functionality in Micronaut given its implementation of automated Consul discovery. I think the only way to do this is with the annotation below.

What works now: I can expose this kind of functionality by turning on auto-discovery and adding this code to get the client:

@Singleton
@Bean
GreeterGrpc.GreeterStub greeterStub(
    // With discovery enabled, the annotation value is the service name looked up in Consul
    @GrpcChannel("greeter")
    ManagedChannel channel) {
    return GreeterGrpc.newStub(channel);
}

I've tested the code above, and it works. But I'd like to get the same result without naming the service in the annotation: I'd rather pass a string that matches the gRPC service's registration name in Consul into a service that returns the client.

The code above doesn't work for my use case because any time a data scientist adds a new service to Consul, a code change is required to support that gRPC service.

What I'm working on now: I'd like to configure the discovery client so I can get the ManagedChannel for a gRPC channel just by passing in the service name registered in Consul.

Because the channel only seems to be specified by the @GrpcChannel annotation, I can't figure out how to accomplish this without using that annotation.

References

https://github.com/micronaut-projects/micronaut-core/discussions/9961

I posted this at the link above as well, but this post is more detailed.

Other thoughts

While debugging the Micronaut code, I can see that at one point the NettyChannelBuilder is called with only the Consul service name. The name in the annotation is successfully looked up in Consul.

io.micronaut.grpc.channels.GrpcChannelBuilderFactory#managedChannelBuilder is called next. It receives an empty interceptor list and the service name given in the annotation.

After this, a new NameResolver is created; this seems to be what hooks in Consul.
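In grpc-java, a channel target is a URI, and its scheme (for example `consul`) is what typically selects the name resolver. As a rough illustration, the sketch below splits a target into the pieces a resolver would work with. It assumes a hypothetical convention where the Consul agent is the authority and the service name is the path, e.g. `consul://localhost:8500/nlp`; this is not necessarily how Micronaut or the Consul resolver formats its targets.

```java
import java.net.URI;

public class TargetParser {
    // Hypothetical value type holding the parts of a resolver target.
    public record Target(String scheme, String agentAddress, String serviceName) {}

    // Parses targets shaped like "consul://host:port/serviceName" (an assumed convention).
    public static Target parse(String target) {
        URI uri = URI.create(target);
        String path = uri.getPath();
        if (uri.getScheme() == null || uri.getAuthority() == null || path == null || path.length() < 2) {
            throw new IllegalArgumentException("Expected scheme://host:port/serviceName but got: " + target);
        }
        // Drop the leading "/" so the path becomes the bare service name
        return new Target(uri.getScheme(), uri.getAuthority(), path.substring(1));
    }

    public static void main(String[] args) {
        System.out.println(parse("consul://localhost:8500/nlp"));
    }
}
```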

I'll add more notes as I go along.

krickert commented 10 months ago

Still working through this. I found that Micronaut simply has no implementation for creating service stubs dynamically. In fact, I found the exception that explicitly rules it out:

    /**
     * Builds a managed channel for the given target.
     *
     * @param injectionPoint The injection point
     *
     * @return The channel
     */
    @Bean
    @Primary
    protected ManagedChannel managedChannel(InjectionPoint<Channel> injectionPoint) {
        Argument<?> argument;
        if (injectionPoint instanceof FieldInjectionPoint) {
            argument = ((FieldInjectionPoint<?, ?>) injectionPoint).asArgument();
        } else if (injectionPoint instanceof ArgumentInjectionPoint) {
            argument = ((ArgumentInjectionPoint<?, ?>) injectionPoint).getArgument();
        } else {
            throw new ConfigurationException("Cannot directly create channels use @Inject or constructor injection instead");
        }
        // ... (remainder of the method omitted)

Specifically - ConfigurationException("Cannot directly create channels use @Inject or constructor injection instead")

So I went ahead and found this post: https://mykidong.medium.com/howto-grpc-java-client-side-load-balancing-using-consul-8f729668d3f8

It points to this project: https://github.com/mykidong/grpc-java-load-balancer-using-consul

The code does exactly what it's supposed to do, but it's four years old. I updated all the libraries, which left it free of known CVEs at runtime. However, one part of that code has an issue:

[image: screenshot of the deprecated code]

This code is deprecated. It works for now, but I'll have to figure out how to update it so it works properly with this version of gRPC.

What I love about this client implementation is that it chooses the server through a Consul lookup. This is exactly what I was looking for, because it lets me scale the services dynamically.

I wrote some initial test code, and it did exactly what I wanted:

package com.krickert.search.grpc;

import com.krickert.search.grpc.component.grpc.ConsulNameResolver;
import com.krickert.search.service.PipeServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.micronaut.context.annotation.*;
import jakarta.inject.Inject;

import java.util.Collections;

@Factory
public class ManagedChannelFactory {

    private final String defaultZone;

    @Bean
    @Prototype
    PipeServiceGrpc.PipeServiceBlockingStub getStub() {
        String serviceName = "nlp";
        String consulAddr = "consul://" + defaultZone;

        int pauseInSeconds = 5;

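        // Note: each call builds a new channel (prototype scope), and nameResolverFactory(...)
        // is deprecated in recent grpc-java releases in favor of a registered NameResolverProvider.
        ManagedChannel channel = ManagedChannelBuilder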
                .forTarget(consulAddr)
                .defaultLoadBalancingPolicy("round_robin")
                .nameResolverFactory(
                        new ConsulNameResolver.ConsulNameResolverProvider(
                                serviceName,
                                pauseInSeconds,
                                false,
                                Collections.emptyList()))
                .usePlaintext()
                .build();

        return PipeServiceGrpc.newBlockingStub(channel);
    }

    @Inject
    public ManagedChannelFactory(@Property(name = "consul.client.defaultZone") String defaultZone) {
        this.defaultZone = defaultZone;
    }
}
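The test code above sets `round_robin` as the default load-balancing policy and passes `pauseInSeconds` to `ConsulNameResolverProvider`, which presumably controls how often Consul is re-queried. The sketch below imitates that combination in plain Java so the idea is visible without gRPC or Consul: a hypothetical `lookup` function plays the role of the Consul query, `refresh()` would be called on a timer, and `pick()` rotates through the latest snapshot of instances. gRPC's real `round_robin` balancer also only picks among subchannels that are currently connected, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

public class RoundRobinSketch {
    private final String serviceName;
    // Hypothetical lookup: service name -> "host:port" of each instance (stands in for a Consul query).
    private final Function<String, List<String>> lookup;
    // Latest snapshot of instances; swapped atomically on refresh.
    private final AtomicReference<List<String>> instances = new AtomicReference<>(List.of());
    private final AtomicInteger next = new AtomicInteger();

    public RoundRobinSketch(String serviceName, Function<String, List<String>> lookup) {
        this.serviceName = serviceName;
        this.lookup = lookup;
        refresh();
    }

    // Re-runs the lookup; a resolver would call this every few seconds on a timer.
    public void refresh() {
        instances.set(List.copyOf(lookup.apply(serviceName)));
    }

    // Returns the next instance in rotation from the most recent snapshot.
    public String pick() {
        List<String> snapshot = instances.get();
        if (snapshot.isEmpty()) {
            throw new IllegalStateException("No instances registered for " + serviceName);
        }
        // floorMod keeps the index non-negative even after the counter overflows
        return snapshot.get(Math.floorMod(next.getAndIncrement(), snapshot.size()));
    }

    public static void main(String[] args) {
        List<String> registered = new ArrayList<>(List.of("10.0.0.1:50051", "10.0.0.2:50051"));
        RoundRobinSketch balancer = new RoundRobinSketch("nlp", name -> registered);
        System.out.println(balancer.pick() + ", " + balancer.pick() + ", " + balancer.pick());
        registered.add("10.0.0.3:50051"); // a new instance registers
        balancer.refresh();
        System.out.println(balancer.pick() + ", " + balancer.pick() + ", " + balancer.pick());
    }
}
```

Because `pick()` always reads the latest snapshot, newly registered instances join the rotation after the next refresh without any client code change, which is the dynamic scaling described above.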

Before I merge this code, I have to do two things: 1) get permission to merge it into my project (the project has no license information, so I emailed the author), and 2) find a way to avoid the deprecated function without losing functionality.

Any hints on doing this directly in Micronaut with its Consul integration would be appreciated. For now, I might skip the Consul integration on the client side and use it only for service registration.

krickert commented 10 months ago

I was able to fix this by pulling out the Micronaut code that creates the ManagedChannel and adapting it:

package com.krickert.search.grpc;

import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.core.type.Argument;
import io.micronaut.grpc.channels.GrpcNamedManagedChannelConfiguration;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Requires(notEnv = Environment.TEST)
public class ConsulGrpcManagedChannelFactory implements AutoCloseable {

    private static final Logger LOG = LoggerFactory.getLogger(ConsulGrpcManagedChannelFactory.class);

    private final ApplicationContext beanContext;
    private final Map<ChannelKey, ManagedChannel> channels = new ConcurrentHashMap<>();

    @Inject
    public ConsulGrpcManagedChannelFactory(ApplicationContext beanContext) {
        this.beanContext = beanContext;
    }

    public ManagedChannel managedChannelFromConsul(String serviceName) {
        Argument<String> argument = Argument.STRING;
        return channels.computeIfAbsent(new ChannelKey(argument, serviceName), channelKey -> {
            final NettyChannelBuilder nettyChannelBuilder = beanContext.createBean(NettyChannelBuilder.class, serviceName);
            ManagedChannel channel = nettyChannelBuilder.build();
            beanContext.findBean(GrpcNamedManagedChannelConfiguration.class, Qualifiers.byName(serviceName))
                    .ifPresent(channelConfig -> {
                        if (channelConfig.isConnectOnStartup()) {
                            LOG.debug("Connecting to the channel: {}", serviceName);
                            if (!connectOnStartup(channel, channelConfig.getConnectionTimeout())) {
                                channel.shutdownNow(); // don't leak the channel when the connection fails
                                throw new IllegalStateException("Unable to connect to the channel: " + serviceName);
                            }
                            LOG.debug("Successfully connected to the channel: {}", serviceName);
                        }
                    });
            return channel;
        });
    }

    /**
     * Client key.
     */
    private static final class ChannelKey {

        final Argument<?> identifier;
        final String value;

        public ChannelKey(Argument<?> identifier, String value) {
            this.identifier = identifier;
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
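            ChannelKey clientKey = (ChannelKey) o;
            return Objects.equals(identifier, clientKey.identifier) &&
                    Objects.equals(value, clientKey.value);
        }

        // Required alongside equals() so the ConcurrentHashMap cache can find existing keys
        @Override
        public int hashCode() {
            return Objects.hash(identifier, value);
        }
    }

    private boolean connectOnStartup(ManagedChannel channel, Duration timeout) {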
        channel.getState(true); // request connection
        final CountDownLatch readyLatch = new CountDownLatch(1);
        waitForReady(channel, readyLatch);
        try {
            // Millisecond precision so sub-second timeouts are not truncated to zero
            return readyLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private void waitForReady(ManagedChannel channel, CountDownLatch readyLatch) {
        final ConnectivityState state = channel.getState(false);
        if (state == ConnectivityState.READY) {
            readyLatch.countDown();
        } else {
            channel.notifyWhenStateChanged(state, () -> waitForReady(channel, readyLatch));
        }
    }

    @Override
    @PreDestroy
    public void close() {
        for (ManagedChannel channel : channels.values()) {
            if (!channel.isShutdown()) {
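                try {
                    channel.shutdown().awaitTermination(1, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag only when the wait was actually interrupted
                    Thread.currentThread().interrupt();
                    LOG.warn("Interrupted while shutting down gRPC channel: {}", e.getMessage(), e);
                } catch (RuntimeException e) {
                    LOG.warn("Error shutting down gRPC channel: {}", e.getMessage(), e);
                }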
            }
        }
        channels.clear();
    }

}

This works: just pass in the service name registered in Consul, and it returns the ManagedChannel for that service.
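The factory caches one channel per service name with `ConcurrentHashMap.computeIfAbsent`. A hash-based cache like this only finds an existing entry when the key type overrides both `equals` and `hashCode` consistently; a Java record gets both automatically from its components. Below is a plain-Java sketch of the same caching pattern, where a hypothetical `FakeChannel` record stands in for `ManagedChannel` so it runs without gRPC on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ChannelCacheSketch {
    // Hypothetical stand-in for io.grpc.ManagedChannel.
    public record FakeChannel(String target, int id) {}

    // Records generate equals() and hashCode(), so equal names map to the same cache entry.
    record ChannelKey(String serviceName) {}

    private final Map<ChannelKey, FakeChannel> channels = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    // Returns the cached channel for a service, building it only on first use.
    public FakeChannel channelFor(String serviceName) {
        return channels.computeIfAbsent(new ChannelKey(serviceName),
                key -> new FakeChannel(key.serviceName(), created.incrementAndGet()));
    }

    // Number of channels actually built so far.
    public int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        ChannelCacheSketch cache = new ChannelCacheSketch();
        cache.channelFor("nlp");
        cache.channelFor("nlp");
        cache.channelFor("chunker"); // hypothetical second service name
        System.out.println("channels built: " + cache.createdCount());
    }
}
```

If `ChannelKey` were a class overriding only `equals`, each call would hash to a different bucket and build a fresh channel every time.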

https://github.com/micronaut-projects/micronaut-grpc/blob/master/grpc-client-runtime/src/main/java/io/micronaut/grpc/channels/GrpcManagedChannelFactory.java

My class is a slightly modified copy of the code in the file linked above. Perhaps this could become a lower-level method in that class, so that it supports both injection-based and name-based channel creation? I opened a feature request in the hope that this makes it into a future version, because it would let me reliably create channels dynamically so the front end can be fully customized.

krickert commented 10 months ago

Closing with the fix above.