logstash-plugins / logstash-integration-jdbc

Logstash Integration Plugin for JDBC, including Logstash Input and Filter Plugins
Apache License 2.0

jdbc-input bug: clean_run: true and/or record_last_run: false doesn't work #121

Open s137 opened 1 year ago

s137 commented 1 year ago

Logstash information:

  1. Logstash version: 8.3.3
  2. Logstash installation source: extracted from the official zip archive
  3. How is Logstash being run: windows service or console
  4. How was the Logstash Plugin installed: plugin was not installed separately
  5. Plugin Version: both versions 5.2.4 and 5.3.0 have the issue

JVM (e.g. java -version): Bundled JDK:

    openjdk version "17.0.4" 2022-07-19
    OpenJDK Runtime Environment Temurin-17.0.4+8 (build 17.0.4+8)
    OpenJDK 64-Bit Server VM Temurin-17.0.4+8 (build 17.0.4+8, mixed mode, sharing)

OS version: Windows 10

Description of the problem including expected versus actual behavior:

According to the docs (https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#_state), setting clean_run to true should set the value of :sql_last_value to 0, or to '1970-01-01 00:00:00' if it's a datetime value, for every execution.

But it only works for the first execution; after that, the value is updated to the last execution time, even if I also set record_last_run to false.

Steps to reproduce:

You can reproduce the issue with this input:

input {
  jdbc {
    jdbc_driver_library => "C:\\ProgramData\\ElasticSearch\\logstash\\drivers\\mssql-jdbc-10.2.0.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://server\instance;databasename=database;trustServerCertificate=true"
    jdbc_default_timezone => "Europe/Berlin"
    jdbc_user => "user"
    jdbc_password => "pw"
    schedule => "*/1 * * * *"
    clean_run => true
    record_last_run => false
    statement => "SELECT feld FROM tabelle WHERE x >= :sql_last_value"
  }
}
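
With this input, the first scheduled run correctly substitutes the epoch:

    SELECT feld FROM tabelle WHERE x >= '1970-01-01 00:00:00'

but every run after that substitutes the previous execution time instead of resetting, e.g. (illustrative timestamp):

    SELECT feld FROM tabelle WHERE x >= '2022-08-15 10:01:00'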

This is the same issue that @palin first encountered and put up on the old jdbc-input-plugin repository, see here for more details: https://github.com/logstash-plugins/logstash-input-jdbc/issues/373

marius-mcp commented 6 months ago

According to the documentation, @s137:

So this is how it should be configured for :sql_last_value to be reset to 0 (or the 1970 epoch timestamp) on each pipeline start:

    clean_run => true
    record_last_run => true
    last_run_metadata_path => "/...."

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#plugins-inputs-jdbc-record_last_run

And in the code here, it seems that clean_run only has an effect when record_last_run is true:

https://github.com/logstash-plugins/logstash-input-jdbc/blob/49b1c2f80607b2affa9461b95c8e32fc76c2a6e4/lib/logstash/plugin_mixins/jdbc/value_tracking.rb

    def self.build_last_value_tracker(plugin)
      handler = plugin.record_last_run ? FileHandler.new(plugin.last_run_metadata_path) : NullFileHandler.new(plugin.last_run_metadata_path)
      if plugin.record_last_run
        handler = FileHandler.new(plugin.last_run_metadata_path)
      end
      if plugin.clean_run
        handler.clean
      end

This patch also conditions the file update on record_last_run, but this is NOT NEEDED!

https://github.com/logstash-plugins/logstash-input-jdbc/pull/374/files#diff-4b2b7343665fff62c6e83c67858843521513dea8de9b7afaa0b33b53362aaa30

There is also a pleonasm in this code:

    handler = plugin.record_last_run ? FileHandler.new(plugin.last_run_metadata_path) : NullFileHandler.new(plugin.last_run_metadata_path)
    if plugin.record_last_run
      handler = FileHandler.new(plugin.last_run_metadata_path)
    end

The if is useless: the ternary already assigns a FileHandler when record_last_run is true.
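
A minimal deduplicated sketch with the same behavior:

    # pick the handler class once, based on record_last_run
    handler_class = plugin.record_last_run ? FileHandler : NullFileHandler
    handler = handler_class.new(plugin.last_run_metadata_path)
    handler.clean if plugin.clean_run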

marius-mcp commented 6 months ago

So after some digging, it turns out that :sql_last_value is cached: the ValueTracking class is instantiated once, and the value is not re-read from last_run_metadata_path unless Logstash is restarted...
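
A rough Ruby illustration of that caching (invented names, not the plugin's actual code):

    require 'yaml'

    # hypothetical illustration of the caching behavior described above
    class CachedTracker
      def initialize(path)
        @path = path
        # the metadata file is read once, when the pipeline starts
        @value = File.exist?(path) ? YAML.load_file(path) : 0
      end

      # scheduled runs see this in-memory copy, never a fresh file read
      attr_reader :value

      # every run overwrites the cached value, so a reset applied only
      # at startup (clean_run) is lost after the first execution
      def update(new_value)
        @value = new_value
      end
    end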

marius-mcp commented 6 months ago

@s137, for cursor pagination with

    tracking_column => "id"            # or any unique column
    tracking_column_type => "numeric"
    clean_run => true
    record_last_run => true
    last_run_metadata_path => "/...."

I think the solution would be: when the pipeline exits because 0 rows were retrieved, and clean_run is true, write the initial value (0, or the 1970 epoch timestamp) back to the last_run_metadata_path file.
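
A sketch of that proposal in Ruby (hypothetical names, not the plugin's actual API):

    require 'yaml'

    # hypothetical: reset the tracker when a run comes back empty
    def maybe_reset_tracker(rows_fetched, clean_run, metadata_path)
      return unless rows_fetched.zero? && clean_run
      # write the initial value (0 here; the 1970 epoch for datetime columns)
      # back to the metadata file so the next run starts from scratch
      File.write(metadata_path, YAML.dump(0))
    end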

UPDATE: I could not set this up while using the scheduler...

marius-mcp commented 6 months ago

Update: the scheduler spoils everything by running just one query at a time. Without the scheduler, the query is repeated until there are no more rows to be ingested...

marius-mcp commented 6 months ago

So the final solution was not to use clean_run => true at all. Logstash restarts itself after all pipelines have executed once.

So I created two pipelines, one with cursor pagination and one with offset pagination. Offset pagination:

jdbc_driver_library => "/usr/share/java/mysql-connector-java-8.0.33.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://...?zeroDateTimeBehavior=convertToNull&autoReconnect=true&tinyInt1isBit=false"
jdbc_user => ""
jdbc_password => ""
jdbc_paging_enabled => true
jdbc_paging_mode => "explicit"
jdbc_page_size => 20000
jdbc_default_timezone => "UTC"
last_run_metadata_path => "/home/logstash/jdbc_updated_at"
statement_filepath => "/etc/logstash/conf.d/last_2_minutes_updated_at.sql"
lowercase_column_names => false
connection_retry_attempts => 3
connection_retry_attempts_wait_time => 30

SQL:

WHERE updated_at IS NOT NULL AND updated_at > DATE_SUB(NOW(), INTERVAL 2 MINUTE) LIMIT :size OFFSET :offset
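
For reference, with jdbc_paging_mode => "explicit" the plugin substitutes :size and :offset itself and advances the offset by jdbc_page_size each page, so the first two pages resolve roughly to:

    WHERE updated_at IS NOT NULL AND updated_at > DATE_SUB(NOW(), INTERVAL 2 MINUTE) LIMIT 20000 OFFSET 0
    WHERE updated_at IS NOT NULL AND updated_at > DATE_SUB(NOW(), INTERVAL 2 MINUTE) LIMIT 20000 OFFSET 20000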

Cursor pagination:

jdbc_driver_library => "/usr/share/java/mysql-connector-java-8.0.33.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://...?zeroDateTimeBehavior=convertToNull&autoReconnect=true&tinyInt1isBit=false&useCursorFetch=true"
jdbc_user => ""
jdbc_password => ""
jdbc_paging_enabled => false
jdbc_paging_mode => "explicit"
jdbc_fetch_size => 20000
jdbc_page_size => 20000
use_column_value => true
jdbc_default_timezone => "UTC"
tracking_column => "id"
tracking_column_type => "numeric"
last_run_metadata_path => "/home/logstash/jdbc_created_at"
statement_filepath => "/etc/logstash/conf.d/jdbc_created_at.sql"
lowercase_column_names => false
connection_retry_attempts => 3
connection_retry_attempts_wait_time => 30

SQL:

WHERE id > :sql_last_value ORDER BY id ASC LIMIT 20000
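
With a numeric tracking column, :sql_last_value starts at 0 and is then set to the id of the last row processed, so successive runs look roughly like this (illustrative ids):

    WHERE id > 0 ORDER BY id ASC LIMIT 20000
    WHERE id > 20000 ORDER BY id ASC LIMIT 20000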

This will execute each statement once, then restart Logstash and execute them again, until Logstash is stopped.

If I want to re-ingest everything, I just have to manually delete the files from last_run_metadata_path (stop Logstash, delete the index, create the mappings, and then restart Logstash).

Logstash's scheduler is not compatible with cursor pagination and :sql_last_value.