redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

gcp_cloud_storage (input): no way to randomize the objects list #1679

Open AlmogAlmado opened 1 year ago

AlmogAlmado commented 1 year ago

Hi,

I have a pipeline that reads from a GCS bucket using the "gcp_cloud_storage" component and deletes each file after it has been acknowledged. This works well with one thread/broker/pod, but when multiple threads are used, a race condition arises: only one "worker" successfully completes the operation while the others trigger the error: level=error msg="Failed to read message: storage: object doesn't exist"

The root cause is the Go package "cloud.google.com/go/storage", which returns objects in lexicographical order. When multiple workers are used, they all perform the same listing; only one of them can successfully read and delete a given file, while the others receive an error that the file is missing.

I have implemented a cache in my pipeline to prevent duplication, but the input reads the file before the cache is consulted, so the error is still triggered.

Possible solutions:

1) Use a workaround.
2) Add a randomize feature to the gcp_cloud_storage component: by applying a preliminary rand.Shuffle to the pending objects list, every "worker" would process the objects in a different order. I am (more than) willing to create a pull request for this solution, if this is the right way to go.

My pipeline is as follows:

input:
  read_until:
    restart_input: true
    check: false # tried using errored() to recreate the pod - no go
    input:
      gcp_cloud_storage:
        bucket: "$INPUT_Bucket"
        prefix: "$INPUT_Bucket_Prefix"
        codec: all-bytes
        delete_objects: true

pipeline:
  processors:
  - bloblang: |
      meta file_name = meta("gcs_key").filepath_split().slice(-1).join("")
  - cache:
      resource: main_cache_redis
      operator: add
      key: '${!  meta("file_name") }' 
      value: '${!   meta("gcs_last_modified_unix") }'
  - mapping: root = if errored() { deleted() }

Thanks, Almog

mhite commented 1 year ago

Interesting. I'm not familiar with what the expected behavior is when enabling multiple threads (or how the workers are initialized, i.e. whether it's a single shared input, or multiple inputs initialized with separate internal state, etc.). Would love to understand that more.

AlmogAlmado commented 1 year ago

Thanks, Matt (@mhite). The bigger problem for me is that I use multiple pods, so even if we solve the multiple-threads issue we're back to square one. I think randomization/shuffling is the best approach here, but I would like to verify that. @Jeffail, would you mind sharing your point of view as well?

mhite commented 1 year ago

@AlmogAlmado - Thanks for the clarification.

Depending on your use case, you may find this PR useful. I think multiple pods sharing the same subscription will distribute the work, although the use case is obviously narrower.

https://github.com/benthosdev/benthos/pull/1658

mhite commented 1 year ago

You might also try segmenting by prefix.
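
For example, something roughly like this per pod (just a sketch; POD_PREFIX is a hypothetical environment variable that each deployment would set to a disjoint prefix, e.g. incoming/a/ vs incoming/b/, so the listings never overlap):

input:
  gcp_cloud_storage:
    bucket: "$INPUT_Bucket"
    prefix: "$POD_PREFIX" # each pod only lists and deletes its own slice of the bucket
    codec: all-bytes
    delete_objects: true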

AlmogAlmado commented 1 year ago

Thanks @mhite, I really appreciate your assistance. I'll go ahead with a PR for the randomization feature. Segmenting by prefix can be a solution for multiple threads, but for multiple pods it would also require some kind of cache, which I'd like to avoid.

Jeffail commented 1 year ago

Hey @AlmogAlmado, randomisation is definitely one way to do it, but it's a bit too whimsical for me to feel comfortable recommending it as a horizontal scaling strategy.

One thing we could consider is adding a mapping field for the object names that allows you to do whatever you want to those target objects, including deletion. In your case that'd allow you to do something like object_mapping: 'root = if random_int(max: 10) == 0 { this } else { deleted() }'

This way we're not explicitly recommending randomised reading, and we're also adding a generally useful feature for everyone else.
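
For illustration, the proposed field would slot into the existing input config along these lines (sketch only: object_mapping doesn't exist yet, and the 1-in-10 value is arbitrary):

input:
  gcp_cloud_storage:
    bucket: "$INPUT_Bucket"
    prefix: "$INPUT_Bucket_Prefix"
    codec: all-bytes
    delete_objects: true
    # each worker would keep roughly 1 in 10 of the listed object names and drop the
    # rest from its own listing, so concurrent workers rarely race on the same object
    object_mapping: 'root = if random_int(max: 10) == 0 { this } else { deleted() }'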

AlmogAlmado commented 1 year ago

Thank you, @Jeffail, for this elegant solution that addresses most cases. However, there is a potential issue: if only one pod is up, files may be falsely skipped and a restart (via read_until) will occur. This process may repeat, wasting a small amount of time and making it less than 100% efficient due to the repeated listing. Dynamically adjusting the config according to the pod count could be considered, but I would prefer to avoid that if possible.

I had initially considered randomization as a quick fix to achieve the goal, despite its limitations and shortcomings. In my opinion, the ideal solution for my specific case would be to register the object with the cache before actually reading it, but I am uncertain whether that is a quick and simple change.

Please share your thoughts on how we should proceed.