amazon-archives / logstash-input-dynamodb

This input plugin for Logstash scans a specified DynamoDB table and then reads changes to that table from the associated DynamoDB Stream. This gem is a Logstash plugin that must be installed on top of the Logstash core pipeline; it is not a stand-alone program.
Apache License 2.0

Logstash-input-dynamoDB #14

Closed: bretd25 closed this issue 8 years ago

bretd25 commented 8 years ago

I am having trouble using logstash-input-dynamodb with multiple tables. Here is my configuration file:

input {
  dynamodb {
    endpoint => "dynamodb.us-west-2.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-west-2.amazonaws.com"
    view_type => "new_and_old_images"
    aws_access_key_id => "myAwsKey"
    aws_secret_access_key => "MyAwsSecretKey"
    perform_scan => "false"
    table_name => "Table1"
  }
  dynamodb {
    endpoint => "dynamodb.us-west-2.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-west-2.amazonaws.com"
    view_type => "new_and_old_images"
    aws_access_key_id => "SameKeyAsAbove"
    aws_secret_access_key => "SameKeyAsAbove"
    perform_scan => "false"
    table_name => "Table2"
  }
}

output {
  elasticsearch {
    hosts => ["MyElasticSearchHost"]
    ssl => "true"
  }
  stdout { }
}

The problem is that only Table2 streams correctly to Elasticsearch; Table1 does not stream at all. I have changed the order, and it is always the last table that works. I looked in some documentation and it says

"You can also configure the plugin to index multiple tables by adding additional dynamodb { } sections to the input section."

Any idea why only the last table is working?

Bret

marcosnils commented 8 years ago

@bretd25 do you need both tables to go to the same Elasticsearch index?

I believe the problem is in the output plugin. I've managed to do this, but with a slightly different setup.

Here's my configuration file:

input {
  dynamodb {
    type => "moderation"
    table_name => "backoffice_mim_moderation"
    perform_scan => true
    view_type => "new_image"
    endpoint => "dynamodb.us-east-1.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-east-1.amazonaws.com"
    aws_access_key_id => "xxxxxxxxxx"
    aws_secret_access_key => "xxxxxxxxxx"
  }
  dynamodb {
    type => "image"
    table_name => "backoffice_mim_image"
    perform_scan => true
    view_type => "new_image"
    endpoint => "dynamodb.us-east-1.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-east-1.amazonaws.com"
    aws_access_key_id => "xxxxxxxxx"
    aws_secret_access_key => "xxxxxxxx"
  }
}

filter {
  dynamodb {}
  mutate {
    remove_field => ["message"]
  }
}

output {
  if [type] == "moderation" {
      elasticsearch {
        host => "elasticsearch"
        index => "moderation"
        document_id => "%{image_id}"
        manage_template => true
        template => "/config/templates/moderation.json"
        template_name => "moderation"
      }
  }
  if [type] == "image" {
      elasticsearch {
        host => "elasticsearch"
        index => "image"
        document_id => "%{id}"
        manage_template => true
        template => "/config/templates/image.json"
        template_name => "image"
      }
  }
}

The reason I did this is that I needed the tables to go to different Elasticsearch indexes with different templates. I'm not sure what happens when you configure the output plugin the way you did (basically without any configuration), but the configuration I've provided above definitely works.

Hope it's useful for you.

Note: the dynamodb filter plugin is something we developed ourselves (https://github.com/mantika/logstash-filter-dynamodb) in order to get the documents indexed properly in ES.

bretd25 commented 8 years ago

@marcosnils Thank you very much for your response. I have changed my configuration file to:

input {
  dynamodb {
    endpoint => "dynamodb.us-west-2.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-west-2.amazonaws.com"
    view_type => "new_image"
    aws_access_key_id => "XXXXXX"
    aws_secret_access_key => "XXXXX"
    perform_scan => "false"
    type => "person"
    table_name => "Person"
  }
  dynamodb {
    endpoint => "dynamodb.us-west-2.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-west-2.amazonaws.com"
    view_type => "new_image"
    aws_access_key_id => "XXXX"
    aws_secret_access_key => "XXXXX"
    perform_scan => "false"
    type => "test"
    table_name => "Test"
  }
}

output {
  if [type] == "person" {
    elasticsearch {
      hosts => ["XXXX.us-west-2.es.amazonaws.com:443"]
      ssl => "true"
      index => "person"
      document_id => "%{person_id}"
    }
  }
  if [type] == "test" {
    elasticsearch {
      hosts => ["XXXXX.us-west-2.es.amazonaws.com:443"]
      ssl => "true"
      index => "test"
      document_id => "%{test_id}"
    }
  }
  stdout { }
}

If I insert a document into Person, I do not see the insertion in the console window. If I insert a document into Test, the document gets inserted and I can see it in the console window.

marcosnils commented 8 years ago

@bretd25 which logstash version are you running?

bretd25 commented 8 years ago

2.2

marcosnils commented 8 years ago

Hmm, that might be the problem. I use 1.5.6...

If you have Docker, you can use our image to see if it works:

https://hub.docker.com/r/mantika/logstash-dynamodb-streams/

bretd25 commented 8 years ago

@marcosnils I do not have Docker installed, but I actually used some of your docker configuration to get this far.

marcosnils commented 8 years ago

@bretd25 seems like it might be a compatibility issue with the logstash version, as it works fine for me with 1.5.6. Unfortunately I can't provide further assistance, as I haven't tried with logstash 2.x.

bretd25 commented 8 years ago

@marcosnils In order to get it to work I had to give it a

checkpointer => "logstash_input_dynamodb_cptr_Test"

It seems like the default checkpointer for both tables was not working.
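
For reference, a minimal sketch of what the fix looks like (the checkpointer names here are only examples; the point is that each table's input gets its own):

input {
  dynamodb {
    table_name => "Person"
    checkpointer => "logstash_input_dynamodb_cptr_Person"
    # endpoint, credentials, view_type, etc. as in the config above
  }
  dynamodb {
    table_name => "Test"
    checkpointer => "logstash_input_dynamodb_cptr_Test"
    # endpoint, credentials, view_type, etc. as in the config above
  }
}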

bretd25 commented 8 years ago

@marcosnils Thanks for all your help. I greatly appreciate it.

Bret

marcosnils commented 8 years ago

@bretd25 Interesting. With logstash 1.5.6, checkpointer tables were created automatically based on the supplied table name. Weird that it was using the same checkpointer for both tables.

Anyway, great that you figured it out!

sidaYewno commented 7 years ago

Hi @marcosnils, I'm using your image. It works for update and insert, but for remove it is not removing the item from Elasticsearch. Is there any additional config I need to set up for that?

marcosnils commented 7 years ago

@sidaYewno This is because the default action of the logstash ES output plugin is index (https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-action). If you want your items removed as well, you have to define a delete action for the ES logstash output plugin.
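
Roughly something like this (a sketch only: the index name and key field are placeholders, and it assumes the dynamodb filter puts the stream's event name in an eventName field, as in the configs later in this thread):

output {
  if [eventName] == "REMOVE" {
    elasticsearch {
      index => "mytable"
      document_id => "%{[keys][id]}"   # must match the id used when indexing
      action => "delete"
    }
  } else {
    elasticsearch {
      index => "mytable"
      document_id => "%{[keys][id]}"
      action => "index"
    }
  }
}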

sidaYewno commented 7 years ago

Hi @marcosnils, I tried defining a delete action, but it deletes the item regardless of the event; even when I am creating or updating, it deletes the item. I added action => "delete" in my command. Am I doing it wrong, or is there other config I need to add?

marcosnils commented 7 years ago

@sidaYewno have you checked https://github.com/awslabs/logstash-input-dynamodb/issues/12 ?

sidaYewno commented 7 years ago

@marcosnils I tried this:

input {
  dynamodb {
    endpoint => "dynamodb.us-west-2.amazonaws.com"
    streams_endpoint => "streams.dynamodb.us-west-2.amazonaws.com"
    view_type => "new_and_old_images"
    aws_access_key_id => "access"
    aws_secret_access_key => "secret"
    table_name => "companies"
  }
}

filter {
  dynamodb { }
}

output {
  if [eventName] == "REMOVE" {
    elasticsearch {
      host => "search-endpoint.es.amazonaws.com"
      document_id => "%{[keys][company_uuid]}"
      protocol => "http"
      port => "80"
      index => "companies"
      action => "delete"
    }
    stdout { }
  } else {
    elasticsearch {
      host => "search-endpoint.es.amazonaws.com"
      document_id => "%{[keys][company_uuid]}"
      protocol => "http"
      port => "80"
      index => "companies"
      action => "index"
    }
    stdout { }
  }
}

It says the config is not valid. Is there anything I am doing wrong?

marcosnils commented 7 years ago

@sidaYewno if you launch logstash with debug enabled, it should say why the config is not valid.

sidaYewno commented 7 years ago

The error is "Expected one of #, input, filter, output at line 37, column 1 (byte 948) after"

sidaYewno commented 7 years ago

@marcosnils How do I do that? I run it via your Docker image; what command should I use to pass the debug flag?

marcosnils commented 7 years ago

Which Docker image are you using, v1 or v2?

sidaYewno commented 7 years ago

@marcosnils I am using the docker run mantika/logstash-dynamodb-streams command, so I'm not sure which image I am using. Should I use v2? How do I do that?

sidaYewno commented 7 years ago

This is where I got the command: https://hub.docker.com/r/mantika/logstash-dynamodb-streams/

sidaYewno commented 7 years ago

@marcosnils So I was able to get the config to run, but now it is not outputting anything for any event. Here is my config:

docker run mantika/logstash-dynamodb-streams -e '
input {
  dynamodb {
    endpoint => "..."
    streams_endpoint => "..."
    view_type => "..."
    table_name => "..."
    perform_scan => false
  }
}
filter {
  dynamodb { }
}
output {
  if [eventName] == "REMOVE" {
    elasticsearch {
      hosts => "..."
      user => "..."
      password => "..."
      index => "..."
      document_id => "..."
      action => "delete"
    }
    stdout { }
  } else {
    elasticsearch {
      hosts => "..."
      user => "..."
      password => "..."
      index => "..."
      document_id => "..."
      action => "index"
    }
    stdout { }
  }
}'

marcosnils commented 7 years ago

@sidaYewno Seems like the config is right. I'd suggest running logstash in debug mode (the --debug flag) and checking whether you see anything in the logs.
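
For example, assuming the Docker image passes its arguments straight through to logstash (as the commands above suggest), something like:

docker run mantika/logstash-dynamodb-streams --debug -e ' ...same config as above... '

should log each event and each plugin's decisions as they flow through the pipeline.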