keypup-io / cloudtasker

Background jobs for Ruby using Google Cloud Tasks
MIT License

Protobuf serialization errors: ArgumentError (Value 600 must be a Hash or a Google::Protobuf::Duration) #94

Closed: rhavyn closed this issue 1 year ago

rhavyn commented 1 year ago

Hello,

I'm trying to get cloudtasker 0.13 working as an ActiveJob adapter and I'm getting an error when I enqueue a job: ArgumentError (Value 600 must be a Hash or a Google::Protobuf::Duration). I'm not sure if I'm doing something wrong or if I'm running into legitimate bugs. I've read through the docs and don't see any obvious mistakes. I was able to recreate this from a brand new Rails 7.0.5 project in which the only additional gem is cloudtasker. I created a simple job:

class EchoJob < ApplicationJob
  queue_as :default

  def perform(message)
    Rails.logger.warn "The message is: #{message}"
  end
end

which is queued like this:

class EchoController < ApplicationController
  def index
    EchoJob.perform_later("Hello, World!")
  end
end

I set the queue adapter in config/development.rb with config.active_job.queue_adapter = :cloudtasker. The cloudtasker initializer is:

Cloudtasker.configure do |config|
  config.gcp_location_id = 'us-central1'
  config.gcp_project_id = 'XXXXX'
  config.gcp_queue_prefix = 'YYYYY'
  config.processor_host = 'ZZZZZ.ngrok-free.app/'
  config.mode = :production
end

The first issue is that I'm getting an error that reads: [ActiveJob] Failed enqueuing EchoJob to Cloudtasker(default): ArgumentError (Value 600 must be a Hash or a Google::Protobuf::Duration). The stack trace shows the error coming from lib/cloudtasker/backend/google_cloud_task_v2.rb:146, which passes the serialized job to the Google Cloud Tasks gem, which in turn throws the error. The value 600 is coming from lib/cloudtasker/worker_handler.rb:162, where dispatch_deadline is being set to 600. I was able to validate that commenting out that line causes the error to go away:

def task_payload
  {
    http_request: {
      http_method: 'POST',
      url: Cloudtasker.config.processor_url,
      headers: {
        Cloudtasker::Config::CONTENT_TYPE_HEADER => 'application/json',
        Cloudtasker::Config::AUTHORIZATION_HEADER => Cloudtasker.config.oidc ? nil : Authenticator.bearer_token
      }.compact,
      oidc_token: Cloudtasker.config.oidc,
      body: worker_payload.to_json
    }.compact,
    # dispatch_deadline: worker.dispatch_deadline.to_i,
    queue: worker.job_queue
  }
end
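
The error message names the two shapes the protobuf layer accepts for this field: a Hash or a Google::Protobuf::Duration. As a rough sketch of the Hash shape, assuming the standard protobuf Duration field name `seconds`, an integer deadline could be wrapped as below. `duration_hash` is a hypothetical helper for illustration, not part of cloudtasker, and this is not necessarily how the gem resolved the problem.

```ruby
# Hypothetical helper (not part of cloudtasker): wrap a whole number of
# seconds in the Hash shape of a protobuf Duration.
def duration_hash(value)
  { seconds: Integer(value) }
end

# A trimmed-down payload in the shape shown above, with the deadline
# expressed as a Hash instead of a bare Integer.
payload = {
  dispatch_deadline: duration_hash(600),
  queue: 'default'
}

p payload
```

Commenting the line out avoids the type error, but it also means no explicit deadline is sent with the task, so wrapping the value may be preferable where the deadline matters.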

That led me to a second protobuf ArgumentError, this one caused by the schedule_time option being set to nil in lib/cloudtasker/backend/google_cloud_task_v2.rb:104. I changed the last line of that method (line 113) to payload.compact which fixed it:

      def self.format_task_payload(payload)
        payload = JSON.parse(payload.to_json, symbolize_names: true) # deep dup

        # Format schedule time to Google Protobuf timestamp
        payload[:schedule_time] = format_schedule_time(payload[:schedule_time])

        # Encode job content to support UTF-8.
        # Google Cloud Task expect content to be ASCII-8BIT compatible (binary)
        payload[:http_request][:headers] ||= {}
        payload[:http_request][:headers][Cloudtasker::Config::CONTENT_TYPE_HEADER] = 'text/json'
        payload[:http_request][:headers][Cloudtasker::Config::ENCODING_HEADER] = 'Base64'
        payload[:http_request][:body] = Base64.encode64(payload[:http_request][:body])

        payload.compact
      end
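
For reference, Hash#compact only removes top-level entries whose value is nil; nested hashes are left untouched. The small example below uses a made-up hash in the rough shape of the task payload (the nested nil `oidc_token` is there purely for illustration) to show that the change above drops a nil :schedule_time without touching anything under :http_request.

```ruby
# Made-up payload for illustration: one nil at the top level, one nil nested.
payload = {
  http_request: { http_method: 'POST', oidc_token: nil },
  queue: 'default',
  schedule_time: nil
}

# Hash#compact returns a new hash without the top-level nil entries.
compacted = payload.compact

p compacted.key?(:schedule_time)             # top-level nil entry is gone
p compacted[:http_request].key?(:oidc_token) # nested nil entry is kept
p payload.key?(:schedule_time)               # original hash is not modified
```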

After making those two changes enqueuing the job was working perfectly. Again, I'm not sure if I have a config error or if these are actually bugs, looking for some expertise to point me in the right direction.

Thanks!

alachaum commented 1 year ago

Hey @rhavyn - thanks for the detailed report. I'm going to investigate this issue in more detail. I'm surprised that a payload.compact fixes the issue. Could it be that the :body entry is nil? This could indicate a change of API in ActiveJob.

I'll do some testing when I have time to check what's happening.

rhavyn commented 1 year ago

Good morning @alachaum

In case it helps, the payload I'm seeing from the released 0.13 gem for my example above looks like:

{
  :http_request=>
    {
      :http_method=>"POST", 
      :url=>"https://zzzz.ngrok-free.app/cloudtasker/run", 
      :headers=>
        {
          :"Content-Type"=>"application/json", 
          :Authorization=>"Bearer eyJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2ODU0NTgyMTF9.BsAiBFaXkSw3P168RG_RiNuSimjq86KfBq8V2uzipXo", 
          "Content-Type"=>"text/json", 
          "Content-Transfer-Encoding"=>"Base64"
        }, 
        :body=>"eyJ3b3JrZXIiOiJBY3RpdmVKb2I6OlF1ZXVlQWRhcHRlcnM6OkNsb3VkdGFz\na2VyQWRhcHRlcjo6Sm9iV3JhcHBlciIsImpvYl9xdWV1ZSI6ImRlZmF1bHQi\nLCJqb2JfaWQiOiIyMDE4OTU5NS00NWY5LTQ2NTktOWQ4Mi04NTQ2YzVkNTU1\nMTYiLCJqb2JfbWV0YSI6e30sImpvYl9hcmdzIjpbeyJqb2JfY2xhc3MiOiJF\nY2hvSm9iIiwiYXJndW1lbnRzIjpbIkhlbGxvLCBXb3JsZCEiXSwiZXhjZXB0\naW9uX2V4ZWN1dGlvbnMiOnt9LCJsb2NhbGUiOiJlbiIsInRpbWV6b25lIjoi\nVVRDIiwiZW5xdWV1ZWRfYXQiOiIyMDIzLTA1LTMwVDE0OjUwOjExWiJ9XX0=\n"
    }, 
  :dispatch_deadline=>600, 
  :queue=>"default", 
  :schedule_time=>nil
}

That hash causes ArgumentError (Value 600 must be a Hash or a Google::Protobuf::Duration). Removing the :dispatch_deadline key/value results in ArgumentError (Value must be a Hash or a Google::Protobuf::Timestamp). Compacting the hash to remove the nil :schedule_time fixes the second error.

lovitt commented 1 year ago

I'm seeing the exact same issue on a new Rails app when invoking deliver_later on an ActiveJob.

alachaum commented 1 year ago

As a temporary workaround while I get a fix done, you may want to try downgrading google-cloud-tasks or google-protobuf.

lovitt commented 1 year ago

For now I've implemented @rhavyn's workaround via the following monkey patch:

# Temp fix for https://github.com/keypup-io/cloudtasker/issues/94
#
# Remove this patch once the fix has been made in the cloudtasker gem.

require "cloudtasker/backend/google_cloud_task_v2"

module CloudTaskerWorkerHandlerPatch

  def task_payload
    super.tap do |hsh|
      hsh.delete(:dispatch_deadline)
    end
  end

end

module CloudTaskerGoogleCloudTaskV2Patch

  def format_task_payload(payload)
    super(payload).compact
  end

end

Cloudtasker::WorkerHandler.prepend(CloudTaskerWorkerHandlerPatch)
Cloudtasker::Backend::GoogleCloudTaskV2.singleton_class.send(:prepend, CloudTaskerGoogleCloudTaskV2Patch)

Thank you for investigating, @alachaum!
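
For readers unfamiliar with the technique used in the patch above: prepending a module to a class overrides its instance methods, while prepending to the class's singleton_class overrides its class methods, and `super` reaches the original in both cases. The sketch below demonstrates this with a hypothetical `Handler` class standing in for the cloudtasker classes; none of these names come from the gem.

```ruby
# Hypothetical stand-in class with one instance method and one class method.
class Handler
  def payload
    { queue: 'default', dispatch_deadline: 600 }
  end

  def self.format_payload(payload)
    payload.merge(schedule_time: nil)
  end
end

# Overrides the instance method; super calls Handler#payload.
module InstancePatch
  def payload
    super.tap { |hsh| hsh.delete(:dispatch_deadline) }
  end
end

# Overrides the class method; super calls Handler.format_payload.
module ClassMethodPatch
  def format_payload(payload)
    super(payload).compact
  end
end

Handler.prepend(InstancePatch)
Handler.singleton_class.prepend(ClassMethodPatch)

p Handler.new.payload
p Handler.format_payload({ queue: 'default' })
```

Because the patch modules sit in front of the originals in the method lookup chain, removing them later only requires deleting the two prepend calls.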

alachaum commented 1 year ago

I have committed a fix for this issue on 0.13-stable. You can try it by using the following in your Gemfile:

gem 'cloudtasker', github: 'keypup-io/cloudtasker', branch: '0.13-stable'

I'm going to test that version in a production-like environment for a few days to make sure there are no surprises, then publish version 0.13.1.

alachaum commented 1 year ago

Cloudtasker v0.13.1 has been pushed to resolve this issue (and another issue with batch jobs) 🎉