fluent-plugins-nursery / fluent-plugin-systemd

This is a fluentd input plugin. It reads logs from the systemd journal.
Apache License 2.0
153 stars, 43 forks

Can't configure storage with the conf.arg parameter #109

Open: b-a-t opened this issue 1 year ago

b-a-t commented 1 year ago

I'm new to fluentd, so I may well be missing something. To send all the busy journald logs to OpenSearch, I tried configuring fluentd with multiple workers so that several processes could run in parallel. Apparently fluent-plugin-systemd still doesn't support a multi-worker setup, so I assigned the input to one fixed worker.

That seems to work, but having to specify the storage path explicitly looked excessive (I wanted to keep all working files in the more appropriate /var/lib/fluent directory rather than /var/log/fluent), especially as the documentation refers to the storage plugin without mentioning any limitations.

So, the configuration I tried first was:

<system>
  workers 2
  log_level "#{ENV['LOG_LEVEL'] || 'warn'}"
  root_dir "/var/lib/fluent"
</system>

<worker 0>
  <source>
    @id systemd
    @type systemd
    tag systemd
    path "/var/log/journal"
    read_from_head true

    <storage>
      @type local
      persistent true
      path "/var/lib/fluent/systemd-cursor"
    </storage>

    @label @SYSTEMD
  </source>
</worker>

That worked, producing the position file /var/lib/fluent/systemd-cursor/worker0/storage.json. So far, so good. But having to repeat /var/lib/fluent, which is already set as root_dir, seems excessive.

So, my next attempt (2) was:

    <storage>
      @type local
      @id systemd_cursor
      persistent true
    </storage>

That also worked, producing /var/lib/fluent/worker0/systemd/storage.json. This already raises questions, because the cursor file name is hard-coded to storage.json. The same was true of the first variant, but there you at least get a custom systemd-cursor component in the path, which can make a difference when there are two or more inputs with @type systemd (a pretty common setup for K8s, apparently).

Also, the log contained:

2023-11-04 00:03:56 +0000 [warn]: #0 parameter '@id' in <storage>
  @type "local"
  @id systemd_cursor
  persistent true
</storage> is not used.

which explains the result above.

OK, how about (3) the conf.arg from the documentation?

    <storage systemd_cursor>
      @type local
      persistent true
    </storage>

That simply doesn't work; it fails with:

Nov  4 00:11:59 bat fluentd[1931750]: /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin_helper/storage.rb:54:in `storage_create': BUG: both type and conf are not specified (ArgumentError)
Nov  4 00:11:59 bat fluentd[1931750]:                   raise ArgumentError, "BUG: both type and conf are not specified"
Nov  4 00:11:59 bat fluentd[1931750]:                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Nov  4 00:11:59 bat fluentd[1931750]: #011from /opt/fluent/lib/ruby/gems/3.2.0/gems/fluent-plugin-systemd-1.0.5/lib/fluent/plugin/in_systemd.rb:53:in `configure'
Nov  4 00:11:59 bat fluentd[1931750]: #011from /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin.rb:187:in `configure'
Nov  4 00:11:59 bat fluentd[1931750]: #011from /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/root_agent.rb:320:in `add_source'
Nov  4 00:11:59 bat fluentd[1931750]: #011from /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/root_agent.rb:161:in `block in configure'
Nov  4 00:11:59 bat fluentd[1931750]: #011from /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/root_agent.rb:155:in `each'
Nov  4 00:11:59 bat fluentd[1931750]: #011from /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/root_agent.rb:155:in `configure'
Nov  4 00:11:59 bat fluentd[1931750]: #011from /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/engine.rb:105:in `configure'
Nov  4 00:11:59 bat fluentd[1931750]: #011from /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/engine.rb:80:in `run_configure'
Nov  4 00:11:59 bat fluentd[1931750]: #011from /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/supervisor.rb:571:in `run_supervisor'
Nov  4 00:11:59 bat fluentd[1931750]: #011from /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/command/fluentd.rb:352:in `<top (required)>'
Nov  4 00:11:59 bat fluentd[1931750]: #011from <internal:/opt/fluent/lib/ruby/3.2.0/rubygems/core_ext/kernel_require.rb>:85:in `require'
Nov  4 00:11:59 bat fluentd[1931750]: #011from /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/bin/fluentd:15:in `<top (required)>'
Nov  4 00:11:59 bat fluentd[1931750]: #011from /opt/fluent/bin/fluentd:25:in `load'
Nov  4 00:11:59 bat fluentd[1931750]: #011from /opt/fluent/bin/fluentd:25:in `<main>'
Nov  4 00:11:59 bat systemd[1]: fluentd.service: Control process exited, code=exited status=1
Nov  4 00:11:59 bat systemd[1]: fluentd.service: Failed with result 'exit-code'.

At this point, I assumed I was doing something wrong, so I re-read the documentation several times and tried to understand the Ruby source code (Ruby is not in my portfolio).

I looked at how a few other plugins configure (local) storage and modified the code accordingly.

Amusingly, that helped, and I got the desired /var/lib/fluent/worker0/systemd/storage.systemd_cursor.json!

Without conf.arg, it gives the expected /var/lib/fluent/worker0/systemd/storage.json.

Configuration (2) also behaves the same way, including the warning about the unused @id (I was hoping it would actually pick up the parameter).

I haven't tested, though, how all of these permutations behave without multiple workers.
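For reference, the cursor paths reported in this comment follow a simple pattern. The minimal Ruby sketch below reproduces them from the observations only. `cursor_path` is a hypothetical helper, not fluentd's storage code, and it assumes the per-plugin directory is the source's @id (in this config @id, tag and @type are all `systemd`, so these results alone can't tell them apart).

```ruby
# Hypothetical helper: rebuilds the cursor paths observed in this issue.
# NOT fluentd's implementation; it assumes the per-plugin directory is the
# source's @id and mirrors the results for root_dir "/var/lib/fluent", worker 0.
def cursor_path(root_dir:, worker_id:, plugin_id:, arg: nil)
  # A <storage ARG> argument yields storage.ARG.json; otherwise storage.json.
  file = arg.nil? || arg.empty? ? "storage.json" : "storage.#{arg}.json"
  File.join(root_dir, "worker#{worker_id}", plugin_id, file)
end

# Without conf.arg (configuration 2, or the patch without an argument):
puts cursor_path(root_dir: "/var/lib/fluent", worker_id: 0, plugin_id: "systemd")
# prints /var/lib/fluent/worker0/systemd/storage.json

# With <storage systemd_cursor> and the patched plugin:
puts cursor_path(root_dir: "/var/lib/fluent", worker_id: 0, plugin_id: "systemd", arg: "systemd_cursor")
# prints /var/lib/fluent/worker0/systemd/storage.systemd_cursor.json
```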

b-a-t commented 1 year ago

So, the change is really small. I made it without much understanding, mostly by cut-and-paste:

--- in_systemd.rb.orig  2023-11-03 17:37:20.875477097 +0000
+++ in_systemd.rb       2023-11-04 00:17:33.088467944 +0000
@@ -50,7 +50,8 @@ module Fluent
       def configure(conf)
         super
         @journal = nil
-        @pos_storage = storage_create(usage: 'positions')
+        config = conf.elements.find{|e| e.name == 'storage' }
+        @pos_storage = storage_create(usage: 'positions', conf: config, default_type: DEFAULT_STORAGE_TYPE)
         @mutator = SystemdEntryMutator.new(**@entry_opts.to_h)
         @mutator.warnings.each { |warning| log.warn(warning) }
       end

If that seems legitimate, I can open an MR, although the diff is also trivial to apply by hand.
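To make the patched lookup concrete, here is a minimal, self-contained Ruby sketch of what `conf.elements.find{|e| e.name == 'storage' }` selects. `Section` is a hypothetical stand-in for fluentd's parsed config element, reduced to the two fields that matter here. Because `find` returns only the first match, any additional `<storage>` section would not be passed to `storage_create` by this line; when there is no `<storage>` section at all, the lookup yields nil and the patch presumably relies on `default_type: DEFAULT_STORAGE_TYPE`.

```ruby
# Hypothetical stand-in for a parsed config section (the real object is
# fluentd's config element); only the name and the <storage ARG> argument.
Section = Struct.new(:name, :arg)

# Same lookup as the patched line: the first child section named "storage", or nil.
def find_storage(elements)
  elements.find { |e| e.name == "storage" }
end

with_arg = [Section.new("storage", "systemd_cursor")]
without  = []

p find_storage(with_arg)&.arg  # prints "systemd_cursor" (the arg seen in storage.systemd_cursor.json)
p find_storage(without)        # prints nil (no <storage> section, so conf: is nil)
```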