googleapis / java-pubsub

Apache License 2.0
130 stars 90 forks source link

GCP Pub/Sub Publisher Issue: ManagedChannel Not Shut Down Properly #1803

Closed velkrish89 closed 11 months ago

velkrish89 commented 1 year ago

I am encountering an issue with the GCP Pub/Sub Publisher in our application. When publishing a high volume of events (greater than 20 events per second), the ManagedChannel instances are not being shut down properly, leading to resource leaks. This issue is impacting our application's stability.

Environment

GCP Pub/Sub Library Version: 4.7.1 Operating System: Windows 11 Java Version: 17.0.6 GRPC Version: 1.56.1

image

Steps to reproduce

  1. Configure the GCP Pub/Sub Publisher to publish a high volume of events (greater than 20 events per second).
  2. Initiate the publishing of events.
  3. Observe the log output and resource management.

Code example

Configurations

  /**
   * Supplies service-account credentials read from the key file at {@code credentialsFilePath},
   * scoped to the default Pub/Sub publisher service scopes.
   *
   * <p>The key file is opened each time the returned provider is asked for credentials. The
   * stream is closed as soon as the credentials have been parsed, so repeated lookups do not
   * leak file handles.
   *
   * <p>NOTE(review): no {@code @Bean} annotation is visible on this method in the snippet;
   * confirm that this provider (and not an auto-configured one) is what gets injected.
   *
   * @return a provider that loads and scopes the service-account credentials on demand
   */
  public CredentialsProvider credentialsProvider() {
    return () -> {
      // try-with-resources: the original left the InputStream open after parsing the key file.
      try (java.io.InputStream keyStream =
          Files.newInputStream(Paths.get(credentialsFilePath))) {
        return ServiceAccountCredentials.fromStream(keyStream)
            .createScoped(PublisherStubSettings.getDefaultServiceScopes());
      }
    };
  }

  /**
   * Creates the publishing template.
   *
   * <p>The underlying {@link DefaultPublisherFactory} is built for {@code projectId}, has message
   * ordering enabled, uses the supplied credentials provider, and targets the
   * {@code us-east1} regional Pub/Sub endpoint.
   *
   * @param credentialsProvider credentials used by publishers created through the factory
   * @param pubSubMessageConverter converter installed on the returned template
   * @return the configured publisher template
   */
  @Bean
  public PubSubPublisherTemplate pubSubPublisherTemplate(
      CredentialsProvider credentialsProvider, PubSubMessageConverter pubSubMessageConverter) {
    final DefaultPublisherFactory publisherFactory =
        new DefaultPublisherFactory(() -> projectId);
    publisherFactory.setEnableMessageOrdering(true);
    publisherFactory.setCredentialsProvider(credentialsProvider);
    publisherFactory.setEndpoint("us-east1-pubsub.googleapis.com:443");

    final PubSubPublisherTemplate publisherTemplate =
        new PubSubPublisherTemplate(publisherFactory);
    publisherTemplate.setMessageConverter(pubSubMessageConverter);
    return publisherTemplate;
  }

  /**
   * Creates the subscribing template.
   *
   * <p>A fresh {@link GcpPubSubProperties} instance is given a keep-alive interval of one minute
   * and initialized with {@code projectId}; it is then handed to a
   * {@link DefaultSubscriberFactory} that uses the supplied credentials provider.
   *
   * @param credentialsProvider credentials used by subscribers created through the factory
   * @param pubSubMessageConverter converter installed on the returned template
   * @return the configured subscriber template
   */
  @Bean
  public PubSubSubscriberTemplate subscriberTemplate(
      CredentialsProvider credentialsProvider, PubSubMessageConverter pubSubMessageConverter) {
    final GcpPubSubProperties pubSubProperties = new GcpPubSubProperties();
    pubSubProperties.setKeepAliveIntervalMinutes(1);
    pubSubProperties.initialize(projectId);

    final DefaultSubscriberFactory subscriberFactory =
        new DefaultSubscriberFactory(() -> projectId, pubSubProperties);
    subscriberFactory.setCredentialsProvider(credentialsProvider);

    final PubSubSubscriberTemplate result = new PubSubSubscriberTemplate(subscriberFactory);
    result.setMessageConverter(pubSubMessageConverter);
    return result;
  }

  /**
   * Combines the publisher and subscriber templates into a single {@link PubSubTemplate}.
   *
   * @param pubSubPublisherTemplate template used for publishing
   * @param subscriberTemplate template used for subscribing
   * @return a template that delegates to the two supplied templates
   */
  @Bean
  public PubSubTemplate pubSubTemplateBean(
      PubSubPublisherTemplate pubSubPublisherTemplate,
      PubSubSubscriberTemplate subscriberTemplate) {
    final PubSubTemplate combinedTemplate =
        new PubSubTemplate(pubSubPublisherTemplate, subscriberTemplate);
    return combinedTemplate;
  }

Publisher

    // For each item: log its number and description, then publish it to the topic returned
    // by topic(). Whatever publish(...) returns is not captured here, so this loop does not
    // observe the outcome of individual publish calls.
    items.forEach(
        item -> {
          log.info(
              "Publishing item (without ordering key) : [{}] with event description : [{}] ",
              item.getItemNumber(),
              item.getItemDescription());
          pubSubTemplate.publish(topic(), item);
        });
  }

Dependencies

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-dependencies</artifactId>
      <version>4.7.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>spring-cloud-gcp-starter</artifactId>
</dependency>

Stack trace

{"@timestamp":"2023-10-30T07:08:27.855Z","ecs.version":"1.2.0","log.level":"ERROR","message":"*~*~*~ Previous channel ManagedChannelImpl{logId=233, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*\r\n    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.","process.thread.name":"http-nio-8080-exec-2","log.logger":"io.grpc.internal.ManagedChannelOrphanWrapper","IP":"10.171.32.15","REQUEST_ID":"null","SERVICE_NAME":"Data Service","spanId":"57190b683e5cc8b0","traceId":"255f951e60a9ffa4c9252c0b27c947b9","error.type":"java.lang.RuntimeException","error.message":"ManagedChannel allocation site","error.stack_trace":[{"class":"io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference","method":"<init>","file":"ManagedChannelOrphanWrapper.java","line":102},{"class":"io.grpc.internal.ManagedChannelOrphanWrapper","method":"<init>","file":"ManagedChannelOrphanWrapper.java","line":60},{"class":"io.grpc.internal.ManagedChannelOrphanWrapper","method":"<init>","file":"ManagedChannelOrphanWrapper.java","line":51},{"class":"io.grpc.internal.ManagedChannelImplBuilder","method":"build","file":"ManagedChannelImplBuilder.java","line":631},{"class":"io.grpc.internal.AbstractManagedChannelImplBuilder","method":"build","file":"AbstractManagedChannelImplBuilder.java","line":297},{"class":"com.google.api.gax.grpc.InstantiatingGrpcChannelProvider","method":"createSingleChannel","file":"InstantiatingGrpcChannelProvider.java","line":391},{"class":"com.google.api.gax.grpc.ChannelPool","method":"<init>","file":"ChannelPool.java","line":107},{"class":"com.google.api.gax.grpc.ChannelPool","method":"create","file":"ChannelPool.java","line":85},{"class":"com.google.api.gax.grpc.InstantiatingGrpcChannelProvider","method":"createChannel","file":"InstantiatingGrpcChannelProvider.java","line":237},{"class":"com.google.api.gax.grpc.InstantiatingGrpcChannelProvider","method":"getTransportChannel","file":"InstantiatingGrpcChannelProvider.
java","line":231},{"class":"com.google.api.gax.rpc.ClientContext","method":"create","file":"ClientContext.java","line":236},{"class":"com.google.cloud.pubsub.v1.stub.GrpcPublisherStub","method":"create","file":"GrpcPublisherStub.java","line":203},{"class":"com.google.cloud.pubsub.v1.Publisher","method":"<init>","file":"Publisher.java","line":201},{"class":"com.google.cloud.pubsub.v1.Publisher","method":"<init>","file":"Publisher.java","line":91},{"class":"com.google.cloud.pubsub.v1.Publisher$Builder","method":"build","file":"Publisher.java","line":881},{"class":"com.google.cloud.spring.pubsub.support.DefaultPublisherFactory","method":"createPublisher","file":"DefaultPublisherFactory.java","line":186},{"class":"com.google.cloud.spring.pubsub.core.publisher.PubSubPublisherTemplate","method":"publish","file":"PubSubPublisherTemplate.java","line":94},{"class":"com.google.cloud.spring.pubsub.core.publisher.PubSubPublisherTemplate","method":"publish","file":"PubSubPublisherTemplate.java","line":80},{"class":"com.google.cloud.spring.pubsub.core.PubSubTemplate","method":"publish","file":"PubSubTemplate.java","line":110},{"class":"com.amway.commerce.imsdataservice.app.producer.gcppubsub.GcpPubSubProducer","method":"publishPayload","file":"GcpPubSubProducer.java","line":60},{"class":"com.amway.commerce.imsdataservice.app.handler.impl.WmMaoSyncSupplyEventHandler","method":"lambda$handleItemLocationConfig$5","file":"WmMaoSyncSupplyEventHandler.java","line":160},{"class":"java.util.stream.ForEachOps$ForEachOp$OfRef","method":"accept","file":"ForEachOps.java","line":183},{"class":"java.util.stream.ReferencePipeline$3$1","method":"accept","file":"ReferencePipeline.java","line":197},{"class":"java.util.ArrayList$ArrayListSpliterator","method":"forEachRemaining","file":"ArrayList.java","line":1625},{"class":"java.util.stream.AbstractPipeline","method":"copyInto","file":"AbstractPipeline.java","line":509},{"class":"java.util.stream.AbstractPipeline","method":"wrapAndCopyInto","file":
"AbstractPipeline.java","line":499},{"class":"java.util.stream.ForEachOps$ForEachOp","method":"evaluateSequential","file":"ForEachOps.java","line":150},{"class":"java.util.stream.ForEachOps$ForEachOp$OfRef","method":"evaluateSequential","file":"ForEachOps.java","line":173},{"class":"java.util.stream.AbstractPipeline","method":"evaluate","file":"AbstractPipeline.java","line":234},{"class":"java.util.stream.ReferencePipeline","method":"forEach","file":"ReferencePipeline.java","line":596},{"class":"com.amway.commerce.imsdataservice.app.handler.impl.WmMaoSyncSupplyEventHandler","method":"handleItemLocationConfig","file":"WmMaoSyncSupplyEventHandler.java","line":158},{"class":"com.amway.commerce.imsdataservice.app.handler.impl.WmMaoSyncSupplyEventHandler","method":"lambda$handleMessage$1","file":"WmMaoSyncSupplyEventHandler.java","line":139},{"class":"java.util.concurrent.CompletableFuture$AsyncRun","method":"run","file":"CompletableFuture.java","line":1804},{"class":"java.util.concurrent.CompletableFuture$AsyncRun","method":"exec","file":"CompletableFuture.java","line":1796},{"class":"java.util.concurrent.ForkJoinTask","method":"doExec","file":"ForkJoinTask.java","line":373},{"class":"java.util.concurrent.ForkJoinPool$WorkQueue","method":"topLevelExec","file":"ForkJoinPool.java","line":1182},{"class":"java.util.concurrent.ForkJoinPool","method":"scan","file":"ForkJoinPool.java","line":1655},{"class":"java.util.concurrent.ForkJoinPool","method":"runWorker","file":"ForkJoinPool.java","line":1622},{"class":"java.util.concurrent.ForkJoinWorkerThread","method":"run","file":"ForkJoinWorkerThread.java","line":165}]}
maitrimangal commented 1 year ago

Can you ensure that we have only one publisher per application? And can you share the code where you are shutting down the publisher? Make sure to call awaitTermination() after calling shutdown(). Also, were there any version changes?

maitrimangal commented 11 months ago

Closing due to no response. Please open a new bug with the updated info if the issue still persists.

zll600 commented 6 months ago

Can you ensure that we have only one publisher per application?

I encountered a similar problem, with the following log, and this approach helped me solve it. Thank you.

Apr 16, 2024 6:04:27 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Previous channel ManagedChannelImpl{logId=10, target=pubsub.googleapis.com:443} was garbage collected without being shut down! ~*~*~*
    Make sure to call shutdown()/shutdownNow()
java.lang.RuntimeException: ManagedChannel allocation site
    at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:102)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:60)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:51)
    at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:662)
    at io.grpc.ForwardingChannelBuilder2.build(ForwardingChannelBuilder2.java:254)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:441)
    at com.google.api.gax.grpc.ChannelPool.<init>(ChannelPool.java:107)
    at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:85)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:243)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:237)
    at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:226)
    at com.google.cloud.pubsub.v1.stub.GrpcPublisherStub.create(GrpcPublisherStub.java:203)
    at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:201)
    at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:91)
    at com.google.cloud.pubsub.v1.Publisher$Builder.build(Publisher.java:881)