GoogleCloudPlatform / spring-cloud-gcp

New home for Spring Cloud GCP development starting with version 2.0.
Apache License 2.0

Spring cloud GCP, library abruptly stops receiving messages. #3316

Open ksachdev1 opened 3 weeks ago

ksachdev1 commented 3 weeks ago

Describe the bug

I am using PubSubReactiveFactory to stream messages. Consumption works for anywhere from a few minutes to an hour, and then no further messages are picked up. If I restart the Kubernetes pod running the code, it starts working again, and then the behavior repeats.

Sample Code

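import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.spring.pubsub.reactive.PubSubReactiveFactory;
import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage;
import com.google.pubsub.v1.PubsubMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Configuration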
@Slf4j
public class PubSubConsumer {
    // Injected through the constructor below; no field-level @Autowired needed.
    private final PubSubReactiveFactory pubSubReactiveFactory;

    private final MyConfig myConfig;

    private final MySubscriberService mySubscriberService;

    private final ObjectMapper objectMapper;

    private Disposable disposable;

    @Autowired
    public PubSubConsumer(PubSubReactiveFactory pubSubReactiveFactory, MyConfig myConfig, MySubscriberService mySubscriberService, ObjectMapper objectMapper) {
        log.info("PubSubConsumer constructor");
        this.pubSubReactiveFactory = pubSubReactiveFactory;
        this.myConfig = myConfig;
        this.mySubscriberService = mySubscriberService;
        this.objectMapper = objectMapper;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void start() {
        log.info("Starting the My-Event-Listener");
        this.disposable = this.pubSubReactiveFactory.poll(myConfig.getMySubscription(), 1000)
                .flatMap(this::myMessageHandler)
                .subscribeOn(Schedulers.parallel())
                .subscribe();

    }

    public Mono<Void> myMessageHandler(AcknowledgeablePubsubMessage message) {
        log.debug("myMessageHandler is called.");
        PubsubMessage m = message.getPubsubMessage();
        try {
            MyMessage myMessage = objectMapper
                    .readValue(m.getData().toByteArray(), MyMessage.class);
            mySubscriberService.myMessageReceiver(myMessage);
        } catch (Exception e) {
            // Passing the exception as the last argument also logs the stack trace.
            log.error("Could not finish the action. {}", e.getMessage(), e);
        } finally {
            return Mono.fromFuture(message.ack().toCompletableFuture());
        }
    }

    @EventListener(ContextClosedEvent.class)
    public void stop() {
        log.info("Stopping My-Event-Listener");
        if (this.disposable != null && !this.disposable.isDisposed()) {
            this.disposable.dispose();
        }
    }
}
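
One property of myMessageHandler as posted may be worth checking while debugging: in Java, a `return` inside `finally` discards any exception still in flight. The handler catches `Exception`, so an `Error` (for example `StackOverflowError`) thrown while processing would be dropped without a log line, and the message would still be acknowledged. Nothing in this thread confirms that this is what causes the stall. The sketch below uses only the JDK, and the class and method names are hypothetical stand-ins, not part of Spring Cloud GCP.

```java
public class FinallyReturnDemo {

    // Hypothetical stand-in for the handler: same try/catch/finally shape,
    // with a plain String in place of the ack Mono.
    @SuppressWarnings("finally")
    public static String handle(Runnable work) {
        try {
            work.run();
        } catch (Exception e) {
            System.out.println("logged: " + e.getMessage());
        } finally {
            // Returning here discards any Throwable that was not caught above.
            return "acked";
        }
    }

    public static void main(String[] args) {
        // An Exception is caught and logged, then the result is returned.
        System.out.println(handle(() -> { throw new IllegalStateException("bad payload"); }));
        // An Error bypasses catch (Exception e), yet the caller sees a normal return.
        System.out.println(handle(() -> { throw new StackOverflowError(); }));
    }
}
```

Running `main` prints `logged: bad payload`, then `acked`, then `acked` again with no trace of the `StackOverflowError`. Catching `Throwable` for logging, or moving the ack out of `finally`, would make such failures visible.
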
mpeddada1 commented 1 week ago

Thanks for filing this issue @ksachdev1! Please consider submitting a support ticket to https://cloud.google.com/support/ to help with debugging this issue further. This will ensure your situation gets attention and may result in guidance on how to achieve your end goal.