nats-io / nats-pure.rb

Ruby client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0
131 stars 30 forks source link

`is_status_msg` doesn't seem to match any messages #152

Closed Gooseus closed 1 week ago

Gooseus commented 1 week ago

Observed behavior

When doing a fetch on a connected Jetstream pull_consumer pointed at Synadia, the method fails return messages that exist and raises an error:

.../.rvm/gems/ruby-3.3.4@gaga/gems/nats-pure-2.4.0/lib/nats/io/jetstream/pull_subscription.rb:126:in `fetch': OK (status_code=200, err_code=) (NATS::JetStream::Error::APIError)

Which seems like it comes from these areas of the pull_subscription code:

https://github.com/nats-io/nats-pure.rb/blob/0f8ca2f4416f53864e663862c3a5b5def0ac2758/lib/nats/io/jetstream/pull_subscription.rb#L80-L91

^ this check for is_status_msg will only push a msg into msgs if it is NOT a status message and otherwise expects the status to match a non-OK status case or raise an exception containing the Status and Desc headers.

https://github.com/nats-io/nats-pure.rb/blob/0f8ca2f4416f53864e663862c3a5b5def0ac2758/lib/nats/io/jetstream/js.rb#L36-L38

This says that a status message is any non-nil message with a Status header, which passes if the status is 200, but then raises (status_code=200, err_code=) to error out.

Expected behavior

I was expecting that messages being fetched by the pull_consumer with a header.Status field of 200 would be pushed into the msgs list and become available to my application.

Server and client version

Server is NATS Synadia Cloud - tls://connect.ngs.global Ruby 3.3.4 nats-pure-2.4.0

Host environment

Apple M2 macOS Sonoma 14.5 and ubuntu

Steps to reproduce

I set up a Jetstream pull subscription using a durable name and a stream name

  def subscription
    @subscription ||= jetstream.pull_subscribe(nil, name, { stream: })
  end

Then I fetch on that subscription with a fetch_rate (same results with n>=1) and attempt to process each message.

  def fetch
    Rails.logger.debug { "Fetching messages on #{name}." }
    subscription.fetch(fetch_rate).each(&method(:process))
  rescue NATS::IO::Timeout
    Rails.logger.debug { "Timed out fetching messages on #{name}, none to process." }
  end

I never get to the process method because the fetch raises the exception detailed above.

Possible Fix

Going from this:

https://github.com/nats-io/nats-pure.rb/blob/0f8ca2f4416f53864e663862c3a5b5def0ac2758/lib/nats/io/jetstream/js.rb#L36-L38

To this:

def is_status_msg(msg)
    return (!msg.nil? and (!msg.header.nil? and msg.header[Header::Status] and msg.header[Header::Status] != "200"))
end

Seems to work as expected, though I'm pretty new to Ruby and NATS so I'm not 100% confident. I'll submit a PR and hopefully someone can let me know what I'm doing wrong or else help me get this sorted out.

Gooseus commented 1 week ago

Issue turned out to be on my end publishing from the nats.ts with default Status that I did not see.