janmg / logstash-input-azure_blob_storage

This is a plugin for Logstash to fetch files from Azure Storage Accounts

Reading JSON append blobs from storage account #36

Open saiframahii opened 1 year ago

saiframahii commented 1 year ago

Hello,

Currently I have a diagnostic setting on my Azure Data Factory that sends pipeline/activity logs to a container in a storage account in the following format: "container/dir/dir/dir/y=2023/m=05/d=02/h=10/m=00/PT1H.json". For every hour, incoming logs from ADF get appended to one append blob (PT1H.json). So far, the input plugin works fine when reading historical blobs (not the current hour) and records the offsets of read blobs in the registry file. However, I'm running into an issue when reading the blob that is currently being appended to. Scenario: it's 12:00 and logs are written to the storage account > Logstash is running and picks up the new JSON file > at 12:05 new logs are appended to the same JSON file > I get the below error:


```
[INFO ] 2023-05-02 10:48:51.835 [[main]<azure_blob_storage] azureblobstorage - resuming from remote registry data/registry.dat
[ERROR] 2023-05-02 10:48:52.947 [[main]<azure_blob_storage] javapipeline - A plugin had an unrecoverable error. Will restart this plugin.
  Pipeline_id:main
  Plugin: <LogStash::Inputs::AzureBlobStorage container=>"insights-logs-pipelineruns", codec=><LogStash::Codecs::JSONLines id=>"json_lines_cd27bbac-2203-44c5-9469-8925f1f88948", enable_metric=>true, charset=>"UTF-8", delimiter=>"\n">, interval=>10, id=>"f967be9e17d3af9286ab0875ce6754357103745f10dc7217b8275c3568271f9b", storageaccount=>"saeu2afglogpoc", access_key=>, enable_metric=>true, logtype=>"raw", dns_suffix=>"core.windows.net", registry_path=>"data/registry.dat", registry_create_policy=>"resume", addfilename=>false, addall=>false, debug_until=>0, debug_timer=>false, skip_learning=>false, file_head=>"{\"records\":[", file_tail=>"]}", path_filters=>["*/"]>
  Error: InvalidBlobType (409): The blob type is invalid for this operation.
  RequestId:b063a660-f01e-0013-0ee3-7c48cd000000
  Time:2023-05-02T10:48:52.8242660Z
  Exception: Azure::Core::Http::HTTPError
  Stack: /usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/core/http/http_request.rb:154:in `call'
  org/jruby/RubyMethod.java:116:in `call'
  /usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/core/http/signer_filter.rb:28:in `call'
  /usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/core/http/http_request.rb:111:in `block in with_filter'
  /usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/core/service.rb:36:in `call'
  /usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/core/filtered_service.rb:34:in `call'
  /usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/core/signed_service.rb:41:in `call'
  /usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/storage/common/service/storage_service.rb:60:in `call'
  /usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-blob-2.0.3/lib/azure/storage/blob/blob_service.rb:179:in `call'
  /usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-blob-2.0.3/lib/azure/storage/blob/block.rb:276:in `list_blob_blocks'
  /usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-input-azure_blob_storage-0.12.7/lib/logstash/inputs/azure_blob_storage.rb:413:in `partial_read'
  /usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-input-azure_blob_storage-0.12.7/lib/logstash/inputs/azure_blob_storage.rb:271:in `block in run'
  org/jruby/RubyHash.java:1519:in `each'
  /usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-input-azure_blob_storage-0.12.7/lib/logstash/inputs/azure_blob_storage.rb:246:in `run'
  /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:414:in `inputworker'
  /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:405:in `block in start_input'
```
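
For readability, the plugin settings embedded in that log line correspond to this pipeline input (reconstructed from the log; access_key stays redacted):

```
input {
  azure_blob_storage {
    storageaccount         => "saeu2afglogpoc"
    access_key             => "..."            # redacted
    container              => "insights-logs-pipelineruns"
    codec                  => "json_lines"
    logtype                => "raw"
    interval               => 10
    registry_path          => "data/registry.dat"
    registry_create_policy => "resume"
    file_head              => '{"records":['
    file_tail              => ']}'
    path_filters           => ["*/"]
  }
}
```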

In short, I guess what I'm trying to find out is whether this plugin is able to read from append blobs. Please let me know if you need any further information on this matter. Thanks in advance.

janmg commented 1 year ago

The problem here is the way AppendBlobs grow versus BlockBlobs. When processing a file for the first time, the plugin can read the full file and process it properly. The plugin was originally written with BlockBlobs in mind; those contain growing JSON constructs, and the parsing needs to take the header and the footer into account.

Here you apparently have append blobs with json_lines that can grow, and this plugin doesn't handle that properly. It's probably not too hard to implement a new logtype that can deal with it, but I currently don't have much time on my hands. This plugin already depends on azure-storage-ruby, and append blobs are supported there, so it shouldn't require that much effort.
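
To make the difference concrete, a minimal sketch (not your actual log content; field names vary per Azure service):

```
# Block blob: one growing JSON document, the plugin learns file_head and file_tail
{"records":[
  {"time":"2023-05-02T10:00:00Z","operationName":"..."},
  {"time":"2023-05-02T10:05:00Z","operationName":"..."}
]}

# Append blob with json_lines: one JSON object per line, no wrapper to learn
{"time":"2023-05-02T10:00:00Z","operationName":"..."}
{"time":"2023-05-02T10:05:00Z","operationName":"..."}
```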

https://github.com/Azure/azure-storage-ruby/blob/master/blob/lib/azure/storage/blob/append.rb
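
For example, creating and growing a test append blob through that gem could look roughly like this (a sketch against azure-storage-blob 2.x; account, key and blob names are placeholders, and I haven't run this against your setup):

```ruby
require "azure/storage/blob"

# Placeholders: use your own account name, key and container.
blobs = Azure::Storage::Blob::BlobService.create(
  storage_account_name: "mystorageaccount",
  storage_access_key:   ENV["AZURE_STORAGE_KEY"]
)

container = "insights-logs-pipelineruns"
blob_name = "test/PT1H.json"

# create_append_blob and append_blob_block are defined in append.rb linked above.
blobs.create_append_blob(container, blob_name)
blobs.append_blob_block(container, blob_name, %({"time":"2023-05-02T12:00:00Z"}\n))
blobs.append_blob_block(container, blob_name, %({"time":"2023-05-02T12:05:00Z"}\n))
```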

janmg commented 1 year ago

I think the append blobs work the same as the blob blocks; that would mean you may be able to set the head and tail to an empty string, and it may work with json_lines. With a bit of luck I can test it over the weekend, and if that's the case I can catch the exception you have and suppress the head and tail learning, so this could work out of the box. But it's engineering, so hope tends to be quickly replaced by frantic troubleshooting.
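
In config terms, the experiment would be something like this (untested; skip_learning is the existing option that should suppress the head/tail probing):

```
input {
  azure_blob_storage {
    storageaccount => "saeu2afglogpoc"
    access_key     => "..."           # redacted
    container      => "insights-logs-pipelineruns"
    codec          => "json_lines"
    file_head      => ""
    file_tail      => ""
    skip_learning  => true
  }
}
```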

saiframahii commented 1 year ago

Would that mean it would capture all the data in the append blob every time it's appended, or is there a way to continue from the last read line? Thank you so much for looking into this, appreciate it 👍

janmg commented 1 year ago

From what I understand, append blobs use the same blocks, and if that's the case the plugin should be able to regularly check whether the file has grown and read the new bytes from the stored offset. But before that can work I have to make a workaround to avoid the InvalidBlobType error.
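
The mechanism in rough Ruby against the azure-storage-blob gem (a sketch of the idea, not the plugin code; in the plugin the offset bookkeeping lives in the registry):

```ruby
require "azure/storage/blob"

blobs = Azure::Storage::Blob::BlobService.create(
  storage_account_name: "mystorageaccount",   # placeholder
  storage_access_key:   ENV["AZURE_STORAGE_KEY"]
)

container = "insights-logs-pipelineruns"
blob_name = "test/PT1H.json"
offset    = 0   # the plugin keeps this per blob in registry.dat

loop do
  # How big is the blob now?
  size = blobs.get_blob_properties(container, blob_name).properties[:content_length]

  if size > offset
    # Ranged read: only the bytes appended since the last pass.
    _meta, content = blobs.get_blob(container, blob_name,
                                    start_range: offset, end_range: size - 1)
    content.each_line { |line| puts line }   # hand each json_line to the codec
    offset = size
  end
  sleep 10   # comparable to the plugin's interval setting
end
```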

janmg commented 1 year ago

I finally pushed 0.12.9 with support for append blobs. In the config you can set "append => true", but if you don't, the plugin will set it by itself when it hits the InvalidBlobType exception. I haven't done extensive testing, so feedback is welcome.
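
For completeness, the opt-in looks like this in the input config (only the append option is new; the other settings mirror the config from the original report):

```
input {
  azure_blob_storage {
    storageaccount => "saeu2afglogpoc"
    access_key     => "..."           # redacted
    container      => "insights-logs-pipelineruns"
    codec          => "json_lines"
    append         => true            # treat the blobs as append blobs right away
  }
}
```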