odpi / egeria

Egeria core
https://egeria-project.org
Apache License 2.0

Unable to change Kafka producer partitioner class to RoundRobinPartitioner #4952

Closed: ilovechai closed this issue 3 years ago

ilovechai commented 3 years ago

We are currently using Egeria 2.4 and are trying to change the partitioner.class property in the event bus config, but the Kafka producer is unable to pick it up.

Property key and values tried:

Below is the error from the OMAG server log:

2021-03-25 20:31:03.406 ERROR [          pool-4-thread-1] o.a.e.t.k.KafkaOpenMetadataEventProducer : Bad exception from sending events Invalid value org.apache.kafka.clients.producer.internals.RoundRobinPartitioner for configuration partitioner.class: Class org.apache.kafka.clients.producer.internals.RoundRobinPartitioner could not be found.
Thu Mar 25 20:31:03 GMT 2021 igc_omrs Shutdown OCF-KAFKA-TOPIC-CONNECTOR-0011 The Apache Kafka producer for topic open-metadata.repository-services.cohort.one_catalog.OMRSTopic is shutting down after sending 0 messages and with 1 unsent messages

OMAG server config:

{
    "class": "OMAGServerConfig",
    "versionId": "V2.0",
    "localServerId": "9186d4b0-6a36-4484-8822-6db4e5347493",
    "localServerName": "igc_omrs",
    "localServerType": "OpenMetadataandGovernanceServer",
    "organizationName": "ibm",
    "localServerURL": "omag:8080",
    "localServerUserId": "OMAGServer",
    "maxPageSize": 1000,
    "eventBusConfig": {
        "class": "EventBusConfig",
        "topicURLRoot": "egeria.omag",
        "configurationProperties": {
            "producer": {
                "bootstrap.servers": "kafka:9093",
                "acks": "all",
                "retries": "0",
                "batch.size": "16384",
                "linger.ms": "1",
                "buffer.memory": "33554432",
                "max.request.size": "10485760",
                "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                "kafka.omrs.topic.id": "kafka-omrs-topic",
                "partitioner.class": "org.apache.kafka.clients.producer.internals.RoundRobinPartitioner"
            },
            "consumer": {
                "bootstrap.servers": "kafka:9093",
                "zookeeper.session.timeout.ms": "300000",
                "zookeeper.sync.time.ms": "2000",
                "fetch.message.max.bytes": "10485760",
                "max.partition.fetch.bytes": "10485760",
                "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                "kafka.omrs.topic.id": "kafka-omrs-topic"
            }
        },
        "additionalProperties": {
            "producer": {
                "bootstrap.servers": "kafka:9093",
                "acks": "all",
                "retries": "0",
                "batch.size": "16384",
                "linger.ms": "1",
                "buffer.memory": "33554432",
                "max.request.size": "10485760",
                "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                "kafka.omrs.topic.id": "kafka-omrs-topic",
                "partitioner.class": "org.apache.kafka.clients.producer.internals.RoundRobinPartitioner"
            },
            "consumer": {
                "bootstrap.servers": "kafka:9093",
                "zookeeper.session.timeout.ms": "300000",
                "zookeeper.sync.time.ms": "2000",
                "fetch.message.max.bytes": "10485760",
                "max.partition.fetch.bytes": "10485760",
                "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                "kafka.omrs.topic.id": "kafka-omrs-topic"
            }
        }
    },
    "repositoryServicesConfig": {
        "class": "RepositoryServicesConfig",
        "auditLogConnections": [
            {
                "class": "Connection",
                "headerVersion": 0,
                "displayName": "Console",
                "connectorType": {
                    "class": "ConnectorType",
                    "headerVersion": 0,
                    "type": {
                        "class": "ElementType",
                        "headerVersion": 0,
                        "elementOrigin": "LOCAL_COHORT",
                        "elementVersion": 0,
                        "elementTypeId": "954421eb-33a6-462d-a8ca-b5709a1bd0d4",
                        "elementTypeName": "ConnectorType",
                        "elementTypeVersion": 1,
                        "elementTypeDescription": "A set of properties describing a type of connector."
                    },
                    "guid": "4afac741-3dcc-4c60-a4ca-a6dede994e3f",
                    "qualifiedName": "ConsoleAuditLogStoreConnector",
                    "displayName": "ConsoleAuditLogStoreConnector",
                    "description": "Connector supports logging of audit log messages to stdout.",
                    "connectorProviderClassName": "org.odpi.openmetadata.adapters.repositoryservices.auditlogstore.console.ConsoleAuditLogStoreProvider"
                },
                "configurationProperties": {
                    "supportedSeverities": [
                        "<Unknown>",
                        "Information",
                        "Event",
                        "Decision",
                        "Action",
                        "Error",
                        "Exception",
                        "Security",
                        "Startup",
                        "Shutdown",
                        "Asset",
                        "Types",
                        "Cohort"
                    ]
                }
            }
        ],
        "localRepositoryConfig": {
            "class": "LocalRepositoryConfig",
            "metadataCollectionId": "d6077e3d-15f4-43ef-80e2-e9760d22cd62",
            "localRepositoryMode": "REPOSITORY_PROXY",
            "localRepositoryLocalConnection": {
                "class": "Connection",
                "headerVersion": 0,
                "url": "https://internal-nginx-svc:12443/ibm/iis/igc/",
                "additionalProperties": {
                    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                    "kafka.producer.topic.id": "igcunifiedgovevents",
                    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                    "kafka.consumer.topic.id": "igc-kg-bridge-out",
                    "group.id": "IGCOMRSConsumerGroup",
                    "iis.rest.password": "V48PqjvYOd",
                    "bootstrap.servers": "kafka:9093",
                    "redis.key.expire": "86400",
                    "zookeeper.session.timeout.ms": "300000",
                    "redis.host": "redis-ha-master-svc:6380",
                    "zookeeper.sync.time.ms": "2000",
                    "iis.rest.address": "https://internal-nginx-svc:12443/",
                    "iis.rest.user": "isadmin"
                },
                "displayName": "IGCConnection",
                "connectorType": {
                    "class": "ConnectorType",
                    "headerVersion": 0,
                    "connectorProviderClassName": "com.ibm.iis.openmetadata.adapters.igc.v2.repositoryconnector.IGCV2OMRSRepositoryConnectorProvider"
                }
            },
            "localRepositoryRemoteConnection": {
                "class": "Connection",
                "headerVersion": 0,
                "connectorType": {
                    "class": "ConnectorType",
                    "headerVersion": 0,
                    "type": {
                        "class": "ElementType",
                        "headerVersion": 0,
                        "elementOrigin": "LOCAL_COHORT",
                        "elementVersion": 0,
                        "elementTypeId": "954421eb-33a6-462d-a8ca-b5709a1bd0d4",
                        "elementTypeName": "ConnectorType",
                        "elementTypeVersion": 1,
                        "elementTypeDescription": "A set of properties describing a type of connector."
                    },
                    "guid": "75ea56d1-656c-43fb-bc0c-9d35c5553b9e",
                    "qualifiedName": "OMRSRESTAPIRepositoryConnector",
                    "displayName": "OMRSRESTAPIRepositoryConnector",
                    "description": "OMRS Repository Connector that calls the repository services REST API of a remote server.",
                    "connectorProviderClassName": "org.odpi.openmetadata.adapters.repositoryservices.rest.repositoryconnector.OMRSRESTRepositoryConnectorProvider"
                },
                "endpoint": {
                    "class": "Endpoint",
                    "headerVersion": 0,
                    "address": "omag:8080/servers/igc_omrs"
                }
            },
            "eventsToSaveRule": "ALL",
            "eventsToSendRule": "ALL",
            "eventMapperConnection": {
                "class": "Connection",
                "headerVersion": 0,
                "url": "https://internal-nginx-svc:12443/ibm/iis/igc/",
                "additionalProperties": {
                    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                    "kafka.producer.topic.id": "igcunifiedgovevents",
                    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                    "kafka.consumer.topic.id": "igc-kg-bridge-out",
                    "group.id": "IGCOMRSConsumerGroup",
                    "iis.rest.password": "V48PqjvYOd",
                    "bootstrap.servers": "kafka:9093",
                    "redis.key.expire": "86400",
                    "zookeeper.session.timeout.ms": "300000",
                    "redis.host": "redis-ha-master-svc:6380",
                    "zookeeper.sync.time.ms": "2000",
                    "iis.rest.address": "https://internal-nginx-svc:12443/",
                    "iis.rest.user": "isadmin"
                },
                "displayName": "IGCEventMapper",
                "connectorType": {
                    "class": "ConnectorType",
                    "headerVersion": 0,
                    "connectorProviderClassName": "com.ibm.iis.openmetadata.adapters.igc.v2.eventmapper.IGCV2OMRSRepositoryEventMapperProvider"
                }
            }
        },
        "cohortConfigList": [
            {
                "class": "CohortConfig",
                "cohortName": "one_catalog",
                "cohortRegistryConnection": {
                    "class": "Connection",
                    "headerVersion": 0,
                    "connectorType": {
                        "class": "ConnectorType",
                        "headerVersion": 0,
                        "type": {
                            "class": "ElementType",
                            "headerVersion": 0,
                            "elementOrigin": "LOCAL_COHORT",
                            "elementVersion": 0,
                            "elementTypeId": "954421eb-33a6-462d-a8ca-b5709a1bd0d4",
                            "elementTypeName": "ConnectorType",
                            "elementTypeVersion": 1,
                            "elementTypeDescription": "A set of properties describing a type of connector."
                        },
                        "guid": "108b85fe-d7a8-45c3-9f88-742ac4e4fd14",
                        "qualifiedName": "FileBasedCohortRegistryStoreConnector",
                        "displayName": "FileBasedCohortRegistryStoreConnector",
                        "description": "Connector supports storing of the open metadata cohort registry in a file.",
                        "connectorProviderClassName": "org.odpi.openmetadata.adapters.repositoryservices.cohortregistrystore.file.FileBasedRegistryStoreProvider"
                    },
                    "endpoint": {
                        "class": "Endpoint",
                        "headerVersion": 0,
                        "address": "igc_omrs.one_catalog.registrystore"
                    }
                },
                "cohortOMRSTopicConnection": {
                    "class": "VirtualConnection",
                    "headerVersion": 0,
                    "connectorType": {
                        "class": "ConnectorType",
                        "headerVersion": 0,
                        "connectorProviderClassName": "org.odpi.openmetadata.repositoryservices.connectors.omrstopic.OMRSTopicProvider"
                    },
                    "embeddedConnections": [
                        {
                            "class": "EmbeddedConnection",
                            "headerVersion": 0,
                            "position": 0,
                            "displayName": "one_catalogOMRSTopic",
                            "embeddedConnection": {
                                "class": "Connection",
                                "headerVersion": 0,
                                "connectorType": {
                                    "class": "ConnectorType",
                                    "headerVersion": 0,
                                    "type": {
                                        "class": "ElementType",
                                        "headerVersion": 0,
                                        "elementOrigin": "LOCAL_COHORT",
                                        "elementVersion": 0,
                                        "elementTypeId": "954421eb-33a6-462d-a8ca-b5709a1bd0d4",
                                        "elementTypeName": "ConnectorType",
                                        "elementTypeVersion": 1,
                                        "elementTypeDescription": "A set of properties describing a type of connector."
                                    },
                                    "guid": "3851e8d0-e343-400c-82cb-3918fed81da6",
                                    "qualifiedName": "KafkaOpenMetadataTopicConnector",
                                    "displayName": "KafkaOpenMetadataTopicConnector",
                                    "description": "Kafka Open Metadata Topic Connector supports string based events over an Apache Kafka event bus.",
                                    "connectorProviderClassName": "org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicProvider",
                                    "recognizedConfigurationProperties": [
                                        "producer",
                                        "consumer",
                                        "local.server.id",
                                        "sleepTime"
                                    ]
                                },
                                "endpoint": {
                                    "class": "Endpoint",
                                    "headerVersion": 0,
                                    "address": "open-metadata.repository-services.cohort.one_catalog.OMRSTopic"
                                },
                                "configurationProperties": {
                                    "producer": {
                                        "bootstrap.servers": "kafka:9093",
                                        "acks": "all",
                                        "retries": "0",
                                        "batch.size": "16384",
                                        "linger.ms": "1",
                                        "buffer.memory": "33554432",
                                        "max.request.size": "10485760",
                                        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                                        "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                                        "kafka.omrs.topic.id": "kafka-omrs-topic",
                                        "partitioner.class": "org.apache.kafka.clients.producer.internals.RoundRobinPartitioner"
                                    },
                                    "local.server.id": "9186d4b0-6a36-4484-8822-6db4e5347493",
                                    "consumer": {
                                        "bootstrap.servers": "kafka:9093",
                                        "zookeeper.session.timeout.ms": "300000",
                                        "zookeeper.sync.time.ms": "2000",
                                        "fetch.message.max.bytes": "10485760",
                                        "max.partition.fetch.bytes": "10485760",
                                        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                                        "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                                        "kafka.omrs.topic.id": "kafka-omrs-topic"
                                    }
                                }
                            }
                        }
                    ]
                },
                "cohortOMRSTopicProtocolVersion": "V1",
                "eventsToProcessRule": "ALL"
            }
        ]
    },
    "auditTrail": [
        "Fri Mar 19 07:13:13 GMT 2021 admin updated configuration for local server's URL root to omag:8080.",
        "Fri Mar 19 07:13:14 GMT 2021 admin updated configuration for local server's owning organization's name to ibm.",
        "Fri Mar 19 07:13:14 GMT 2021 admin updated configuration for default event bus.",
        "Fri Mar 19 07:13:14 GMT 2021 admin updated configuration for the local repository.",
        "Fri Mar 19 07:13:14 GMT 2021 admin updated configuration for the local repository.",
        "Fri Mar 19 07:13:14 GMT 2021 admin preserving local metadata collection id d6077e3d-15f4-43ef-80e2-e9760d22cd62.",
        "Fri Mar 19 07:13:14 GMT 2021 admin updated configuration for cohort one_catalog.",
        "Fri Mar 19 07:13:15 GMT 2021 admin updated configuration for cohort one_catalog.",
        "Wed Mar 24 20:42:58 GMT 2021 admin updated configuration for default event bus.",
        "Wed Mar 24 21:10:34 GMT 2021 admin updated configuration for default event bus.",
        "Wed Mar 24 22:01:53 GMT 2021 admin updated configuration for default event bus."
    ]
}
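In the config above, partitioner.class is set in three places: eventBusConfig.configurationProperties, eventBusConfig.additionalProperties, and the configurationProperties of the cohort's Kafka topic connection (endpoint open-metadata.repository-services.cohort.one_catalog.OMRSTopic, the topic named in the error). The sketch below is a minimal Python helper, assuming the config has been saved to a JSON file, that lists every occurrence and flags values other than org.apache.kafka.clients.producer.RoundRobinPartitioner (the class name that eventually worked later in this thread). The function names and the command-line usage are illustrative only, not part of Egeria.

```python
import json
import sys
from typing import Any, Iterator, List, Tuple

# Class name that worked later in this thread (Kafka 2.4+ package, no "internals").
EXPECTED = "org.apache.kafka.clients.producer.RoundRobinPartitioner"


def find_property(node: Any, key: str, path: str = "") -> Iterator[Tuple[str, Any]]:
    """Recursively yield (path, value) for every occurrence of key in parsed JSON."""
    if isinstance(node, dict):
        for k, v in node.items():
            child = f"{path}/{k}"
            if k == key:
                yield child, v
            yield from find_property(v, key, child)
    elif isinstance(node, list):
        for i, item in enumerate(node):
            yield from find_property(item, key, f"{path}/{i}")


def check_partitioner(config_text: str) -> List[Tuple[str, Any, bool]]:
    """Return (path, value, matches_expected) for each partitioner.class setting."""
    config = json.loads(config_text)
    return [(p, v, v == EXPECTED) for p, v in find_property(config, "partitioner.class")]


if __name__ == "__main__" and len(sys.argv) > 1:
    # Hypothetical usage: python check_partitioner.py igc_omrs-config.json
    with open(sys.argv[1]) as f:
        for path, value, ok in check_partitioner(f.read()):
            print("ok   " if ok else "CHECK", path, value)
```

Run against this config, it would report all three paths with the internals class name flagged for checking.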

Does the Kafka client in Egeria 2.4 pick up and use RoundRobinPartitioner as the partitioner.class?

planetf1 commented 3 years ago

@wbittles, is this something you could look at?

planetf1 commented 3 years ago

I've been able to pass additional properties to the Kafka consumer/producer without any code changes, for example authentication/SSL-related configuration, as described in https://github.com/planetf1/egeria/blob/master/open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/README.md. I haven't checked this for a few releases, but I don't believe anything has changed.

ilovechai commented 3 years ago

@planetf1 we are able to pass additional properties as well. This property is picked up too, but the RoundRobinPartitioner class cannot be found in the Kafka client package.

wbittles commented 3 years ago

It's not obvious how I can help with this. Is the Kafka version being used 2.4 or later?

mandy-chessell commented 3 years ago

The error message is coming from Kafka; Egeria is just logging it (badly, so I raised issue #5028). This means that Kafka is picking up the property; it just cannot use it.

Are these classes on the classpath?
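One way to answer this outside the JVM: a JAR is a zip archive, so a class is present if the JAR contains the matching .class entry. The sketch below is a minimal Python check, assuming the server's kafka-clients JAR files can be reached on disk; the library directory argument and the helper names are hypothetical. It tests both class names that appear in this thread.

```python
import zipfile
from pathlib import Path
from typing import Dict, Iterable

# The two class names that appear in this thread.
CANDIDATES = [
    "org.apache.kafka.clients.producer.internals.RoundRobinPartitioner",
    "org.apache.kafka.clients.producer.RoundRobinPartitioner",
]


def class_entry(class_name: str) -> str:
    """Map a fully qualified class name to its entry path inside a JAR."""
    return class_name.replace(".", "/") + ".class"


def classes_in_jar(jar, class_names: Iterable[str] = CANDIDATES) -> Dict[str, bool]:
    """Report which classes a JAR (path or binary file object) contains."""
    with zipfile.ZipFile(jar) as zf:
        entries = set(zf.namelist())
    return {name: class_entry(name) in entries for name in class_names}


def scan_lib_dir(lib_dir: str) -> Dict[str, Dict[str, bool]]:
    """Check every kafka-clients*.jar found under a (hypothetical) library directory."""
    return {
        str(jar): classes_in_jar(jar)
        for jar in sorted(Path(lib_dir).rglob("kafka-clients*.jar"))
    }
```

If more than one kafka-clients JAR turns up, that suggests two versions are competing on the classpath. If the dependencies are packaged inside another archive, they would need extracting before scanning.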

mandy-chessell commented 3 years ago

It seems org.apache.kafka.clients.producer.RoundRobinPartitioner was added in Kafka 2.4.

https://issues.apache.org/jira/browse/KAFKA-3333

ilovechai commented 3 years ago

@mandy-chessell Egeria 2.4 uses kafka-clients:2.6.0 (https://github.com/odpi/egeria/blob/egeria-release-2.4/build.gradle#L122). We also use kafka-clients:2.3.1, and I'm not sure whether that is overriding the transitive dependency that Egeria brings in. I'll upgrade kafka-clients, test the changes, and get back to you.
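Since RoundRobinPartitioner only exists from Kafka 2.4 onwards (KAFKA-3333, cited above), a kafka-clients 2.3.1 JAR taking precedence over Egeria's 2.6.0 would mean the class is missing regardless of configuration. Which version actually ends up on the classpath depends on the build tool's conflict resolution (Maven picks the nearest declaration in the dependency tree, Gradle picks the highest requested version by default), so the resolved version is the one worth checking. A minimal sketch of that check; the helper names are hypothetical.

```python
import re
from typing import Tuple

# RoundRobinPartitioner first shipped in Kafka 2.4 (KAFKA-3333).
MIN_ROUND_ROBIN = (2, 4)


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn '2.6.0' into (2, 6, 0), ignoring any qualifier such as '-rc1'."""
    m = re.match(r"\d+(?:\.\d+)*", version)
    if not m:
        raise ValueError(f"not a version string: {version!r}")
    return tuple(int(part) for part in m.group(0).split("."))


def has_round_robin_partitioner(version: str) -> bool:
    """True if this kafka-clients version should contain RoundRobinPartitioner."""
    return parse_version(version) >= MIN_ROUND_ROBIN


# Versions mentioned in this thread.
for v in ["2.3.1", "2.6.0", "2.7"]:
    print(v, has_round_robin_partitioner(v))
# 2.3.1 False
# 2.6.0 True
# 2.7 True
```

Tuple comparison handles versions of different lengths, so "2.4.0" and "2.4" both pass the check.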

ilovechai commented 3 years ago

Okay, it looks like upgrading to kafka-clients:2.7 picks up the RoundRobinPartitioner class.

Wed Apr 07 23:59:12 GMT 2021 igc_omrs Startup OMAG-ADMIN-0004 The igc_omrs server has successfully completed start up.  The following services are running: [Open Metadata Repository Services (OMRS)]
2021-04-07 23:59:12.458  INFO [          pool-4-thread-1] o.a.k.c.p.ProducerConfig                 : ProducerConfig values:
    acks = -1
    batch.size = 16384
    bootstrap.servers = [kafka:9093]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
..
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.RoundRobinPartitioner <-- 

Offsets before:

[root###]# kubectl exec -it kafka-0 -- /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic open-metadata.repository-services.cohort.one_catalog.OMRSTopic
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:0:7649
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:1:9307
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:2:0

Offsets after:

[root###]# kubectl exec -it kafka-0 -- /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic open-metadata.repository-services.cohort.one_catalog.OMRSTopic
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:0:7700  <--- 
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:1:9358  <---
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:2:52    <---
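Subtracting each partition's end offset before the change from the one after gives the number of records appended in between. A minimal Python sketch using the figures posted above; the parsing assumes the topic:partition:offset line format shown, and strips the hand-added <--- markers.

```python
from typing import Dict

BEFORE = """\
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:0:7649
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:1:9307
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:2:0
"""

AFTER = """\
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:0:7700  <---
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:1:9358  <---
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:2:52    <---
"""


def parse_offsets(output: str) -> Dict[int, int]:
    """Parse lines of the form topic:partition:offset into {partition: offset}."""
    offsets = {}
    for line in output.splitlines():
        line = line.split("<")[0].strip()  # drop hand-added markers such as '<---'
        if not line:
            continue
        _topic, partition, offset = line.rsplit(":", 2)
        offsets[int(partition)] = int(offset)
    return offsets


def offset_deltas(before: str, after: str) -> Dict[int, int]:
    """Records appended to each partition between the two snapshots."""
    b, a = parse_offsets(before), parse_offsets(after)
    return {p: a[p] - b.get(p, 0) for p in sorted(a)}


deltas = offset_deltas(BEFORE, AFTER)
print(deltas)                # {0: 51, 1: 51, 2: 52}
print(sum(deltas.values()))  # 154
```

The 154 new records split 51/51/52, and partition 2, whose end offset was 0 beforehand (nothing had ever been written to it), now receives its share, which is what round-robin assignment would be expected to produce.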

Property that worked:

"partitioner.class": "org.apache.kafka.clients.producer.RoundRobinPartitioner"

mandy-chessell commented 3 years ago

Looks like we can close this?