theangryangel / logstash-output-jdbc

JDBC output for Logstash
MIT License
256 stars, 101 forks

Dynamic creation of insert statements #157

Status: Closed (philippbck closed this issue 5 years ago)

philippbck commented 5 years ago

Hello,

I want to use Logstash (v7.1) to pump data from a MySQL database to an Oracle database, and I found this awesome plugin to do that (thanks, it's awesome!). It works perfectly with predefined insert statements, but I'd like to be able to build my insert statements for the output database dynamically in the filter { } section.

Here is my config:

input {
  jdbc {
    jdbc_driver_library => "/data/mysql-connector-java-5.1.42/mysql-connector-java-5.1.42-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://{{ db_host }}:3306/{{ db_name }}?autoReconnect=true&useSSL=false"
    jdbc_user => "{{ db_user }}"
    jdbc_password => "{{ db_password }}"
    schedule => "* * * * *"
    statement => "
      SELECT
        `Thing`.`id` AS `id`,
        `Thing`.`lifecycle` as `lifecycle`,
        `Type`.`nameUri` as `thing_type`,
        `Thing`.`createdAt` as `created_at`,
        `Thing`.`updatedAt` as `updated_at`
      FROM
        `Things` AS `Thing`
      LEFT JOIN
        `Types` AS `Type` ON `Thing`.`typeId` = `Type`.`id`
      WHERE
        `Thing`.`updatedAt` > :sql_last_value;
    "
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
    type => "thing_metadata"
    last_run_metadata_path => "/etc/logstash/conf.d/lastrun/.logstash_jdbc_sync_thing_metadata"
  }
}

filter {
    if [type] == "thing_metadata" {
        mutate {
            add_field => { "table_name" => "Table1" }
            add_field => { "table_columns" => "ID, TYPE, CREATED_AT, UPDATED_AT" }
            add_field => { "table_columns_values" => "?, ?, ?, ?" }
            add_field => { "table_columns_values_attributes" => '"id", "thing_type", "created_at", "updated_at"' }
        }
    } else {
        mutate {
            add_field => { "table_name" => "Table2" }
            add_field => { "table_columns" => "ID, TYPE" }
            add_field => { "table_columns_values" => "?, ?" }
            add_field => { "table_columns_values_attributes" => '"id", "thing_type"' }
        }
    }
}

output {
    jdbc {
        driver_class => "oracle.jdbc.driver.OracleDriver"
        driver_jar_path => "/data/ojdbc8-full/ojdbc8.jar"
        connection_string => "jdbc:oracle:thin:@{{ output_db_name }}:1521/{{ output_db_service_name }}"
        username => "{{ output_db_user }}"
        password => "{{ output_db_password }}"
        statement => [ "INSERT INTO %{table_name} (%{table_columns}) VALUES(%{table_columns_values})", table_columns_values_attributes ]
        unsafe_statement => true
    }
    stdout { 
        codec => rubydebug 
    }
}

Currently I can't access the value of table_columns_values_attributes in the output statement. Any idea what I need to do? When I write the field names there directly, e.g. "id", "thing_type", "created_at", "updated_at", it works perfectly.

Thanks for help!

theangryangel commented 5 years ago

The syntax you're using for your statement line probably isn't going to work as expected for 2 reasons

You might be better off moving your if statement from the filter to the output and having multiple jdbc outputs. I'll admit it's not ideal, but it's the only way I can think of doing this as-is.

It would look something like this (this version has the added benefit of no longer requiring unsafe_statement):

output {
  if [type] == "thing_metadata" {
    jdbc {
      driver_class => "oracle.jdbc.driver.OracleDriver"
      driver_jar_path => "/data/ojdbc8-full/ojdbc8.jar"
      connection_string => "jdbc:oracle:thin:@{{ output_db_name }}:1521/{{ output_db_service_name }}"
      username => "{{ output_db_user }}"
      password => "{{ output_db_password }}"
        statement => [ "INSERT INTO Table1 (ID, TYPE, CREATED_AT, UPDATED_AT) VALUES(?, ?, ?, ?)", "id", "thing_type", "created_at", "updated_at" ]
    }
  }
  else {
    jdbc {
      driver_class => "oracle.jdbc.driver.OracleDriver"
      driver_jar_path => "/data/ojdbc8-full/ojdbc8.jar"
      connection_string => "jdbc:oracle:thin:@{{ output_db_name }}:1521/{{ output_db_service_name }}"
      username => "{{ output_db_user }}"
      password => "{{ output_db_password }}"
        statement => [ "INSERT INTO Table2 (ID, TYPE) VALUES(?, ?)", "id", "thing_type" ]
    }
  }

  stdout { 
    codec => rubydebug 
  }
}

The only other way I can think of would be to add an extra configuration option (let's say dynamic_statement) that allows the entire statement field to reference an event field. Frankly, it's unlikely I'll get around to doing this anytime soon, and you'd still need the if statement somewhere anyway, so I'm a little reluctant to add it when you can use the above form instead.
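If you still want the routing decision to live in the filter stage, one option is to compute the target table there and branch on it in the output. A minimal sketch, assuming the same connection settings as above; the field name [@metadata][target_table] is hypothetical. Fields under [@metadata] can be used in conditionals but are not printed by the rubydebug codec by default, so the marker doesn't clutter the event.

```
filter {
  # Decide the target table in the filter stage (hypothetical field name).
  if [type] == "thing_metadata" {
    mutate { add_field => { "[@metadata][target_table]" => "Table1" } }
  } else {
    mutate { add_field => { "[@metadata][target_table]" => "Table2" } }
  }
}

output {
  # The statement itself stays static; only the choice of output is dynamic.
  if [@metadata][target_table] == "Table1" {
    jdbc {
      driver_class => "oracle.jdbc.driver.OracleDriver"
      driver_jar_path => "/data/ojdbc8-full/ojdbc8.jar"
      connection_string => "jdbc:oracle:thin:@{{ output_db_name }}:1521/{{ output_db_service_name }}"
      username => "{{ output_db_user }}"
      password => "{{ output_db_password }}"
      statement => [ "INSERT INTO Table1 (ID, TYPE, CREATED_AT, UPDATED_AT) VALUES(?, ?, ?, ?)", "id", "thing_type", "created_at", "updated_at" ]
    }
  } else if [@metadata][target_table] == "Table2" {
    jdbc {
      driver_class => "oracle.jdbc.driver.OracleDriver"
      driver_jar_path => "/data/ojdbc8-full/ojdbc8.jar"
      connection_string => "jdbc:oracle:thin:@{{ output_db_name }}:1521/{{ output_db_service_name }}"
      username => "{{ output_db_user }}"
      password => "{{ output_db_password }}"
      statement => [ "INSERT INTO Table2 (ID, TYPE) VALUES(?, ?)", "id", "thing_type" ]
    }
  }
}
```

Because each statement is a fixed string with ? placeholders, this keeps the parameterized form and does not need unsafe_statement.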

Let me know if that's no good for some reason.

philippbck commented 5 years ago

Thank you very much for the quick and detailed response. Having more than one jdbc output could be an option. I need to play around with that solution; I'll give you feedback on my results.

philippbck commented 5 years ago

Multiple outputs are a good solution! The only other thing I need is to pass fields from filter { } to output { }, like:

filter {
  mutate {
    # %{id} and %{thing_type} come from the input statement
    add_field => { "id" => "%{id}" }
    add_field => { "thing_type" => "%{thing_type}" }
  }
}

output {
  if [type] == "thing_metadata" {
    jdbc {
      driver_class => "oracle.jdbc.driver.OracleDriver"
      driver_jar_path => "/data/ojdbc8-full/ojdbc8.jar"
      connection_string => "jdbc:oracle:thin:@{{ output_db_name }}:1521/{{ output_db_service_name }}"
      username => "{{ output_db_user }}"
      password => "{{ output_db_password }}"
        statement => [ "INSERT INTO Table1 (ID, TYPE) VALUES(?, ?)", "%{id}", "%{thing_type}" ]
    }
  }
}

I need this because I want to process the data in the filter { } section. However, the config above doesn't work: Logstash writes the literal strings "%{id}" and "%{thing_type}" to the database instead of the actual field values. Is it possible in general to pass field values from filter { } to output { }? If so, what is my mistake? Thanks!

theangryangel commented 5 years ago

It should work.

Can you double-check, using the stdout output, that the fields are actually in the event? As long as they're there, it will work: with that syntax everything is passed straight through the event.sprintf function. The output plugin doesn't care whether a field was added in a filter or came from an input; it literally has no knowledge of where it came from.
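One way to do that check is sketched below, to be merged into an existing pipeline. It assumes the field names from the SELECT aliases in the original input (id, thing_type); the tag name missing_jdbc_fields is hypothetical. If a referenced field does not exist when the output runs, event.sprintf leaves the %{...} text as-is, which would produce exactly the literal "%{id}" symptom. Note also that the jdbc input already sets id and thing_type on the event, and mutate's add_field on a field that already exists appends to it (turning it into an array), so re-adding them from %{id} is not needed.

```
filter {
  # Tag events that lack a field the jdbc statement binds (hypothetical tag name).
  if ![id] or ![thing_type] {
    mutate { add_tag => [ "missing_jdbc_fields" ] }
  }
}

output {
  # Print only the problematic events, with all of their fields.
  if "missing_jdbc_fields" in [tags] {
    stdout { codec => rubydebug }
  }
}
```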

(Sent by email in reply to Philipp Buck's comment of Wed, Jul 3, 2019, 3:55 PM.)


philippbck commented 5 years ago

I had a small typo in my config. Now it works, and multiple jdbc outputs are a good solution for me. Thanks! You can close the issue.

theangryangel commented 5 years ago

No worries, Philipp :+1: