fluent / fluent-plugin-sql

SQL input/output plugin for Fluentd

Records are dumped without retry when PostgreSQL temporarily shuts down during insertion #138

Open · am-beta opened this issue 1 year ago

am-beta commented 1 year ago

I have an input file containing about 400k lines, one JSON entry per line, which I read with the tail input plugin and write to Postgres with the sql output plugin. Under normal conditions all entries are inserted correctly.

However, I need to make sure that a sudden PostgreSQL shutdown will not cause Fluentd to lose records, and that those records will be retried instead. When I stopped PostgreSQL at a random moment during insertion, about 9k entries were dumped because the plugin classified the resulting errors as "deterministic errors":

2023-09-13 19:09:51 +0200 [warn]: #0 Got deterministic error. Fallback to one-by-one import error_class=ActiveRecord::StatementInvalid error="PG::ConnectionBad: PQconsumeInput() FATAL:  terminating connection due to administrator command\nSSL connection has been closed unexpectedly\n"
2023-09-13 19:09:51 +0200 [error]: #0 Got deterministic error again. Dump a record error_class=ActiveRecord::StatementInvalid error="PG::ConnectionBad: PQsocket() can't get socket descriptor" record=#<Fluent::Plugin::SQLOutput::BaseModel_506070496::AccessLog […]>

followed by roughly 9k more "Got deterministic error again" messages, each dumping a record. After that, the Postgres connection error was detected and the plugin retried as it should until Postgres came back up; the remaining records were inserted, but the dumped ones were lost.

The behaviour I expect is that no records are dumped when Postgres is temporarily stopped.

This is my configuration:

<match td.*.*>
  @type sql
  adapter postgresql
  host localhost
  port 5432
  database fluentd
  username fluentd
  password supersecretpasswd
  <table>
    table access_logs
    column_mapping '@timestamp:timestamp,action,api_version,host,method,path,route,controller,duration,status,ip,source,retriever_cached:cached,token_id,parameters:params,request_id'
  </table>
</match>
<source>
  @type tail
  @id input_tail
  <parse>
    @type json
  </parse>
  path /path/to/input.json
  pos_file /path/to/input.json.pos
  tag td.foo.bla
</source>

Package versions:

$ fluentd --version
fluent-package 5.0.1 fluentd 1.16.2 (d5685ada81ac89a35a79965f1e94bbe5952a5d3a)
$ fluent-gem list|grep sql
fluent-plugin-sql (2.3.0)
$ cat /etc/os-release 
PRETTY_NAME="Ubuntu 22.04.3 LTS"
rockliffelewis commented 6 months ago

The offending section appears to be the 'fallback' feature, which is enabled by default and can be disabled by setting enable_fallback false in your config.

https://github.com/fluent/fluent-plugin-sql/blob/master/lib/fluent/plugin/out_sql.rb#L105-L136

What this feature does is, for certain types of database error, switch from batch insertion to inserting records one by one; if further SQL errors happen during that per-record pass, it simply drops the record.
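To make the failure mode concrete, here is a small, self-contained Ruby sketch of the control flow described above. It is a paraphrase with made-up names and stand-in exceptions, not the actual code from out_sql.rb:

require "logger"

log = Logger.new($stdout)
records = [{ path: "/a" }, { path: "/b" }]

begin
  # Stand-in for the bulk INSERT of a whole chunk; pretend Postgres
  # shut down mid-batch, as in the report above.
  raise "PG::ConnectionBad: server closed the connection"
rescue => e
  # The plugin treats this as a "deterministic" error and retries the
  # chunk record by record instead of re-queueing it.
  log.warn("Got deterministic error. Fallback to one-by-one import: #{e.message}")
  records.each do |record|
    begin
      # Stand-in for the per-record INSERT, which also fails while
      # Postgres is still down.
      raise "PG::ConnectionBad: PQsocket() can't get socket descriptor"
    rescue => e2
      # Second failure: the record is logged and dropped, never retried,
      # which matches the ~9k dumped records in the report.
      log.error("Got deterministic error again. Dump a record: #{e2.message} record=#{record.inspect}")
    end
  end
end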

This should probably be disabled by default, or changed so it doesn't just dump records when it gets an odd response from Postgres, because this section will throw if you deliberately make your database read-only or restart the database.
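As a workaround until that changes, the reporter's <match> block with the fallback disabled would look roughly like this (only the enable_fallback line is new; the rest is copied from the config above):

<match td.*.*>
  @type sql
  adapter postgresql
  host localhost
  port 5432
  database fluentd
  username fluentd
  password supersecretpasswd
  enable_fallback false
  <table>
    table access_logs
    column_mapping '@timestamp:timestamp,action,api_version,host,method,path,route,controller,duration,status,ip,source,retriever_cached:cached,token_id,parameters:params,request_id'
  </table>
</match>

With the fallback disabled, a failed bulk insert should propagate to Fluentd's normal retry mechanism instead of degrading into per-record inserts that drop records on failure.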