y-ken / fluent-plugin-mysql-replicator

Fluentd input plugin to track insert/update/delete event from MySQL databases.
http://rubygems.org/gems/fluent-plugin-mysql-replicator
Other
56 stars 24 forks source link

mysql_replicator_multi fails to execute query #3

Open lauralorenz opened 10 years ago

lauralorenz commented 10 years ago

Similar I think to #2

Receiving error in td-agent logs that mysql_replicator_multi failed to execute query.

td-agent log

2014-07-29 18:08:21 +0000 [info]: mysql_replicator_multi: polling start. :config=>{"id"=>1, "is_active"=>1, "name"=>"fluentd_fact_price_history", "host"=>"localhost", "port"=>3306, "username"=>"fluentd", "password"=>"xxxxxxxxxxxxxxx", "database"=>"pocket_dwh", "query"=>"SELECT price_history_id as id, price, price_sale, price_quantity, price_sale_quantity, price_sale_expiration, price_sale_spotted_date, missing, store_id, product_id, effective_date, ps_user_history_id, price_status, universal_user_id, insert_timestamp, last_update_timestamp FROM fact_price_history;", "prepared_query"=>"SELECT price_history_id as id, price, price_sale, price_quantity, price_sale_quantity, price_sale_expiration, price_sale_spotted_date, missing, store_id, product_id, effective_date, ps_user_history_id, price_status, universal_user_id, insert_timestamp, last_update_timestamp FROM fact_price_history;", "interval"=>5, "primary_key"=>"id", "enable_delete"=>1, "enable_loose_insert"=>0, "enable_loose_delete"=>0}
2014-07-29 18:08:21 +0000 [info]: listening dRuby uri="druby://127.0.0.1:24230" object="Engine"
2014-07-29 18:08:22 +0000 [warn]: emit transaction failed  error_class=NoMethodError error=#<NoMethodError: undefined method `to_msgpack' for #<BigDecimal:7f372fb85070,'0.529E1',182014-07-29 18:08:22 +0000 [warn]: emit transaction failed  error_class=NoMethodError error=#<NoMethodError: undefined method `to_msgpack' for #<BigDecimal:7f372fb85070,'0.529E1',18(18)>>
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-mysql-replicator-0.4.1/lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb:32:in `to_msgpack'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-mysql-replicator-0.4.1/lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb:32:in `to_msgpack'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-mysql-replicator-0.4.1/lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb:32:in `format'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.10.50/lib/fluent/output.rb:254:in `block in format_stream'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.10.50/lib/fluent/event.rb:54:in `call'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.10.50/lib/fluent/event.rb:54:in `each'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.10.50/lib/fluent/output.rb:253:in `format_stream'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.10.50/lib/fluent/output.rb:239:in `emit'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.10.50/lib/fluent/match.rb:36:in `emit'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.10.50/lib/fluent/engine.rb:154:in `emit_stream'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.10.50/lib/fluent/engine.rb:134:in `emit'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-mysql-replicator-0.4.1/lib/fluent/plugin/in_mysql_replicator_multi.rb:241:in `emit_record'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-mysql-replicator-0.4.1/lib/fluent/plugin/in_mysql_replicator_multi.rb:135:in `detect_insert_update'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-mysql-replicator-0.4.1/lib/fluent/plugin/in_mysql_replicator_multi.rb:99:in `block (3 levels) in poll'
  2014-07-29 18:08:22 +0000 [warn]: <internal:prelude>:10:in `synchronize'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-mysql-replicator-0.4.1/lib/fluent/plugin/in_mysql_replicator_multi.rb:94:in `block (2 levels) in poll'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-mysql-replicator-0.4.1/lib/fluent/plugin/in_mysql_replicator_multi.rb:85:in `each'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-mysql-replicator-0.4.1/lib/fluent/plugin/in_mysql_replicator_multi.rb:85:in `block in poll'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-mysql-replicator-0.4.1/lib/fluent/plugin/in_mysql_replicator_multi.rb:75:in `loop'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-mysql-replicator-0.4.1/lib/fluent/plugin/in_mysql_replicator_multi.rb:75:in `poll'
  2014-07-29 18:08:22 +0000 [warn]: /usr/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-mysql-replicator-0.4.1/lib/fluent/plugin/in_mysql_replicator_multi.rb:41:in `block (2 levels) in start'
2014-07-29 18:08:22 +0000 [error]: mysql_replicator_multi: failed to execute query. :config=>{"id"=>1, "is_active"=>1, "name"=>"fluentd_fact_price_history", "host"=>"localhost", "port"=>3306, "username"=>"fluentd", "password"=>"****", "database"=>"pocket_dwh", "query"=>"SELECT price_history_id as id, price, price_sale, price_quantity, price_sale_quantity, price_sale_expiration, price_sale_spotted_date, missing, store_id, product_id, effective_date, ps_user_history_id, price_status, universal_user_id, insert_timestamp, last_update_timestamp FROM fact_price_history;", "prepared_query"=>"SELECT price_history_id as id, price, price_sale, price_quantity, price_sale_quantity, price_sale_expiration, price_sale_spotted_date, missing, store_id, product_id, effective_date, ps_user_history_id, price_status, universal_user_id, insert_timestamp, last_update_timestamp FROM fact_price_history;", "interval"=>5, "primary_key"=>"id", "enable_delete"=>1, "enable_loose_insert"=>0, "enable_loose_delete"=>0}
2014-07-29 18:08:22 +0000 [error]: error: undefined method `to_msgpack' for #<BigDecimal:7f372fb85070,'0.529E1',18(18)>

settings table

*************************** 1. row ***************************
                 id: 1
          is_active: 1
               name: fluentd_fact_price_history
               host: localhost
               port: 3306
           username: fluentd
           password: xxxxxxxxxxx
           database: pocket_dwh
              query: SELECT price_history_id as id, price, price_sale, price_quantity, price_sale_quantity, price_sale_expiration, price_sale_spotted_date, missing, store_id, product_id, effective_date, ps_user_history_id, price_status, universal_user_id, insert_timestamp, last_update_timestamp FROM fact_price_history;
     prepared_query: SELECT price_history_id as id, price, price_sale, price_quantity, price_sale_quantity, price_sale_expiration, price_sale_spotted_date, missing, store_id, product_id, effective_date, ps_user_history_id, price_status, universal_user_id, insert_timestamp, last_update_timestamp FROM fact_price_history;
           interval: 5
        primary_key: id
      enable_delete: 1
enable_loose_insert: 0
enable_loose_delete: 0
1 row in set (0.00 sec)

td-agent config

<source>
  type mysql_replicator_multi

  # Database connection setting for manager table.
  manager_host localhost
  manager_username fluentd
  manager_password xxxxxxxxxx
  manager_database replicator_manager

  # Format output tag for each events. Placeholders usage as described below.
  tag replicator.${name}.${event}.${primary_key}
  # ${name} : the value of `replicator_manager.settings.name` in manager table.
  # ${event} : the variation of row event type by insert/update/delete.
  # ${primary_key} : the value of `replicator_manager.settings.primary_key` in manager table.
</source>

<match replicator.**>
 type mysql_replicator_elasticsearch

 #Set Elasticsearch connection.
 host localhost
 port 9200

 #Set Elasticsearch index, type, and unique id (primary_key) from tag.
 tag_format (?<index_name>[^\.]+)\.(?<type_name>[^\.]+)\.(?<event>[^\.]+)\.(?<primary_key>[^\.]+)$

 #Set frequency of sending bulk request to Elasticsearch node.
 flush_interval 5s

 #Set maximum retry interval (required fluentd >= 0.10.41)
 max_retry_wait 1800

 #Queued chunks are flushed at shutdown process.
 #It's sample for td-agent. If you use Yamabiko, replace path from 'td-agent' to 'yamabiko'.
 flush_at_shutdown yes
 buffer_type file
 buffer_path /var/log/td-agent/buffer/mysql_replicator_elasticsearch
</match>

I have tested this query in my database and it contains no sql errors and can return data.

y-ken commented 10 years ago

Hi @lauralorenz

Thank you for your report! This issue seems like BigDecimal issue. https://github.com/msgpack/msgpack-ruby/issues/26

I'll fix it soon.

y-ken commented 10 years ago

I've released new version of gem as v0.4.2. Please try it with latest fluent-plugin-mysql-replicator ! http://rubygems.org/gems/fluent-plugin-mysql-replicator

and if you like this repository, please click star button for this. https://github.com/y-ken/fluent-plugin-mysql-replicator/stargazers

lauralorenz commented 10 years ago

Thanks! Just got to try it out now.

However, I haven't been able to test it because I have a new issue. I tested with a small table and the replicator worked (made index replicator and added in documents from my table). I added a larger table as another record in replicator_manager.settings and restarted td-agent, but records from the second table did not show up (even though the _type was mapped in elasticsearch). I stopped td-agent, deleted the replicator index, and restarted the service with only the large table as a record in replicator_manager.settings. Now td-agent logs hangs and nothing happens in elasticsearch logs, including replicator index is not created.

2014-08-11 19:07:26 +0000 [info]: listening fluent socket on 0.0.0.0:24224
2014-08-11 19:07:26 +0000 [info]: mysql_replicator_multi: polling start. :config=>{"id"=>2, "is_active"=>1, "name"=>"pocket_dwh.fact_price", "host"=>"localhost", "port"=>3306, "username"=>"fluentd", "password"=>"xxxxxxxxxxxxxxx", "database"=>"pocket_dwh", "query"=>"SELECT id, product_id, store_id, price, price_sale, price_quantity, price_sale_quantity, price_sale_expiration, price_sale_expiration_date,price_sale_spotted_date,effective_date,insert_timestamp, last_update_timestamp FROM fact_price;", "prepared_query"=>"SELECT id, product_id, store_id, price, price_sale, price_quantity, price_sale_quantity, price_sale_expiration, price_sale_expiration_date,price_sale_spotted_date,effective_date,insert_timestamp, last_update_timestamp FROM fact_price;", "interval"=>5, "primary_key"=>"id", "enable_delete"=>1, "enable_loose_insert"=>0, "enable_loose_delete"=>0}
2014-08-11 19:07:26 +0000 [info]: listening dRuby uri="druby://127.0.0.1:24230" object="Engine"
y-ken commented 10 years ago

Hi @lauralorenz

It might be remain cache. Please truncate table replicator_manager.hash_tables ;