magnusbaeck / logstash-filter-verifier

Apache License 2.0

Testing filters that include the `logstash-filter-aggregate` plugin: aggregation events are missing. #168

Closed apple-corps closed 2 years ago

apple-corps commented 2 years ago

When creating a filter with https://github.com/logstash-plugins/logstash-filter-aggregate, the aggregate event does not appear in the test output or the actual output. With the current beta version of this filter verifier I therefore can't test for the aggregate events, which are required for my filter testing.

It should be fairly easy to reproduce. I will try to whip up a quick and easy reproduction repository.

Is there something that I overlooked that would support this?

I saw some mention of mock plugins. Is there a way to create a mock output and test that way?

apple-corps commented 2 years ago

reproduction => https://github.com/apple-corps/logstash-filter-aggregate-repro

Running the filter against the input manually, we see two events: https://github.com/apple-corps/logstash-filter-aggregate-repro/blob/3e59f9767208671f7d0e8bb9ce1fe00b2c853363/example.out#L2

But the test passes without recognizing the aggregation event: https://github.com/apple-corps/logstash-filter-aggregate-repro/blob/3e59f9767208671f7d0e8bb9ce1fe00b2c853363/repro.json#L11

jgough commented 2 years ago

If I run your config file directly through Logstash I get one output event, which seems to match what LFV2 outputs?

bash-4.2$ cat pipeline/pipeline.conf
input {
    generator {
        lines => ["Dec  2 22:56:46 | VALIS | NetworkManager[724]: <info>  [1638514606.5479] device (docker0): carrier: link connected"]
        count => 1
        id => "input_plugin"
    }
}

filter {

    dissect {
        mapping => {
            "message" => "%{logTimestamp}|%{hostname}| %{Service}[%{id}]: <%{level}>  [%{unix}] %{log}"
        }
    }

    aggregate {
        task_id => "%{Service}"
        code =>"
          map['details'] = {'host' => event.get('hostname'),'log-level' => event.get('level'), 'log-ts' => event.get('logTimestamp') }
          map['service'] = {'service-name' => event.get('service'), 'log-msg' => event.get('log')}
        "
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "Service"
        timeout => 1
    }
}

output {
    stdout {}
}

bash-4.2$ logstash -f pipeline/pipeline.conf
Using bundled JDK: /usr/share/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to /usr/share/logstash/logs which is now configured via log4j2.properties
[2021-12-03T14:57:45,435][INFO ][logstash.runner          ] Log4j configuration path used is: /usr/share/logstash/config/log4j2.properties
[2021-12-03T14:57:45,441][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.15.2", "jruby.version"=>"jruby 9.2.19.0 (2.5.8) 2021-06-15 55810c552b OpenJDK 64-Bit Server VM 11.0.12+7 on 11.0.12+7 +indy +jit [linux-x86_64]"}
[2021-12-03T14:57:45,463][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/usr/share/logstash/data/queue"}
[2021-12-03T14:57:45,472][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/usr/share/logstash/data/dead_letter_queue"}
[2021-12-03T14:57:45,708][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-12-03T14:57:45,727][INFO ][logstash.agent           ] No persistent UUID file found. Generating new UUID {:uuid=>"e36c5307-05bf-42d6-979e-7f4e99004f4c", :path=>"/usr/share/logstash/data/uuid"}
[2021-12-03T14:57:46,353][WARN ][logstash.monitoringextension.pipelineregisterhook] xpack.monitoring.enabled has not been defined, but found elasticsearch configuration. Please explicitly set `xpack.monitoring.enabled: true` in logstash.yml
[2021-12-03T14:57:46,355][WARN ][deprecation.logstash.monitoringextension.pipelineregisterhook] Internal collectors option for Logstash monitoring is deprecated and targeted for removal in the next major version.
Please configure Metricbeat to monitor Logstash. Documentation can be found at:
https://www.elastic.co/guide/en/logstash/current/monitoring-with-metricbeat.html
[2021-12-03T14:57:46,712][WARN ][deprecation.logstash.codecs.plain] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[2021-12-03T14:57:46,766][WARN ][deprecation.logstash.outputs.elasticsearch] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[2021-12-03T14:57:47,022][INFO ][logstash.licensechecker.licensereader] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elasticsearch:9200/]}}
[2021-12-03T14:57:47,132][WARN ][logstash.licensechecker.licensereader] Attempted to resurrect connection to dead ES instance, but got an error {:url=>"http://elasticsearch:9200/", :exception=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch: Name or service not known"}
[2021-12-03T14:57:47,163][WARN ][logstash.licensechecker.licensereader] Marking url as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch {:url=>http://elasticsearch:9200/, :error_message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch", :error_class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError"}
[2021-12-03T14:57:47,167][ERROR][logstash.licensechecker.licensereader] Unable to retrieve license information from license server {:message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch"}
[2021-12-03T14:57:47,189][ERROR][logstash.monitoring.internalpipelinesource] Failed to fetch X-Pack information from Elasticsearch. This is likely due to failure to reach a live Elasticsearch cluster.
[2021-12-03T14:57:47,558][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2021-12-03T14:57:48,154][INFO ][org.reflections.Reflections] Reflections took 116 ms to scan 1 urls, producing 120 keys and 417 values
[2021-12-03T14:57:48,764][WARN ][deprecation.logstash.codecs.plain] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[2021-12-03T14:57:49,084][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>16, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>2000, "pipeline.sources"=>["/usr/share/logstash/pipeline/pipeline.conf"], :thread=>"#<Thread:0x4db2b832 run>"}
[2021-12-03T14:57:49,988][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.9}
[2021-12-03T14:57:50,032][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2021-12-03T14:57:50,097][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
{
        "@version" => "1",
             "log" => "device (docker0): carrier: link connected",
        "sequence" => 0,
      "@timestamp" => 2021-12-03T14:57:50.091Z,
         "message" => "Dec  2 22:56:46 | VALIS | NetworkManager[724]: <info>  [1638514606.5479] device (docker0): carrier: link connected",
         "Service" => "NetworkManager",
           "level" => "info",
            "unix" => "1638514606.5479",
    "logTimestamp" => "Dec  2 22:56:46 ",
        "hostname" => " VALIS ",
            "host" => "73d113e140a3",
              "id" => "724"
}
[2021-12-03T14:57:50,426][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2021-12-03T14:57:50,653][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2021-12-03T14:57:50,697][INFO ][logstash.runner          ] Logstash shut down.
apple-corps commented 2 years ago

Please run the configuration provided and write the output to a file; you will see the aggregate event there. I'm not sure about the behavior you're seeing, but I am writing aggregates to files using outputs and am trying to test that now. The point of the aggregate filter is to aggregate messages and, once the timeout elapses, push the map built in the code block as a new event. Perhaps you are only seeing the event that leaves the aggregate filter before the timer fires. See the first line of the output.

This is the simplest reproduction I could produce for you from scratch. It took 4 hours, but I created an environment you can launch from: https://code.visualstudio.com/docs/remote/containers

https://github.com/apple-corps/logstash-filter-aggregate-repro/blob/logstash-verifier-aggregate-repro/conf/repro.conf

https://github.com/apple-corps/logstash-filter-aggregate-repro/blob/logstash-verifier-aggregate-repro/example.out

I have provided a dev container you can launch from VS Code to reproduce easily. Running logstash -w 1 -f /workspaces/conf/repro.conf from the dev container will write example.out (linked above).
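
For reference, the output section of repro.conf writes to a file along these lines (abridged; see the linked file for the exact configuration and path):

output {
    file {
        path => "/workspaces/logstash-filter-aggregate-repro/example.out"
    }
}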

apple-corps commented 2 years ago

@jgough I investigated the issue further. I tried the configuration with the generator input. There is an issue there: I cannot reproduce the aggregate event using a generator. I suggest you change your input to stdin and paste in the log line. Here's the output of my run, with the config modified to write to standard out.

logstash -w 1 -f /workspaces/logstash-filter-aggregate-repro/conf/repro2.conf 
Using bundled JDK: /usr/share/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to /usr/share/logstash/logs which is now configured via log4j2.properties
[2021-12-03T23:52:53,597][INFO ][logstash.runner          ] Log4j configuration path used is: /usr/share/logstash/config/log4j2.properties
[2021-12-03T23:52:53,610][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.15.2", "jruby.version"=>"jruby 9.2.19.0 (2.5.8) 2021-06-15 55810c552b OpenJDK 64-Bit Server VM 11.0.12+7 on 11.0.12+7 +indy +jit [linux-x86_64]"}
[2021-12-03T23:52:54,034][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-12-03T23:52:54,916][WARN ][logstash.monitoringextension.pipelineregisterhook] xpack.monitoring.enabled has not been defined, but found elasticsearch configuration. Please explicitly set `xpack.monitoring.enabled: true` in logstash.yml
[2021-12-03T23:52:54,919][WARN ][deprecation.logstash.monitoringextension.pipelineregisterhook] Internal collectors option for Logstash monitoring is deprecated and targeted for removal in the next major version.
Please configure Metricbeat to monitor Logstash. Documentation can be found at: 
https://www.elastic.co/guide/en/logstash/current/monitoring-with-metricbeat.html
[2021-12-03T23:52:55,293][WARN ][deprecation.logstash.codecs.plain] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[2021-12-03T23:52:55,376][WARN ][deprecation.logstash.outputs.elasticsearch] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[2021-12-03T23:52:55,747][INFO ][logstash.licensechecker.licensereader] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elasticsearch:9200/]}}
[2021-12-03T23:52:55,909][WARN ][logstash.licensechecker.licensereader] Attempted to resurrect connection to dead ES instance, but got an error {:url=>"http://elasticsearch:9200/", :exception=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch: Name or service not known"}
[2021-12-03T23:52:55,956][WARN ][logstash.licensechecker.licensereader] Marking url as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch {:url=>http://elasticsearch:9200/, :error_message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch", :error_class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError"}
[2021-12-03T23:52:55,964][ERROR][logstash.licensechecker.licensereader] Unable to retrieve license information from license server {:message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch"}
[2021-12-03T23:52:56,010][ERROR][logstash.monitoring.internalpipelinesource] Failed to fetch X-Pack information from Elasticsearch. This is likely due to failure to reach a live Elasticsearch cluster.
[2021-12-03T23:52:56,419][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2021-12-03T23:52:56,939][INFO ][org.reflections.Reflections] Reflections took 68 ms to scan 1 urls, producing 120 keys and 417 values 
[2021-12-03T23:52:57,479][WARN ][deprecation.logstash.codecs.line] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[2021-12-03T23:52:57,486][WARN ][deprecation.logstash.inputs.stdin] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[2021-12-03T23:52:57,761][WARN ][logstash.javapipeline    ][main] 'pipeline.ordered' is enabled and is likely less efficient, consider disabling if preserving event order is not necessary
[2021-12-03T23:52:57,827][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, "pipeline.sources"=>["/workspaces/logstash-filter-aggregate-repro/conf/repro2.conf"], :thread=>"#<Thread:0x2b9b77d run>"}
[2021-12-03T23:52:58,524][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.69}
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.jrubystdinchannel.StdinChannelLibrary$Reader (file:/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/jruby-stdin-channel-0.2.0-java/lib/jruby_stdin_channel/jruby_stdin_channel.jar) to field java.io.FilterInputStream.in
WARNING: Please consider reporting this to the maintainers of com.jrubystdinchannel.StdinChannelLibrary$Reader
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[2021-12-03T23:52:58,598][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2021-12-03T23:52:58,651][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
Dec  2 22:56:46 | VALIS | NetworkManager[724]: <info>  [1638514606.5479] device (docker0): carrier: link connected
{
           "level" => "info",
            "unix" => "1638514606.5479",
         "Service" => "NetworkManager",
            "host" => "962d01a4e572",
        "@version" => "1",
         "message" => "Dec  2 22:56:46 | VALIS | NetworkManager[724]: <info>  [1638514606.5479] device (docker0): carrier: link connected",
              "id" => "724",
    "logTimestamp" => "Dec  2 22:56:46 ",
             "log" => "device (docker0): carrier: link connected",
      "@timestamp" => 2021-12-03T23:53:19.247Z,
        "hostname" => " VALIS "
}
{
       "details" => {
           "log-ts" => "Dec  2 22:56:46 ",
             "host" => " VALIS ",
        "log-level" => "info"
    },
       "service" => {
        "service-name" => nil,
             "log-msg" => "device (docker0): carrier: link connected"
    },
       "Service" => "NetworkManager",
      "@version" => "1",
    "@timestamp" => 2021-12-03T23:53:23.631Z
}
[2021-12-03T23:53:25,956][ERROR][logstash.licensechecker.licensereader] Unable to retrieve license information from license server {:message=>"No Available connections"}
[2021-12-03T23:53:25,959][WARN ][logstash.licensechecker.licensereader] Attempted to resurrect connection to dead ES instance, but got an error {:url=>"http://elasticsearch:9200/", :exception=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch: Name or service not known"}
apple-corps commented 2 years ago

repro2.conf

input {
  stdin {
    id => "stdin1"
  }
}

filter {

    dissect {
        mapping => {
            "message" => "%{logTimestamp}|%{hostname}| %{Service}[%{id}]: <%{level}>  [%{unix}] %{log}"
        }
    }

    aggregate {
        task_id => "%{Service}"
        code =>"
          map['details'] = {'host' => event.get('hostname'),'log-level' => event.get('level'), 'log-ts' => event.get('logTimestamp') }
          map['service'] = {'service-name' => event.get('service'), 'log-msg' => event.get('log')}
        "
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "Service"
        timeout => 1
    }
}

output {
    stdout {}
    #file { 
    #    path => "/workspaces/logstash-filter-aggregate-repro/example2.out"
    #}
}
apple-corps commented 2 years ago

Also note that it may be necessary to add CMD [ "sleep", "infinity" ] to the Dockerfile if there are issues with running the example container. I had to do this when using VS Code on my Mac but not on my Linux host; I'm not sure why.

jgough commented 2 years ago

Hmm, I'm not that familiar with the aggregate plugin, and I can't check this until Monday, but I suspect it has to do with the timeout. Maybe it is possible to mock in an additional aggregate filter with end_of_task => true after your current one?

apple-corps commented 2 years ago

Maybe it is possible to mock in an additional aggregate filter with end_of_task => true after your current one?

Not sure that makes sense.

We see a difference between the generator and stdin plugins, where the generator behaves like the issue we see with LFV.

Is logstash-filter-verifier using generators internally as input?

If we use an input mock, can we set the message input and expect a different pipeline to be built, with different behavior?

jgough commented 2 years ago

I can check tomorrow but I suspect the issue is that the aggregate plugin is a bit of an oddity and actively requires a timeout to produce the event - LFV has no awareness of this timeout so it needs some help. I could be completely wrong though.

Yes, LFV2 currently uses a generator input to simulate events, and the generator input differs from the stdin input in how it terminates the entire pipeline.

One way I can think of (off the top of my head) to force the end of the aggregated event is to mock in another aggregate filter afterwards with end_of_task. I am just speculating at the moment though and can check tomorrow.
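
Purely as an illustration of that speculation, such a mocked-in filter might look roughly like this (untested; the empty code block is an assumption):

aggregate {
    task_id => "%{Service}"
    code => ""
    end_of_task => true
}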

apple-corps commented 2 years ago

I tested a generator input, as you shared, against Logstash directly, not LFV2. I didn't see the aggregate triggered, and I think the pipeline shuts down abruptly. I then shared the output above for the stdin input with the 1-second timeout configured. So it looks clear to me that there's a difference in how generated inputs are handled.

I expect I'll need to create events by some means other than generators to see if I can use LFV2 to test my filters. The input mocks come to mind. I will give it a shot, but perhaps I'm incorrect regarding their application.

apple-corps commented 2 years ago

I'm now trying to follow the input mock to see if it will work that way. I placed the testdata folder in my repro repo and have updated my test.

https://github.com/apple-corps/logstash-filter-aggregate-repro/blob/2757a58fe1a31cf07ab21927012e45224de7f951/repro-input-mock.json#L6

Then I'm attempting to invoke it from my container, but I'm missing something regarding an appropriate setup.

/opt/logstash/logstash-filter-verifier daemon run --pipeline /workspaces/logstash-filter-aggregate-repro/pipelines.yml --pipeline-base /workspaces/logstash-filter-aggregate-repro/testdata/inputoutputmock --testcase-dir /workspaces/logstash-filter-aggregate-repro/ --add-missing-id
logstash-filter-verifier: error: expect the Logstash config to have at least 1 input and 1 output, got 0 inputs and 0 outputs
apple-corps commented 2 years ago

I'm able to execute the basic pipeline example and tests, but the input/output example fails when following the same path layout. Finally, I'm not sure whether the mocking might utilize a generator, making my attempts futile, since I suspect the aggregate plugin does not function with generators.

bash-4.2$ /opt/logstash/logstash-filter-verifier daemon run --pipeline /workspaces/logstash-filter-aggregate-repro/testdata/basic_pipeline.yml --pipeline-base /workspaces/logstash-filter-aggregate-repro/testdata/basic_pipeline --testcase-dir /workspaces/logstash-filter-aggregate-repro/testdata/testcases/basic_pipeline --add-missing-id
☑ Comparing message 1 of 2 from testcase1.json
☑ Comparing message 2 of 2 from testcase1.json
☑ Comparing message 1 of 1 from testcase2.json

Summary: ☑ All tests: 3/3
         ☑ testcase1.json: 2/2
         ☑ testcase2.json: 1/1


/opt/logstash/logstash-filter-verifier daemon run --pipeline /workspaces/logstash-filter-aggregate-repro/testdata/inputoutputmock.yml --pipeline-base /workspaces/logstash-filter-aggregate-repro/testdata/inputoutputmock --testcase-dir /workspaces/logstash-filter-aggregate-repro/testdata/testcases/inputoutputmock --add-missing-id
logstash-filter-verifier: error: expect the Logstash config to have at least 1 input and 1 output, got 0 inputs and 0 outputs
apple-corps commented 2 years ago

Looks like I found the parameter by reading the Go source. I don't develop in Go and don't have a Go environment set up, so it's a little cumbersome.

☑ Comparing message 1 of 1 from testcase.json

Summary: ☑ All tests: 1/1
         ☑ testcase.json: 1/1

Then I just need to adapt the mock test to my test case.

jgough commented 2 years ago

I think I may be trying to make this more complex than it needs to be! I believe you should be able to test this just by adding a second expected event. Here is a test that runs your pipeline and checks for the two expected output events. LFV2 will wait until the two events are produced (or will eventually time out) and then exit.

test.json:

{
  "input_plugin": "stdin1",
  "fields": {
    "host": "d3209814fd"
  },
  "ignore": ["@timestamp","@version"],
  "testcases": [
    {
      "input": [
        "Dec  2 22:56:46 | VALIS | NetworkManager[724]: <info>  [1638514606.5479] device (docker0): carrier: link connected"
      ],
      "expected": [
        {
          "message": "Dec  2 22:56:46 | VALIS | NetworkManager[724]: <info>  [1638514606.5479] device (docker0): carrier: link connected",
          "logTimestamp": "Dec  2 22:56:46 ",
          "level": "info",
          "unix": "1638514606.5479",
          "log": "device (docker0): carrier: link connected",
          "hostname": " VALIS ",
          "Service": "NetworkManager",
          "id": "724",
          "host": "d3209814fd"
        },
        {
          "Service": "NetworkManager",
          "details": {
            "host": " VALIS ",
            "log-level": "info",
            "log-ts": "Dec  2 22:56:46 "
          },
          "service": {
            "log-msg": "device (docker0): carrier: link connected",
            "service-name": null
          }
        }
      ]
    }
  ]
}

Here is a full working example in a Dockerfile

# syntax=docker/dockerfile:1.3-labs
FROM docker.elastic.co/logstash/logstash:7.15.2

ENV LOGSTASH_FILTER_VERIFIER_VERSION v2.0.0-beta.2

USER root
RUN yum clean expire-cache && yum update -y && yum install -y curl && yum clean all
ADD https://github.com/magnusbaeck/logstash-filter-verifier/releases/download/${LOGSTASH_FILTER_VERIFIER_VERSION}/logstash-filter-verifier_${LOGSTASH_FILTER_VERIFIER_VERSION}_linux_386.tar.gz /opt/
RUN tar xvzf /opt/logstash-filter-verifier_${LOGSTASH_FILTER_VERIFIER_VERSION}_linux_386.tar.gz -C /opt \
    && mv /opt/logstash-filter-verifier /usr/bin/

USER logstash
RUN <<EOF
mkdir tests

cat <<EOT > /usr/share/logstash/config/pipelines.yml
- pipeline.id: pipeline
  path.config: "pipeline/pipeline.conf"
EOT

cat <<EOT > /usr/share/logstash/tests/test.json
{
  "input_plugin": "stdin1",
  "fields": {
    "host": "d3209814fd"
  },
  "ignore": ["@timestamp","@version"],
  "testcases": [
    {
      "input": [
        "Dec  2 22:56:46 | VALIS | NetworkManager[724]: <info>  [1638514606.5479] device (docker0): carrier: link connected"
      ],
      "expected": [
        {
          "message": "Dec  2 22:56:46 | VALIS | NetworkManager[724]: <info>  [1638514606.5479] device (docker0): carrier: link connected",
          "logTimestamp": "Dec  2 22:56:46 ",
          "level": "info",
          "unix": "1638514606.5479",
          "log": "device (docker0): carrier: link connected",
          "hostname": " VALIS ",
          "Service": "NetworkManager",
          "id": "724",
          "host": "d3209814fd"
        },
        {
          "Service": "NetworkManager",
          "details": {
            "host": " VALIS ",
            "log-level": "info",
            "log-ts": "Dec  2 22:56:46 "
          },
          "service": {
            "log-msg": "device (docker0): carrier: link connected",
            "service-name": null
          }
        }
      ]
    }
  ]
}

EOT

cat <<EOT > /usr/share/logstash/pipeline/pipeline.conf
input {
    stdin {
        id => "stdin1"
    }
}

filter {

    dissect {
        mapping => {
            "message" => "%{logTimestamp}|%{hostname}| %{Service}[%{id}]: <%{level}>  [%{unix}] %{log}"
        }
    }

    aggregate {
        task_id => "%{Service}"
        code =>"
          map['details'] = {'host' => event.get('hostname'),'log-level' => event.get('level'), 'log-ts' => event.get('logTimestamp') }
          map['service'] = {'service-name' => event.get('service'), 'log-msg' => event.get('log')}
        "
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "Service"
        timeout => 1
    }
}

output {
    file {
        path => "/workspaces/logstash/example.out"
    }
}
EOT

cat <<EOT > /usr/share/logstash/run_tests.sh
echo "Starting daemon..."
logstash-filter-verifier daemon start &
sleep 5
logstash-filter-verifier daemon run --pipeline /usr/share/logstash/config/pipelines.yml --pipeline-base /usr/share/logstash/ --testcase-dir /usr/share/logstash/tests/test.json --add-missing-id
EOT

chmod a+x run_tests.sh
EOF

CMD ["/bin/bash", "/usr/share/logstash/run_tests.sh"]

I don't know how to build this on Windows, but on Linux I would run the following:

DOCKER_BUILDKIT=1 docker build --tag test .
docker run --rm test

Output:

Starting daemon...
☑ Comparing message 1 of 2 from test.json
☑ Comparing message 2 of 2 from test.json

Summary: ☑ All tests: 2/2
         ☑ test.json: 2/2

Just one more point: are you doing additional processing that requires the aggregate filter? If you don't need any of the complexities that aggregate supports, maybe the clone filter would be simpler; see the sketch below. That is a Logstash question, not an LFV issue, though. Hope this helps!
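
For example, a minimal clone sketch (the "aggregated" clone name is just illustrative, and you'd still need filters to shape the cloned event):

filter {
    clone {
        clones => ["aggregated"]
    }
}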

apple-corps commented 2 years ago
  1. I notice "input_plugin": "stdin1" as the first line of test.json. If I use another type of input, say a filebeat input, should I expect to change this accordingly? Should I expect any issues?

  2. It looks like you are using plugin mocking for the aggregate plugin, but I'm afraid it doesn't make sense to me. Why is the timeout section relevant, but not mocking other functionality of the aggregate? Do you mind giving a little more explanation?

  3. Are you doing additional processing that requires the aggregate filter? Yes.

As mentioned above, I reduced the aggregation logic to a simple form to start (a single log event). I actually require aggregating multiple log lines (a truer aggregation, with different classifications or aggregate blocks). Examples of these sorts of configurations can be found in the documentation for the filter: https://github.com/logstash-plugins/logstash-filter-aggregate/blob/main/docs/index.asciidoc.


  4. Based on 2 and 3 above, will your mock solution function when we have 2-3 or more different log types, each with a corresponding aggregate block? (See the sketch at the end of this comment.)

Specifically, only one of these aggregate blocks in the configuration will have end_of_task => true. This is why I'm trying to understand the mock well. We cannot have all aggregate blocks end the task or we'd get irrelevant output.


  5. Finally, are you using a debugger to help step through LFV2?

If so, can you give some tooling details or setup suggestions?

This might save time in test development. I'm not a Go developer but think I could get by with some tooling knowledge.


Thanks for taking some time to help. I'll be trying to reproduce and scale up to larger aggregate test cases tomorrow. Hopefully this solution will work, but I'm a little concerned the mock might not function well with multiple aggregate blocks.
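
To make question 4 concrete, here is a sketch of a multi-block aggregate configuration, adapted from example 1 in the linked docs (the taskid and logger fields come from that example, not from my actual config):

filter {
    if [logger] == "TASK_START" {
        aggregate {
            task_id => "%{taskid}"
            code => "map['sql_duration'] = 0"
            map_action => "create"
        }
    }
    # ...intermediate aggregate blocks update the map here...
    if [logger] == "TASK_END" {
        aggregate {
            task_id => "%{taskid}"
            code => "event.set('sql_duration', map['sql_duration'])"
            map_action => "update"
            end_of_task => true
            timeout => 120
        }
    }
}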

jgough commented 2 years ago

My apologies, I left the mock in from some testing I was doing. It is not necessary. I have updated my comment.

All that is required is the additional expected event.

jgough commented 2 years ago

The "input_plugin" parameter indicates the id of the plugin that should produce the input line(s) for the test (see here). The type of input here is irrelevant as LFV2 is simulating the input plugin instead of actually using it.

If you want to find out more about what LFV2 is doing then you can start the daemon with logstash-filter-verifier daemon start --loglevel debug --no-cleanup and you'll get a lot more info. This will also not delete the generated test configuration which can help with debugging.

apple-corps commented 2 years ago

If you want to find out more about what LFV2 is doing then you can start the daemon with logstash-filter-verifier daemon start --loglevel debug --no-cleanup and you'll get a lot more info. This will also not delete the generated test configuration which can help with debugging.

Nice. That sounds handy.


Regarding the test working: I'm a little surprised, because I thought I had observed the generator behavior outside of an LFV2 run, where I only saw a single output when running the config via Logstash.

I'll be running through this tomorrow. I hope that it works and I get off easy. I spent quite a bit of effort trying to get a test set up using logstash-devutils before I discovered this tooling.

jgough commented 2 years ago

@breml There is a slight bit of weirdness here given that the second event is generated on a timeout:

  • If the test contains one expected output then the test passes and quits before the second event can be emitted.
  • If the test contains two expected outputs then it waits until the second event and then the test passes.

So, imagine a test that should only output one event: outputting a second event after a timeout is a failure. With the current implementation this is not testable.

Maybe there is scope for adding a customisable timeout per test which, if set, ensures LFV2 waits before evaluating the test?
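
Hypothetically, such an option might look like this in the testcase file (the timeout_ms field does not exist today; this is purely a sketch):

{
  "testcases": [
    {
      "input": [ "some log line" ],
      "expected": [ { "message": "some log line" } ],
      "timeout_ms": 2000
    }
  ]
}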

apple-corps commented 2 years ago

I'm now testing aggregations beyond the single-log-line example. For an LS configuration that uses file inputs and outputs, run through Logstash itself, I get three events in the file output, one of which is the aggregation.

For the test, using an identical LS configuration (except for the inputs and outputs) and the same test data, I see three events, but I don't see the aggregation event. It's as if the aggregation wasn't triggered.


As an aside, while testing I saw another issue: when adding empty expected cases to the output, I can keep increasing the number of actual outputs beyond what they should be, and they don't seem to converge.

jgough commented 2 years ago

Are you able to share your config and test cases?

apple-corps commented 2 years ago

@jgough I will try to reproduce it by extending the example above.

breml commented 2 years ago

Sorry for being silent on this issue. I am currently very busy. I hope to find the time to have a look at the open issues this week or next week. Sorry for the delay.

breml commented 2 years ago

Thanks for your patience. I have now read through this issue and can share the following insights:

@breml There is a slight bit of weirdness here given that the second event is generated on a timeout:

  • If the test contains one expected output then the test passes and quits before the second event can be emitted.

If one has a config that produces 2 events (the regular event and the aggregated event) but the test case expects only 1 event, then the test case is wrong. In the current case this is not detected by LFV, because the timeout of the aggregate filter (1 second) is way bigger than the timeout that is currently hard-coded to wait for "late arrivals" (see https://github.com/magnusbaeck/logstash-filter-verifier/blob/master/internal/daemon/controller/controller.go#L143). This timeout is currently 50ms and is not configurable.

Changing this value to a longer timeout would have two consequences:

  1. for everyone using LFV, the execution of each test suite would become slower.
  2. there is always the chance that even the increased timeout would still be too short for some edge cases.

Making this timeout configurable could be an option.

  • If the test contains two expected outputs then it waits until the second event and then the test passes.

So, imagine a test that should only output one event - Outputting a second event after a timeout is a fail. With the current implementation this is not testable.

This is true if the timeout is bigger than the above-mentioned timeout for "late arrivals", which is currently 50ms. So again, increasing this value for everyone does not sound like a good idea, because it would make test execution slower for everyone. Making this value configurable could be an option.

Maybe there is scope for adding a customisable timeout per test which if set ensures LFV2 waits before evaluating the test?

Agree.