benthosdev / benthos

Fancy stream processing made operationally mundane
https://www.benthos.dev
MIT License

GCP cloud storage output error with Failed to delete temporary file used for merging: context canceled #2522

Open anthonycorbacho opened 1 month ago

anthonycorbacho commented 1 month ago

Context

I am reading a bunch of events coming from an MQTT broker:

input:
  label: "benthos_mqtt_test"
  mqtt:
    urls:
      - localhost:1883
    client_id: benthos-reader
    connect_timeout: 30s
    topics:
      - $share/benthos/XXXX/#

and I am trying to process the events and save the output to blob storage (GCP Cloud Storage). The processing is relatively simple: I take an MQTT message (a slice of events) and output it in a CSV-like format:

buffer:
  none: {}
pipeline:
  threads: -1
  processors: 
    - unarchive:
        format: "json_map"
    - bloblang: |
        let csv = match {
             this.value.type() == "bool" => ",,"+this.value.string()
             this.value.type() == "number" => this.value.string()+",,"
             _ =>  ","+this.value.string()+","
        }
        root = this.ts.ts_strftime("%Y-%m-%dT%H:%M:%S%z").string() + "," + meta("archive_key") + "," + $csv +"\n"
        meta dt = this.ts.ts_strftime("%Y-%m-%d", "UTC")
        meta id = meta("archive_key").escape_url_query()

The output logic is to create a folder partition by date (dt=YYYY/MM/DD) and by id (id=ID):

output:
  label: "blobstorage"
  #stdout: {}
  gcp_cloud_storage:
    bucket: "abucket" 
    path: benthos/dt=${!@dt}/id=${!@id}/file.csv
    collision_mode: append
    timeout: 60s
    max_in_flight: 64

When running Benthos with this config, I quite often see this error:

ERRO Failed to delete temporary file used for merging: context canceled  @service=benthos label=blobstorage path=root.output

I am not 100% sure if this is a false positive, since I have quite a lot of events coming in per second (~1000 events), and the error doesn't tell me which path timed out, so I cannot really check whether the temp file has been merged or not. I tried changing the timeout to 20s, 30s, and 60s, but it always seems to error.

mihaitodor commented 1 month ago

Hey @anthonycorbacho 👋 thank you for raising this issue! While I think setting timeout: 60s should give the Google APIs more than enough time to delete a file, what might be happening is either that the write fails and we try to remove the file anyway, or that the writes take so long that they don't leave enough time within the same context to also perform the deletion (but I think the latter is unlikely).

Not sure what's the best way to address this, but I'd consider skipping the deletion if the write fails. Also, it might be worth either reducing the level of that "Failed to delete temporary file used for merging" log or introducing a flag which users can enable to silence this error.

Additionally, I think there's some opportunity for performance gains by passing more than one object in the ComposerFrom() call. The docs say that up to 32 are supported, so that could be a good future enhancement.

PS: PRs are welcome!

anthonycorbacho commented 1 month ago

Hi @mihaitodor thank you for the answer.

I tested different settings and am still having issues. I cloned the repo and added more debug logging to see the folder where the merge failed. The funny thing is that the file that was supposed to have a bunch of entries inside only has one.

I tried batching and writing to multiple files instead, and the same issue happens:

  gcp_cloud_storage:
    bucket: "abucket"
    path: benthos/dt=${!@dt}/id=${!@id}/file-${!timestamp_unix_nano()}.csv
    content_type: binary/octet-stream
    collision_mode: append
    timeout: 60s
    max_in_flight: 64
    batching:
      count: 100

Maybe my pipeline is causing the issue, because each event I get generally has a bunch of IDs in the message, e.g.:

{
  "ID1": {ts: xxx, value: xxx},
  "ID2": {ts: xxx, value: xxx},
  "ID3": {ts: xxx, value: xxx},
  "ID4": {ts: xxx, value: xxx},
}

and I am trying to write each ID to its own partitioned path.

I tried to put every message into a single file, but I get a rate limit error.

mihaitodor commented 1 month ago

I'm not sure what's going on there, but if you do path: benthos/dt=${!@dt}/id=${!@id}/file-${!timestamp_unix_nano()}.csv, then that forces the output to create a new file for every write, ignoring the batching, because ${!timestamp_unix_nano()} will resolve to a unique path for each message. One way around it is to add an archive processor in the batching section with format: lines and then you'll get a unique file per batch if the generated path is always different. If not, collision_mode: append should result in an append.

PS: When configuring batching with a count, you likely also want to add a period, since your input will not always fill the last batch, and Benthos will otherwise keep waiting for the batch to fill up before emitting it.
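
Roughly something like this (untested sketch; the count and period values are placeholders you'd tune for your throughput):

batching:
  count: 100   # flush once 100 messages have accumulated...
  period: 1s   # ...or after 1s, so a partially filled batch isn't held back indefinitely
  processors:
    - archive:
        format: lines   # join the whole batch into a single newline-delimited payload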

anthonycorbacho commented 4 weeks ago

@mihaitodor I followed your recommendation with the batching, and the good news is that I don't get the error anymore, so I think this helps.

But this brings up a new issue. Now, with the following GCP output settings:

batching:
  period: 1s
  processors:
    - archive:
        format: lines

each folder (dt=YYYY/MM/DD/id=XXXX) contains a mix of IDs, not just the one I am targeting with the path.

So if the id in the path is id=ID1, I expect the file to contain only ID1, but it's a mix of all the IDs (ID1, ID2, etc.).

mihaitodor commented 4 weeks ago

@anthonycorbacho That's great to hear! Regarding the path, it sounds like you don't want to mix IDs across multiple files, so you just want a file for each ID value, if I'm understanding you correctly. If that's what you need, then you can break up the batch into smaller batches by using the group_by_value processor before the archive one to create a batch for each individual ID, as sketched below. However, if the IDs are unique per message, then that will result in batches of size one, which is probably not what you want.
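
A rough, untested sketch of what that could look like in the batching section (reusing the id metadata your mapping already sets):

batching:
  period: 1s
  processors:
    - group_by_value:
        value: ${! meta("id") }   # split the batch into one sub-batch per id
    - archive:
        format: lines             # each sub-batch becomes one newline-delimited file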

anthonycorbacho commented 4 weeks ago

@mihaitodor

it sounds like you don't want to mix IDs across multiple files

It's more like I don't want to mix IDs across multiple paths; one folder = one ID, basically.

if the IDs are unique per message, then that will result in batches of size one

The IDs are sensor IDs, so multiple readings will happen within a couple of seconds.

anthonycorbacho commented 3 weeks ago

@mihaitodor I tried your last recommended approach and I'm still getting the context error, and now it looks like I am losing data:

  gcp_cloud_storage:
    bucket: "abucket"
    path: benthos/dt=${!@dt}/file-${!@id}.csv
    content_type: application/octet-stream
    collision_mode: append
    timeout: 60s
    max_in_flight: 64
    batching:
      period: 10s
      processors:
        - group_by_value:
            value: ${! meta("id") }
        - archive:
            format: lines

I have a mix of

Failed to delete temporary file used for merging: context deadline exceeded

and

Failed to delete temporary file used for merging: context canceled

mihaitodor commented 3 weeks ago

I have a feeling that max_in_flight: 64 is overwhelming the GCP API. I'd start by reducing that to 1 and then increase it gradually until performance plateaus.
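
For example, keeping everything else from your last config as it is, the starting point could look like this (untested sketch):

  gcp_cloud_storage:
    bucket: "abucket"
    path: benthos/dt=${!@dt}/file-${!@id}.csv
    content_type: application/octet-stream
    collision_mode: append
    timeout: 60s
    max_in_flight: 1   # start at 1, then raise it gradually while watching for timeouts
    batching:
      period: 10s
      processors:
        - group_by_value:
            value: ${! meta("id") }
        - archive:
            format: lines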

and now it looks like I am losing data

That is surprising. Benthos should keep retrying messages until the output reports success. Would be great to know if that is a reproducible issue somehow, since it should be fixed.

anthonycorbacho commented 3 weeks ago

@mihaitodor I have tried with max_in_flight: 1 and I get the same error (Failed to delete temporary file used for merging: context canceled) after a while, and my files seem to not be updated anymore; it looks like it works once and then nothing.

I think it's easily reproducible: if you have a lot of messages (~9k every second) you can easily hit this issue. I don't have a complex use case; I just transform a JSON map into multiple messages, change the formatting to CSV, and then try to save it to GCP Cloud Storage.

mihaitodor commented 3 weeks ago

Understood. Unfortunately, I don't have the capacity to look into this in more detail right now, and I don't have GCP access to run tests. I think the hints I left in my previous comment should point in the right direction if anyone is interested in submitting a PR.

anthonycorbacho commented 3 weeks ago

@mihaitodor thank you for your help! I really appreciate the time you spent on my issue.

Maybe I should try using AWS S3 instead of GCS and see if that fixes my issue.

I will keep experimenting and see if I can find a way to fix it.