janmg / logstash-input-azure_blob_storage

This is a plugin for Logstash to fetch files from Azure Storage Accounts.

Logstash not able to read and process .csv file #40

Open vitthaltambe-gep opened 1 year ago

vitthaltambe-gep commented 1 year ago

I am reading a .csv file from Azure Blob Storage and adding that data to Elasticsearch, but Logstash gets stuck after the lines below.

```
[2023-06-16T18:30:28,479][INFO ][logstash.inputs.azureblobstorage][main][181f6119611506bdf4dacc3ea01b35a1dee26ed795d3b801c7e8f3d7080a0e8c] learn json one of the attempts failed
[2023-06-16T18:30:28,479][INFO ][logstash.inputs.azureblobstorage][main][181f6119611506bdf4dacc3ea01b35a1dee26ed795d3b801c7e8f3d7080a0e8c] head will be: {"records":[ and tail is set to ]}
```

Below is the pipeline code:

```
input {
  azure_blob_storage {
    storageaccount => "xxx"
    access_key     => "yyy"
    container      => "zzz"
    #path_filters  => ['**/invoices_2023.03.04_20.csv']
  }
}
filter {
  mutate {
    gsub => ["message", "\"\"", ""]
  }
  csv {
    separator => ","
    columns   => ["uniqueId","vendorId","vendorNumber","vendorName","reference","documentDate","purchaseOrderNumber","currency","amount","dueDate","documentStatus","invoiceBlocked","paymentReference","clearingDate","paymentMethod","sourceSystem","documentType","companyCode","companyCodeName","companyCodeCountry","purchaseOrderItem","deliveryNoteNumber","fiscalYear","sapInvoiceNumber","invoicePendingWith"]
  }
  if [uniqueId] == "Unique Key" {
    drop { }
  }
  if [uniqueId] =~ /^\s*$/ {
    drop { }
  }
  mutate { remove_field => "message" }
  mutate { remove_field => "@version" }
  mutate { remove_field => "host" }
  mutate { add_field => { "docTypeCode" => "1019" } }
  # mutate { remove_field => "@timestamp" }
}
output {
  elasticsearch {
    hosts         => "aaa:9243"
    user          => "bbb"
    password      => "ccc@1234"
    index         => "invoicehub_test"
    document_type => "_doc"
    action        => "index"
    document_id   => "%{uniqueId}"
  }
  # stdout { codec => rubydebug { metadata => true } }
}
```

The CSV file contains this data:

```
Unique Key,Vendor Id,Vendor Number # VN,Vendor Name,Reference,Document Date,Purchase order number,Currency,Amount,Due Date,Document Status,Invoice blocked,Payment Reference Number,Payment/Clearing Date,Payment Method,Source System,Document Type,Company Code,Company Name,Company code country,Purchase order item,Delivery note number,FiscalYear,SAP Invoice Number,Invoice Pending with (email id)
AMP_000013530327,50148526,50148526,CARTUS RELOCATION CANADA LTD,2000073927CA,10/21/2019,,CAD,2041.85,11/21/2019,Pending Payment,,,,,AMP,Invoice,1552,Imperial Oil-DS Br,CA,,,2019,2019,
AMP_000013562803,783053,783053,CPS COMUNICACIONES SA,A001800009476,11/1/2019,,ARS,1103.52,12/1/2019,Pending Payment,,,,,AMP,Invoice,2399,ExxonMobil B.S.C Arg. SRL,AR,,,2019,2019,
AMP_000013562789,50115024,50115026,FARMERS ALLOY FABRICATING INC,7667,11/5/2019,4410760848,USD,-38940.48,12/5/2019,In Progress,,,,,AMP,Credit Note,944,EM Ref&Mktg (Div),US,,,0,0,wyman.w.hardison@exxonmobil.com
```

janmg commented 1 year ago

Can you set the codec for the input to 'line'? The default is JSON, which requires the plugin to learn the start and stop bytes, but that isn't relevant for CSV. The plugin doesn't take into account that the first line is a header; I'm planning a new release soon to fix some other issues and can add some line-skipping logic, but you can probably filter out the header in the filter stage, as sketched below.
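A minimal sketch of both suggestions, reusing the redacted placeholders from the pipeline above and an abbreviated column list:

```
input {
  azure_blob_storage {
    storageaccount => "xxx"
    access_key     => "yyy"
    container      => "zzz"
    codec          => "line"   # one event per line; skips the JSON head/tail learning
  }
}
filter {
  csv {
    separator => ","
    columns   => ["uniqueId","vendorId","vendorNumber","vendorName"]   # abbreviated for the sketch
  }
  # the plugin does not skip the header row, so drop it here
  if [uniqueId] == "Unique Key" {
    drop { }
  }
}
```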

vitthaltambe-gep commented 1 year ago

How can I set the codec for the input to 'line'? Can you please help me with that?

janmg commented 1 year ago

```
input { azure_blob_storage { codec => "csv" } }
```

The codec is a standard Logstash parameter; it defines how the content of an event is decoded. By default I set it to JSON, but you can set it to line or csv.

https://www.elastic.co/guide/en/logstash/current/plugins-codecs-csv.html
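Expanded for this plugin, that would look roughly like the following; the codec block options are taken from the csv codec documentation linked above, and the column list is illustrative:

```
input {
  azure_blob_storage {
    storageaccount => "xxx"
    access_key     => "yyy"
    container      => "zzz"
    codec => csv {
      separator => ","
      columns   => ["uniqueId","vendorId","vendorNumber","vendorName"]   # illustrative subset
    }
  }
}
```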

vitthaltambe-gep commented 1 year ago

I installed the plugin below and modified the pipeline, but it is still not working for me.

```
bin/logstash-plugin install logstash-codec-csv
```

I tried both line and csv as the codec.

The pipeline gets stuck after the lines below.

```
[2023-06-19T11:23:16,911][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2023-06-19T11:23:16,982][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2023-06-19T11:23:20,134][INFO ][logstash.inputs.azureblobstorage][main][3f2c68435ba035ec87614e80723b5194a8d29c6ead5ddbcddd54f218d46ed8cd] resuming from remote registry data/registry.dat
```

```
input {
  azure_blob_storage {
    storageaccount => "exxonpaymentstatusqc"
    access_key     => "QL6XD4T3L5mE3EeirQxRe79SV7Y1k3ezF5jKlZHkzBODoaGTT7W0X67BUhAR7ZrxqIZiYuvgBRPFBxUXNF812Q=="
    container      => "qcexxonmobildatastore"
    codec          => "line"
    path_filters   => ['**/invoices_2023.03.04_20.csv']
  }
}
filter {
  mutate {
    gsub => ["message", "\"\"", ""]
  }
  csv {
    separator => ","
    columns   => ["uniqueId","vendorId","vendorNumber","vendorName","reference","documentDate","purchaseOrderNumber","currency","amount","dueDate","documentStatus","invoiceBlocked","paymentReference","clearingDate","paymentMethod","sourceSystem","documentType","companyCode","companyCodeName","companyCodeCountry","purchaseOrderItem","deliveryNoteNumber","fiscalYear","sapInvoiceNumber","invoicePendingWith"]
  }
  if [uniqueId] == "Unique Key" {
    drop { }
  }
  if [uniqueId] =~ /^\s*$/ {
    drop { }
  }
  mutate { remove_field => "message" }
  mutate { remove_field => "@version" }
  mutate { remove_field => "host" }
  mutate { add_field => { "docTypeCode" => "1019" } }
  mutate { remove_field => "@timestamp" }
}
output {
  elasticsearch {
    hosts         => "https://7a9dd14c791d4615a146a805072c908b.eastus2.azure.elastic-cloud.com:9243"
    user          => "qcappowner"
    password      => "Password@1234"
    index         => "invoicehub_urtexxonmobil_test"
    document_type => "_doc"
    action        => "index"
    document_id   => "%{uniqueId}"
  }
  stdout { codec => rubydebug { metadata => true } }
}
```

janmg commented 1 year ago

Make sure to change the passwords and access_keys. You can set registry_create_policy => "start_over" or delete data/registry.dat, because files that were already seen will not be processed again; by default, only new or grown files are processed. To get more output when the plugin starts, you can set debug_until => 1000, which makes the plugin log more detail for the first 1000 events/lines processed. See the sketch below.
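A sketch of the input with both options set; the credentials are placeholders, and both options are used or described elsewhere in this thread:

```
input {
  azure_blob_storage {
    storageaccount         => "xxx"
    access_key             => "yyy"
    container              => "zzz"
    codec                  => "line"
    registry_create_policy => "start_over"   # ignore the existing registry and reprocess all blobs
    debug_until            => 1000           # extra log detail for the first 1000 processed lines
  }
}
```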

vitthaltambe-gep commented 1 year ago

The pipeline is not able to read the headers in the CSV file and instead indexes the header names as values (see the screenshot). It also processes only one record, namely the header row.

Do you have a sample pipeline I can refer to for reading data from a CSV file and adding it to Elasticsearch?

(screenshot)

```
input {
  azure_blob_storage {
    storageaccount         => "exxonpaymentstatusqc"
    access_key             => "QL6XD4T3L5mE3EeirQxRe79SV7Y1k3ezF5jKlZHkzBODoaGTT7W0X67BUhAR7ZrxqIZiYuvgBRPFBxUXNF812Q=="
    container              => "qcexxonmobildatastore"
    codec                  => "csv"
    skip_learning          => true
    registry_create_policy => "start_over"
    path_filters           => ['*/.csv']
  }
}
filter {
  mutate {
    gsub => ["message", "\"\"", ""]
  }
  csv {
    separator => ","
    columns   => ["uniqueId","Vendor Id","Source","Vendor Name"]
  }
  if [uniqueId] == "Unique Key" {
    drop { }
  }
  if [uniqueId] =~ /^\s*$/ {
    drop { }
  }
  mutate { remove_field => "message" }
  mutate { remove_field => "@version" }
  mutate { remove_field => "host" }
  mutate { add_field => { "docTypeCode" => "1019" } }
  mutate { remove_field => "@timestamp" }
}
output {
  elasticsearch {
    hosts         => "https://7a9dd14c791d4615a146a805072c908b.eastus2.azure.elastic-cloud.com:9243"
    user          => "qcappowner"
    password      => "Password@1234"
    index         => "invoicehub_urtexxonmobil_test"
    document_type => "_doc"
    action        => "index"
    document_id   => "%{uniqueId}"
  }
  stdout { codec => rubydebug { metadata => true } }
}
```

CSV data:

```
uniqueId,Vendor Id,Source,Vendor Name
AMP_000013562789,50115024,50115026,FARMERS ALLOY FABRICATING INC
```
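For reference, a minimal end-to-end sketch of the kind of sample pipeline asked about above. This is an assumption-laden sketch, not from the plugin docs: it uses the standard csv filter's autodetect_column_names option (which requires running Logstash with pipeline.workers set to 1), and the credentials, host, and index are placeholders:

```
input {
  azure_blob_storage {
    storageaccount => "xxx"
    access_key     => "yyy"
    container      => "zzz"
    codec          => "line"
    skip_learning  => true
    path_filters   => ['**/*.csv']   # note the glob; the '*/.csv' above would only match files literally named '.csv'
  }
}
filter {
  csv {
    separator               => ","
    autodetect_column_names => true   # auto-detect field names from the first row
  }
}
output {
  elasticsearch {
    hosts => "https://localhost:9243"   # placeholder
    index => "invoices_test"            # placeholder
  }
}
```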