imweijh / dailywork

1 stars 0 forks source link

Use Logstash to sync Oracle tables to Kafka (SQL-based) #2

Closed imweijh closed 3 years ago

imweijh commented 3 years ago

https://gist.github.com/imweijh/8db00663194a843715dbe94e1e57707b

imweijh commented 4 months ago
# Incremental polling of three Oracle tables through the jdbc input plugin.
#
# Each jdbc block:
#   * sets `type`, which the output section uses to choose the Kafka topic;
#   * remembers the last seen `record_sn` in its own last_run_metadata_path
#     file, so every input keeps an independent checkpoint;
#   * binds that checkpoint to the `?` placeholder via :sql_last_value.
#
# The statements are ordered by record_sn ascending on purpose: with
# use_column_value => true the plugin stores the tracking-column value of the
# LAST row it processed, not the maximum. Without ORDER BY the database may
# return rows in any order, so the checkpoint could move backwards (duplicate
# rows) or jump ahead (rows skipped forever).
#
# NOTE(review): credentials are stored in plain text here; consider the
# Logstash keystore / environment-variable substitution for jdbc_password.
input {
  # Source 1: theuser.matching on the fgjzjyfz service.
  jdbc {
    type => "jzjy"
    jdbc_driver_library => "D:\logstash-7.12.0\ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"

    jdbc_connection_string => "jdbc:oracle:thin:@//127.138.1.78:1521/fgjzjyfz"
    jdbc_user => "theuser"
    jdbc_password => "thepasswd"
    jdbc_fetch_size => 1000

    # Checkpoint on the numeric column record_sn instead of the run timestamp.
    tracking_column => "record_sn"
    use_column_value => true
    tracking_column_type => "numeric"
    statement => "select * from theuser.matching where record_sn > ? order by record_sn"
    prepared_statement_bind_values => [":sql_last_value"]
    prepared_statement_name => "SelectfromMatchingWhereRecordSN_JZJY"
    use_prepared_statements => true
    last_run_metadata_path => "D:\logstash-7.12.0\_LOGSTASH_JDBC_LAST_RUN"
    # Six-field cron expression: the leading field is seconds, i.e. poll every second.
    schedule => "* * * * * *"
  }

  # Source 2: theuser.HK_MATCHING on the same fgjzjyfz service.
  jdbc {
    type => "hkjy"
    jdbc_driver_library => "D:\logstash-7.12.0\ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"

    jdbc_connection_string => "jdbc:oracle:thin:@//127.138.1.78:1521/fgjzjyfz"
    jdbc_user => "theuser"
    jdbc_password => "thepasswd"
    jdbc_fetch_size => 1000

    tracking_column => "record_sn"
    use_column_value => true
    tracking_column_type => "numeric"
    statement => "select * from theuser.HK_MATCHING where record_sn > ? order by record_sn"
    prepared_statement_bind_values => [":sql_last_value"]
    prepared_statement_name => "SelectfromMatchingWhereRecordSN_HK"
    use_prepared_statements => true
    last_run_metadata_path => "D:\logstash-7.12.0\_LOGSTASH_JDBC_LAST_RUN_HK"
    schedule => "* * * * * *"
  }

  # Source 3: theuser.matching on the fgrzrqfz service (different host).
  jdbc {
    type => "rzrq"
    jdbc_driver_library => "D:\logstash-7.12.0\ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"

    jdbc_connection_string => "jdbc:oracle:thin:@//127.138.1.79:1521/fgrzrqfz"
    jdbc_user => "theuser"
    jdbc_password => "thepasswd"
    jdbc_fetch_size => 1000

    tracking_column => "record_sn"
    use_column_value => true
    tracking_column_type => "numeric"
    statement => "select * from theuser.matching where record_sn > ? order by record_sn"
    prepared_statement_bind_values => [":sql_last_value"]
    prepared_statement_name => "SelectfromMatchingWhereRecordSN_RZRQ"
    use_prepared_statements => true
    last_run_metadata_path => "D:\logstash-7.12.0\_LOGSTASH_JDBC_LAST_RUN_RZRQ"
    schedule => "* * * * * *"
  }
}

# Throughput instrumentation: every event passing through is counted by the
# "events" meter, and the metric events the plugin emits are tagged "metric"
# so they can be told apart from data rows (see the commented-out stdout
# output that prints [events][rate_1m]).
filter {
  metrics {
    add_tag => ["metric"]
    meter => ["events"]
  }
}

# Route rows to Kafka by the `type` set on each jdbc input.
# Events that match neither conditional below are not sent anywhere
# (presumably this includes the "metric"-tagged events from the metrics
# filter, which are not created by a jdbc input — verify if they are needed).
output {
# Debug outputs, disabled by default:
# stdout { codec => rubydebug }
# stdout { codec => dots }

  # Optional: print the 1-minute event rate produced by the metrics filter.
  # if "metric" in [tags] {
    # stdout {
      # codec => line {
      # format => "rate: %{[events][rate_1m]}"
      # }
    # }
  # }

  # Rows from the "jzjy" and "hkjy" inputs share the "matching" topic.
  if [type] in ["jzjy","hkjy"] {
    kafka {
      bootstrap_servers => "127.138.1.232:9092"
      codec => json
      topic_id => "matching"
      compression_type => "snappy"
      # acks => "0"
    }
  }

  # Rows from the "rzrq" input go to their own topic on the same broker.
  if [type] == "rzrq" {
    kafka {
      bootstrap_servers => "127.138.1.232:9092"
      codec => json
      topic_id => "matchingRZRQ"
      compression_type => "snappy"
      # acks => "0"
    }
  }
# Alternative sink (disabled): index into a monthly Elasticsearch index.
#  elasticsearch {
#    index => "matching-%{+YYYY.MM}"
#    hosts => ["127.0.0.1"]
#  }
}