opensearch-project / security

🔐 Secure your cluster with TLS, numerous authentication backends, data masking, audit logging as well as role-based access control on indices, documents, and fields
https://opensearch.org/docs/latest/security-plugin/index/
Apache License 2.0
180 stars 264 forks source link

[BUG] Audit log support kafka #4373

Open arvinsg opened 1 month ago

arvinsg commented 1 month ago

What is the bug? A clear and concise description of the bug. In source code,Kafka can used for audit log, but I couldn't find how to use it in the following configuration. How can I configure to use Kafka as the audit log?


plugins.security.audit.type: kafka
plugins.security.audit.config.bootstrap_servers: ...
plugins.security.audit.config.topic_name: ...

The above configuration will result in unknown settings error

How can one reproduce the bug? Steps to reproduce the behavior:

  1. Go to '...'
  2. Click on '....'
  3. Scroll down to '....'
  4. See error

What is the expected behavior? A clear and concise description of what you expected to happen.

What is your host/environment?

Do you have any screenshots? If applicable, add screenshots to help explain your problem.

Do you have any additional context? Add any other context about the problem.

scrawfor99 commented 4 weeks ago

[Triage] Hi @arvinsg, thanks for filing this issue. This seems like a question better handled by the implementation experts over on the OpenSearch forum. Here is a link: https://forum.opensearch.org/.

I am going to close this issue since I think this is just a configuration issue but please open a new issue if you find a bug in the code.

fazir83 commented 4 weeks ago

I'd like for this issue to be re-opened as it seems the configuration directives described in src/main/java/org/opensearch/security/auditlog/sink/KafkaSink.java#L54 seems to missing from src/main/java/org/opensearch/security/support/ConfigConstants.java#L41. Preventing configuration of this sink.

DarshitChanpura commented 4 weeks ago

This change was added at the inception of security plugin: https://github.com/opensearch-project/security/commit/beef34004d22b232c5e309c1728b20e889e19adc. One of the team members will investigate this and get back.

scrawfor99 commented 4 weeks ago

I'd like for this issue to be re-opened as it seems the configuration directives described in src/main/java/org/opensearch/security/auditlog/sink/KafkaSink.java#L54 seems to missing from src/main/java/org/opensearch/security/support/ConfigConstants.java#L41. Preventing configuration of this sink.

Hi @fazir83, this issue was closed because configuration questions are generally best handled on the forum. This issue does not discuss a specific code bug or reproduction steps but is more of a "how to" type issue. We generally suggest heading to the forum for this type of support.

fazir83 commented 3 weeks ago

Working around the issues with the configuration checking

[dev]$ git diff src/main/java/org/opensearch/security/support/ConfigConstants.java
diff --git a/src/main/java/org/opensearch/security/support/ConfigConstants.java b/src/main/java/org/opensearch/security/support/ConfigConstants.java
index a17678ea..3936e51a 100644
--- a/src/main/java/org/opensearch/security/support/ConfigConstants.java
+++ b/src/main/java/org/opensearch/security/support/ConfigConstants.java
@@ -204,6 +204,10 @@ public class ConfigConstants {
     // Log4j
     public static final String SECURITY_AUDIT_LOG4J_LOGGER_NAME = "log4j.logger_name";
     public static final String SECURITY_AUDIT_LOG4J_LEVEL = "log4j.level";
+
+    // Kafka
+    public static final String SECURITY_AUDIT_KAFKA_TOPIC_NAME = "kafka.topic_name";
+    public static final String SECURITY_AUDIT_KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap_servers";

     // retry
     public static final String SECURITY_AUDIT_RETRY_COUNT = "plugins.security.audit.config.retry_count";
[dev]$ git diff src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java
diff --git a/src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java b/src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java
index 36ef7709..2c640862 100644
--- a/src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java
+++ b/src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java
@@ -1718,6 +1718,22 @@ public final class OpenSearchSecurityPlugin extends OpenSearchSecuritySSLPlugin
                 )
             );

+            // Kafka
+            settings.add(
+                Setting.simpleString(
+ ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX + ConfigConstants.SECURITY_AUDIT_KAFKA_BOOTSTRAP_SERVERS,
+                    Property.NodeScope,
+                    Property.Filtered
+                )
+            );
+            settings.add(
+                Setting.simpleString(
+ ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT_PREFIX + ConfigConstants.SECURITY_AUDIT_KAFKA_TOPIC_NAME,
+                    Property.NodeScope,
+                    Property.Filtered
+                )
+            );
+
             // Kerberos
settings.add(Setting.simpleString(ConfigConstants.SECURITY_KERBEROS_KRB5_FILEPATH, Property.NodeScope, Property.Filtered));
             settings.add(

Doesn't even work.

Even if you were configuring it with the help of the routers, it would not run. As both AuditMessageRouter and KafkaSink seems to be not working correctly.

plugins.security.audit.routes.AUTHENTICATED: kafka
plugins.security.audit.endpoints.kafka:
  type: kafka
  config:
    bootstrap_servers: "127.0.0.1:19092"
    topic_name: "test_topic"
    client_id: "tester_os"

For one, the KafkaSink is unable to correctly load the Producer due to a classLoader issue, as per this stackoverflow

Secondly, if you were to work around that using the suggestions in the above stackoverflow:

[dev]$ git diff src/main/java/org/opensearch/security/auditlog/sink/KafkaSink.java
diff --git a/src/main/java/org/opensearch/security/auditlog/sink/KafkaSink.java b/src/main/java/org/opensearch/security/auditlog/sink/KafkaSink.java
index e67ed665..7907ec21 100644
--- a/src/main/java/org/opensearch/security/auditlog/sink/KafkaSink.java
+++ b/src/main/java/org/opensearch/security/auditlog/sink/KafkaSink.java
@@ -57,8 +57,8 @@ public class KafkaSink extends AuditLogSink {
             }
         }

- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
         topicName = sinkSettings.get("topic_name");

         // map path of
@@ -76,6 +76,7 @@ public class KafkaSink extends AuditLogSink {
             this.producer = AccessController.doPrivileged(new PrivilegedExceptionAction<KafkaProducer<Long, String>>() {
                 @Override
                 public KafkaProducer<Long, String> run() throws Exception {
+ Thread.currentThread().setContextClassLoader(null);
                     return new KafkaProducer<Long, String>(producerProps);
                 }
             });

You would run into the same issues described in #4397 (JavaSecurityPolicy errors)

And working around that seems to indicate some issues in AuditMessageRouter:

[2024-06-04T20:08:11,205][TRACE][o.o.s.a.r.AuditMessageRouter] [snip] Setting up routes for endpoint AUTHENTICATED, configuration is kafka
[2024-06-04T20:08:11,205][ERROR][o.o.s.s.DynamicConfigFactory] [snip] Could not dispatch event: class org.opensearch.security.auditlog.config.AuditConfig to subscribing class class org.opensearch.security.auditlog.impl.AuditLogImpl
java.lang.ClassCastException: class java.util.HashMap$Node cannot be cast to class java.util.Map (java.util.HashMap$Node and java.util.Map are in module java.base of loader 'bootstrap')
    at org.opensearch.security.auditlog.routing.AuditMessageRouter.lambda$enableRoutes$2(AuditMessageRouter.java:125) ~[opensearch-security-3.0.0.0-SNAPSHOT.jar:3.0.0.0-SNAPSHOT]
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[?:?]
    at java.base/java.util.stream.ReferencePipeline$15$1.accept(ReferencePipeline.java:541) ~[?:?]
    at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1850) ~[?:?]
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[?:?]
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) ~[?:?]
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) ~[?:?]
    at org.opensearch.security.auditlog.routing.AuditMessageRouter.enableRoutes(AuditMessageRouter.java:155) ~[opensearch-security-3.0.0.0-SNAPSHOT.jar:3.0.0.0-SNAPSHOT]
    at org.opensearch.security.auditlog.impl.AuditLogImpl.enableRoutes(AuditLogImpl.java:92) ~[opensearch-security-3.0.0.0-SNAPSHOT.jar:3.0.0.0-SNAPSHOT]
    at org.opensearch.security.auditlog.impl.AbstractAuditLog.onComplianceConfigChanged(AbstractAuditLog.java:138) ~[opensearch-security-3.0.0.0-SNAPSHOT.jar:3.0.0.0-SNAPSHOT]
    at org.opensearch.security.auditlog.impl.AuditLogImpl.setConfig(AuditLogImpl.java:86) ~[opensearch-security-3.0.0.0-SNAPSHOT.jar:3.0.0.0-SNAPSHOT]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:517) [eventbus-java-3.3.1.jar:?]
    at org.greenrobot.eventbus.EventBus.postToSubscription(EventBus.java:440) [eventbus-java-3.3.1.jar:?]
    at org.greenrobot.eventbus.EventBus.postSingleEventForEventType(EventBus.java:421) [eventbus-java-3.3.1.jar:?]
    at org.greenrobot.eventbus.EventBus.postSingleEvent(EventBus.java:394) [eventbus-java-3.3.1.jar:?]
    at org.greenrobot.eventbus.EventBus.post(EventBus.java:275) [eventbus-java-3.3.1.jar:?]
    at org.opensearch.security.securityconf.DynamicConfigFactory.onChange(DynamicConfigFactory.java:314) [opensearch-security-3.0.0.0-SNAPSHOT.jar:3.0.0.0-SNAPSHOT]
    at org.opensearch.security.configuration.ConfigurationRepository.notifyAboutChanges(ConfigurationRepository.java:572) [opensearch-security-3.0.0.0-SNAPSHOT.jar:3.0.0.0-SNAPSHOT]
    at org.opensearch.security.configuration.ConfigurationRepository.notifyConfigurationListeners(ConfigurationRepository.java:561) [opensearch-security-3.0.0.0-SNAPSHOT.jar:3.0.0.0-SNAPSHOT]
    at org.opensearch.security.configuration.ConfigurationRepository.reloadConfiguration0(ConfigurationRepository.java:556) [opensearch-security-3.0.0.0-SNAPSHOT.jar:3.0.0.0-SNAPSHOT]
    at org.opensearch.security.configuration.ConfigurationRepository.loadConfigurationWithLock(ConfigurationRepository.java:540) [opensearch-security-3.0.0.0-SNAPSHOT.jar:3.0.0.0-SNAPSHOT]
    at org.opensearch.security.configuration.ConfigurationRepository.reloadConfiguration(ConfigurationRepository.java:533) [opensearch-security-3.0.0.0-SNAPSHOT.jar:3.0.0.0-SNAPSHOT]
    at org.opensearch.security.configuration.ConfigurationRepository.initalizeClusterConfiguration(ConfigurationRepository.java:286) [opensearch-security-3.0.0.0-SNAPSHOT.jar:3.0.0.0-SNAPSHOT]
    at org.opensearch.security.configuration.ConfigurationRepository.lambda$initOnNodeStart$10(ConfigurationRepository.java:441) [opensearch-security-3.0.0.0-SNAPSHOT.jar:3.0.0.0-SNAPSHOT]
    at java.base/java.lang.Thread.run(Thread.java:833) [?:?]

Where AuditMessageRouter.java seems to be sending the entry from the router config expecting it to be a list. But setting it as a list in opensearch.yml returns a settings exception from opensearch-core

Like I have said before, I am not a Java developer, but I really do think this is not a case of needing some help configuring, I think this part of the codebase has been unchanged drastically since floragunns SecurityGuard

This should either be fixed, or the dead code of "KafkaSink" should be removed as to not confuse people on unsupported ways of configuring, or the configuration should be changed to not include "Kafka" as a supported appender (unless specified you need to recompile opensearch-core with the correct libraries needed) or the audit feature should be fixed to support kafka as a first class sink.

Or, if I am indeed wrong, which I really hope I am, it would be very much helpful for the KafkaSink to be documented on how to implement, because even reading through the source code, two people have been unable to do so, hence my apprehension against closing this issue due to "configuration issues".

willyborankin commented 3 weeks ago

It is still a problem. Related problem to it: https://github.com/opensearch-project/security/issues/4397

scrawfor99 commented 2 weeks ago

[Triage] Thanks @willyborankin for taking a look. Since this is a confirmed issue, I will mark as triaged.