streamnative / kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
https://streamnative.io/docs/kop
Apache License 2.0

[BUG] Cannot write msg to topic by kafka console tool #1437

Open magicsilence opened 2 years ago

magicsilence commented 2 years ago

Env

Describe the bug

I got a warning message when using kafka-console-producer.sh to write data to Pulsar:

./bin/kafka-console-producer.sh --broker-list A.A.A.A:9092,B.B.B.B:9092,C.C.C.C:9092 --topic test_4

[2022-08-05 07:22:56,509] WARN [Producer clientId=console-producer] Got error produce response with correlation id 5 on topic-partition public/default/t_4-0, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2022-08-05 07:22:56,510] WARN [Producer clientId=console-producer] Received invalid metadata error in produce request on partition public/default/t_4-0 due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)

At the same time, pulsar-broker.log showed entries like these:

2022-08-05T07:44:42,241+0000 [pulsar-web-36-7] INFO org.eclipse.jetty.server.RequestLog - a.a.a.a - - [05/Aug/2022:07:44:42 +0000] "GET /admin/v2/persistent/public/default/t_4/partitions HTTP/1.1" 200 16 "-" "Pulsar-Java-v2.10.1" 4
2022-08-05T07:44:42,333+0000 [pulsar-ph-kafka-59-3] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger public/default/persistent/t_4-partition-0
2022-08-05T07:44:42,335+0000 [bookkeeper-ml-scheduler-OrderedScheduler-5-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/t_4-partition-0] Closing managed ledger
2022-08-05T07:44:42,335+0000 [bookkeeper-ml-scheduler-OrderedScheduler-5-0] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/t_4) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
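To see quickly which topics a broker is refusing, the "not served by this instance" warnings can be pulled out of the broker log. The following is only a rough sketch: the function name count_denied_topics and the regular expression are illustrative, and the sample text is the WARN line quoted above.

```python
import re
from collections import Counter

# Matches the BrokerService warning text seen in the broker log above.
DENIED = re.compile(
    r"Namespace bundle for topic \((?P<topic>[^)]+)\) not served by this instance"
)

def count_denied_topics(log_text):
    """Count how often each topic appears in a 'not served' warning."""
    return Counter(m.group("topic") for m in DENIED.finditer(log_text))

sample_log = (
    "2022-08-05T07:44:42,335+0000 [bookkeeper-ml-scheduler-OrderedScheduler-5-0] "
    "WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for "
    "topic (persistent://public/default/t_4) not served by this instance. "
    "Please redo the lookup. Request is denied: namespace=public/default"
)

for topic, hits in count_denied_topics(sample_log).items():
    print(f"{topic}: denied {hits} time(s)")
```

Running this against each broker's log shows which brokers receive produce requests for bundles they do not own.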

And Pulsar broker config:

metadataStoreUrl=A.A.A.A:2181,B.B.B.B:2181,C.C.C.C:2181/pulsar_2_10_1 configurationMetadataStoreUrl=A.A.A.A:2181,B.B.B.B:2181,C.C.C.C:2181/pulsar_2_10_1 brokerServicePort=6650 brokerServicePortTls= webServicePort=8080 webServicePortTls= webServiceTlsProtocols= webServiceTlsCiphers= bindAddress=0.0.0.0 bindAddresses= advertisedAddress=A.A.A.A haProxyProtocolEnabled=false numAcceptorThreads= numIOThreads= numOrderedExecutorThreads=8 numHttpServerThreads= numExecutorThreadPoolSize= numCacheExecutorThreadPoolSize=10 enableBusyWait=false maxConcurrentHttpRequests=1024 isRunningStandalone= clusterName=pulsar-cluster-1 maxTenants=0 failureDomainsEnabled=false metadataStoreSessionTimeoutMillis=30000 metadataStoreOperationTimeoutSeconds=30 metadataStoreCacheExpirySeconds=300 brokerShutdownTimeoutMs=60000 skipBrokerShutdownOnOOM=false backlogQuotaCheckEnabled=true backlogQuotaCheckIntervalInSeconds=60 backlogQuotaDefaultLimitBytes=-1 backlogQuotaDefaultLimitSecond=-1 backlogQuotaDefaultRetentionPolicy=producer_request_hold ttlDurationDefaultInSeconds=0 allowAutoTopicCreation=true allowAutoTopicCreationType=non-partitioned allowAutoSubscriptionCreation=true defaultNumPartitions=1 brokerDeleteInactiveTopicsEnabled=false brokerDeleteInactiveTopicsFrequencySeconds=60 brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions brokerDeleteInactivePartitionedTopicMetadataEnabled=false brokerDeleteInactiveTopicsMaxInactiveDurationSeconds= forceDeleteTenantAllowed=false forceDeleteNamespaceAllowed=false maxPendingPublishRequestsPerConnection=1000 messageExpiryCheckIntervalInMinutes=5 activeConsumerFailoverDelayTimeMillis=1000 subscriptionExpirationTimeMinutes=0 subscriptionRedeliveryTrackerEnabled=true subscriptionExpiryCheckIntervalInMinutes=5 subscriptionTypesEnabled=Exclusive,Shared,Failover,Key_Shared subscriptionKeySharedUseConsistentHashing=true subscriptionKeySharedConsistentHashingReplicaPoints=100 brokerDeduplicationEnabled=false 
brokerDeduplicationMaxNumberOfProducers=10000 brokerDeduplicationSnapshotFrequencyInSeconds=120 brokerDeduplicationSnapshotIntervalSeconds=120 brokerDeduplicationEntriesInterval=1000 brokerDeduplicationProducerInactivityTimeoutMinutes=360 defaultNumberOfNamespaceBundles=4 maxNamespacesPerTenant=0 maxTopicsPerNamespace=0 brokerMaxConnections=0 brokerMaxConnectionsPerIp=0 isAllowAutoUpdateSchemaEnabled=true clientLibraryVersionCheckEnabled=false statusFilePath= preferLaterVersions=false maxUnackedMessagesPerConsumer=50000 maxUnackedMessagesPerSubscription=200000 maxUnackedMessagesPerBroker=0 maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16 unblockStuckSubscriptionEnabled=false topicPublisherThrottlingTickTimeMillis=10 preciseTopicPublishRateLimiterEnable=false brokerPublisherThrottlingTickTimeMillis=50 brokerPublisherThrottlingMaxMessageRate=0 brokerPublisherThrottlingMaxByteRate=0 maxPublishRatePerTopicInMessages=0 maxPublishRatePerTopicInBytes=0 subscribeThrottlingRatePerConsumer=0 subscribeRatePeriodPerConsumerInSecond=30 dispatchThrottlingRateInMsg=0 dispatchThrottlingRateInByte=0 dispatchThrottlingRatePerTopicInMsg=0 dispatchThrottlingRatePerTopicInByte=0 dispatchThrottlingOnBatchMessageEnabled=false dispatchThrottlingRatePerSubscriptionInMsg=0 dispatchThrottlingRatePerSubscriptionInByte=0 dispatchThrottlingRatePerReplicatorInMsg=0 dispatchThrottlingRatePerReplicatorInByte=0 dispatchThrottlingRateRelativeToPublishRate=false dispatchThrottlingOnNonBacklogConsumerEnabled=true dispatcherMaxReadBatchSize=100 dispatcherMaxReadSizeBytes=5242880 dispatcherMinReadBatchSize=1 dispatcherMaxRoundRobinBatchSize=20 dispatcherReadFailureBackoffInitialTimeInMs=15000 dispatcherReadFailureBackoffMaxTimeInMs=60000 dispatcherReadFailureBackoffMandatoryStopTimeInMs=0 preciseDispatcherFlowControl=false entryFilterNames= entryFiltersDirectory= maxConcurrentLookupRequest=50000 maxConcurrentTopicLoadRequest=5000 maxConcurrentNonPersistentMessagePerConnection=1000 
numWorkerThreadsForNonPersistentTopic= enablePersistentTopics=true enableNonPersistentTopics=true enableRunBookieTogether=false enableRunBookieAutoRecoveryTogether=false maxProducersPerTopic=0 maxSameAddressProducersPerTopic=0 encryptionRequireOnProducer=false maxConsumersPerTopic=0 maxSameAddressConsumersPerTopic=0 maxSubscriptionsPerTopic=0 maxConsumersPerSubscription=0 maxMessageSize=5242880 brokerServiceCompactionMonitorIntervalInSeconds=60 brokerServiceCompactionThresholdInBytes=0 brokerServiceCompactionPhaseOneLoopTimeInSeconds=30 delayedDeliveryEnabled=true delayedDeliveryTickTimeMillis=1000 acknowledgmentAtBatchIndexLevelEnabled=false enableReplicatedSubscriptions=true replicatedSubscriptionsSnapshotFrequencyMillis=1000 replicatedSubscriptionsSnapshotTimeoutSeconds=30 replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10 maxMessagePublishBufferSizeInMB= retentionCheckIntervalInSeconds=120 maxMessageSizeCheckIntervalInSeconds=60 maxNumPartitionsPerPartitionedTopic=0 zookeeperSessionExpiredPolicy=reconnect systemTopicEnabled=false systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE topicLevelPoliciesEnabled=false topicFencingTimeoutSeconds=0 proxyRoles= authenticateOriginalAuthData=false tlsCertRefreshCheckDurationSec=300 tlsCertificateFilePath= tlsKeyFilePath= tlsTrustCertsFilePath= tlsAllowInsecureConnection=false tlsProtocols= tlsCiphers= tlsRequireTrustedClientCertOnConnect=false tlsProvider= webServiceTlsProvider=Conscrypt tlsEnabledWithKeyStore=false tlsKeyStoreType=JKS tlsKeyStore= tlsKeyStorePassword= tlsTrustStoreType=JKS tlsTrustStore= tlsTrustStorePassword= brokerClientTlsEnabledWithKeyStore=false brokerClientSslProvider= brokerClientTlsTrustStoreType=JKS brokerClientTlsTrustStore= brokerClientTlsTrustStorePassword= brokerClientTlsCiphers= brokerClientTlsProtocols= metadataStoreBatchingEnabled=true metadataStoreBatchingMaxDelayMillis=5 metadataStoreBatchingMaxOperations=1000 metadataStoreBatchingMaxSizeKb=128 authenticationEnabled=false 
authenticationProviders= authenticationRefreshCheckSeconds=60 authorizationEnabled=false authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider authorizationAllowWildcardsMatching=false superUserRoles= brokerClientTlsEnabled=false brokerClientAuthenticationPlugin= brokerClientAuthenticationParameters= brokerClientTrustCertsFilePath= athenzDomainNames= anonymousUserRole= tokenSecretKey= tokenPublicKey= tokenAuthClaim= tokenAudienceClaim= tokenAudience= saslJaasClientAllowedIds= saslJaasServerSectionName= httpMaxRequestSize=-1 disableHttpDebugMethods=false httpRequestsLimitEnabled=false httpRequestsMaxPerSecond=100.0 bookkeeperMetadataServiceUri=zk+null://A.A.A.A:2181,B.B.B.B:2181,C.C.C.C:2181/pulsar_2_10_1/ledgers bookkeeperClientAuthenticationPlugin= bookkeeperClientAuthenticationParametersName= bookkeeperClientAuthenticationParameters= bookkeeperClientTimeoutInSeconds=30 bookkeeperClientNumWorkerThreads= bookkeeperClientSpeculativeReadTimeoutInMillis=0 bookkeeperNumberOfChannelsPerBookie=16 bookkeeperUseV2WireProtocol=true bookkeeperClientHealthCheckEnabled=true bookkeeperClientHealthCheckIntervalSeconds=60 bookkeeperClientHealthCheckErrorThresholdPerInterval=5 bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800 bookkeeperClientQuarantineRatio=1.0 bookkeeperClientGetBookieInfoIntervalSeconds=86400 bookkeeperClientGetBookieInfoRetryIntervalSeconds=60 bookkeeperClientRackawarePolicyEnabled=true bookkeeperClientRegionawarePolicyEnabled=false bookkeeperClientMinNumRacksPerWriteQuorum=2 bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false bookkeeperClientReorderReadSequenceEnabled=false bookkeeperClientIsolationGroups= bookkeeperClientSecondaryIsolationGroups= bookkeeperClientMinAvailableBookiesInIsolationGroups= bookkeeperEnableStickyReads=true bookkeeperTLSProviderFactoryClass=org.apache.bookkeeper.tls.TLSContextFactory bookkeeperTLSClientAuthentication=false bookkeeperTLSKeyFileType=PEM bookkeeperTLSTrustCertTypes=PEM 
bookkeeperTLSKeyStorePasswordPath= bookkeeperTLSTrustStorePasswordPath= bookkeeperTLSKeyFilePath= bookkeeperTLSCertificateFilePath= bookkeeperTLSTrustCertsFilePath= bookkeeperTlsCertFilesRefreshDurationSeconds=300 bookkeeperDiskWeightBasedPlacementEnabled=false bookkeeperExplicitLacIntervalInMills=0 managedLedgerDefaultEnsembleSize=2 managedLedgerDefaultWriteQuorum=2 managedLedgerDefaultAckQuorum=2 managedLedgerCursorPositionFlushSeconds=60 managedLedgerStatsPeriodSeconds=60 managedLedgerDigestType=CRC32C managedLedgerNumWorkerThreads=8 managedLedgerNumSchedulerThreads=8 managedLedgerCacheSizeMB= managedLedgerCacheCopyEntries=false managedLedgerCacheEvictionWatermark=0.9 managedLedgerCacheEvictionFrequency=100.0 managedLedgerCacheEvictionTimeThresholdMillis=1000 managedLedgerCursorBackloggedThreshold=1000 managedLedgerDefaultMarkDeleteRateLimit=1.0 managedLedgerMaxEntriesPerLedger=50000 managedLedgerMinLedgerRolloverTimeMinutes=10 managedLedgerMaxLedgerRolloverTimeMinutes=240 managedLedgerInactiveLedgerRolloverTimeSeconds=0 managedLedgerMaxSizePerLedgerMbytes=2048 managedLedgerOffloadDeletionLagMs=14400000 managedLedgerOffloadAutoTriggerSizeThresholdBytes=-1 managedLedgerCursorMaxEntriesPerLedger=50000 managedLedgerCursorRolloverTimeInSeconds=14400 managedLedgerMaxUnackedRangesToPersist=10000 managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 autoSkipNonRecoverableData=false lazyCursorRecovery=false managedLedgerMetadataOperationsTimeoutSeconds=60 managedLedgerReadEntryTimeoutSeconds=0 managedLedgerAddEntryTimeoutSeconds=0 managedLedgerPrometheusStatsLatencyRolloverSeconds=60 managedLedgerTraceTaskExecution=true managedLedgerNewEntriesCheckDelayInMillis=10 loadBalancerEnabled=true loadBalancerReportUpdateThresholdPercentage=10 loadBalancerReportUpdateMaxIntervalMinutes=15 loadBalancerHostUsageCheckIntervalMinutes=1 loadBalancerSheddingEnabled=true loadBalancerSheddingIntervalMinutes=1 loadBalancerSheddingGracePeriodMinutes=30 loadBalancerBrokerMaxTopics=50000 
loadBalancerBrokerOverloadedThresholdPercentage=85 loadBalancerResourceQuotaUpdateIntervalMinutes=15 loadBalancerAutoBundleSplitEnabled=true loadBalancerAutoUnloadSplitBundlesEnabled=true loadBalancerNamespaceBundleMaxTopics=1000 loadBalancerNamespaceBundleMaxSessions=1000 loadBalancerNamespaceBundleMaxMsgRate=30000 loadBalancerNamespaceBundleMaxBandwidthMbytes=100 loadBalancerNamespaceMaximumBundles=128 loadBalancerOverrideBrokerNicSpeedGbps=10 loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide defaultNamespaceBundleSplitAlgorithm=range_equally_divide loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder loadBalancerBrokerThresholdShedderPercentage=10 loadBalancerMsgRateDifferenceShedderThreshold=50 loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold=4 loadBalancerHistoryResourcePercentage=0.9 loadBalancerBandwithInResourceWeight=1.0 loadBalancerBandwithOutResourceWeight=1.0 loadBalancerCPUResourceWeight=1.0 loadBalancerMemoryResourceWeight=1.0 loadBalancerDirectMemoryResourceWeight=1.0 loadBalancerBundleUnloadMinThroughputThreshold=10 namespaceBundleUnloadingTimeoutMs=60000 replicationMetricsEnabled=true replicationConnectionsPerBroker=16 replicationProducerQueueSize=1000 replicatorPrefix=pulsar.repl replicationPolicyCheckDurationSeconds=600 defaultRetentionTimeInMinutes=0 defaultRetentionSizeInMB=0 keepAliveIntervalSeconds=30 bootstrapNamespaces= webSocketServiceEnabled=false webSocketNumIoThreads=8 webSocketConnectionsPerBroker=8 webSocketSessionIdleTimeoutMillis=300000 webSocketMaxTextFrameSize=1048576 exposeTopicLevelMetricsInPrometheus=true exposeConsumerLevelMetricsInPrometheus=false exposeProducerLevelMetricsInPrometheus=false exposeManagedLedgerMetricsInPrometheus=true exposeManagedCursorMetricsInPrometheus=false metricsServletTimeoutMs=30000 exposeBundlesMetricsInPrometheus=false 
functionsWorkerEnabled=true exposePublisherStats=true statsUpdateFrequencyInSecs=60 statsUpdateInitialDelayInSecs=60 exposePreciseBacklogInPrometheus=false splitTopicAndPartitionLabelInPrometheus=false aggregatePublisherStatsByProducerName=false schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory isSchemaValidationEnforced=false schemaCompatibilityStrategy=FULL offloadersDirectory=./offloaders managedLedgerOffloadDriver= managedLedgerOffloadMaxThreads=2 managedLedgerOffloadPrefetchRounds=1 managedLedgerUnackedRangesOpenCacheSetEnabled=true s3ManagedLedgerOffloadRegion= s3ManagedLedgerOffloadBucket= s3ManagedLedgerOffloadServiceEndpoint= s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864 s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576 gcsManagedLedgerOffloadRegion= gcsManagedLedgerOffloadBucket= gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864 gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576 gcsManagedLedgerOffloadServiceAccountKeyFile= managedLedgerOffloadBucket= fileSystemProfilePath=../conf/filesystem_offload_core_site.xml fileSystemURI= transactionCoordinatorEnabled=false transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider transactionBufferSnapshotMaxTransactionCount=1000 transactionBufferSnapshotMinTimeInMillis=5000 transactionBufferClientMaxConcurrentRequests=1000 transactionPendingAckLogIndexMinLag=500 enablePackagesManagement=false packagesManagementStorageProvider=org.apache.pulsar.packages.management.storage.bookkeeper.BookKeeperPackagesStorageProvider packagesReplicas=1 packagesManagementLedgerRootPath=/ledgers zookeeperServers= globalZookeeperServers= configurationStoreServers= zooKeeperSessionTimeoutMillis=-1 zooKeeperOperationTimeoutSeconds=-1 zooKeeperCacheExpirySeconds=-1 replicationTlsEnabled=false brokerServicePurgeInactiveFrequencyInSeconds=60 backlogQuotaDefaultLimitGB=-1 tlsEnabled=false 
subscriptionKeySharedEnable=true
messagingProtocols=kafka
protocolHandlerDirectory=./protocols
allowAutoTopicCreationType=partitioned
kafkaListeners=PLAINTEXT://0.0.0.0:9092
kafkaAdvertisedListeners=PLAINTEXT://A.A.A.A:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
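Note that the pasted configuration assigns allowAutoTopicCreationType twice: non-partitioned near the top and partitioned in the KoP settings at the end. With Java Properties-style loading the later assignment normally overrides the earlier one, but it is worth confirming which value is intended. Below is a minimal Python sketch for spotting repeated keys in a whitespace-separated dump like the one above. The function name find_duplicate_keys and the sample string are illustrative, and the sketch assumes values contain no spaces and the text contains no comment lines.

```python
from collections import defaultdict

def find_duplicate_keys(config_text):
    """Return {key: [values in order]} for keys assigned more than once.

    Assumes whitespace-separated key=value tokens (hypothetical helper,
    not part of Pulsar or KoP).
    """
    seen = defaultdict(list)
    for token in config_text.split():
        if "=" not in token:
            continue  # skip anything that is not a key=value pair
        key, _, value = token.partition("=")
        seen[key].append(value)
    return {key: values for key, values in seen.items() if len(values) > 1}

# A few keys copied from the configuration above.
sample = (
    "allowAutoTopicCreation=true allowAutoTopicCreationType=non-partitioned "
    "defaultNumPartitions=1 messagingProtocols=kafka "
    "allowAutoTopicCreationType=partitioned"
)

for key, values in find_duplicate_keys(sample).items():
    print(f"{key} is set {len(values)} times: {values}")
```

The same function works on a normal one-setting-per-line broker.conf, as long as the assumptions above hold.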

Additional context

The Pulsar client can produce and consume normally in this cluster.

[duola@aws-ir1-bigdata-pulsar-test-prod-024142 apache-pulsar-2.10.1]$ bin/pulsar-client produce \

persistent://public/default/test \ -n 1 \ -m "Hello Fordeal" 2022-08-05T09:52:14,995+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x6d022c99, L:/127.0.0.1:50316 - R:localhost/127.0.0.1:6650]] Connected to server 2022-08-05T09:52:15,222+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://public/default/test","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":1000,"maxPendingMessagesAcrossPartitions":50000,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null} 2022-08-05T09:52:15,278+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: 
{"serviceUrl":"pulsar://localhost:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"*****","tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null} 2022-08-05T09:52:15,296+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x799ea290, L:/172.30.24.142:58764 - R:172.30.23.153/172.30.23.153:6650]] Connected to server 2022-08-05T09:52:15,301+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://public/default/test-partition-0] [null] Creating producer on cnx [id: 0x799ea290, L:/172.30.24.142:58764 - R:172.30.23.153/172.30.23.153:6650] 2022-08-05T09:52:15,368+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://public/default/test-partition-0] [pulsar-cluster-1-35-0] Created producer on cnx [id: 0x799ea290, L:/172.30.24.142:58764 - R:172.30.23.153/172.30.23.153:6650] 2022-08-05T09:52:15,384+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.PartitionedProducerImpl - [persistent://public/default/test] Created partitioned 
producer 2022-08-05T09:52:15,413+0000 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized 2022-08-05T09:52:15,484+0000 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/ 2022-08-05T09:52:15,492+0000 [main] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [persistent://public/default/test-partition-0] [pulsar-cluster-1-35-0] Pending messages: 0 --- Publish throughput: 4.80 msg/s --- 0.00 Mbit/s --- Latency: med: 96.000 ms - 95pct: 96.000 ms - 99pct: 96.000 ms - 99.9pct: 96.000 ms - max: 96.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 13.000 bytes - 95pct: 13.000 bytes - 99pct: 13.000 bytes - 99.9pct: 13.000 bytes - max: 13.000 bytes --- Ack received rate: 4.80 ack/s --- Failed messages: 0 --- Pending messages: 0 2022-08-05T09:52:15,499+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://public/default/test-partition-0] [pulsar-cluster-1-35-0] Closed Producer 2022-08-05T09:52:15,525+0000 [main] INFO org.apache.pulsar.client.impl.PartitionedProducerImpl - [persistent://public/default/test] Closed Partitioned Producer 2022-08-05T09:52:15,532+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x6d022c99, L:/127.0.0.1:50316 ! R:localhost/127.0.0.1:6650] Disconnected 2022-08-05T09:52:15,536+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x799ea290, L:/172.30.24.142:58764 ! R:172.30.23.153/172.30.23.153:6650] Disconnected 2022-08-05T09:52:17,550+0000 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced

[duola@aws-ir1-bigdata-pulsar-test-prod-024142 apache-pulsar-2.10.1]$ bin/pulsar-client consume \
    persistent://public/default/test \
    -n 100 \
    -s "consumer-test" \
    -t "Exclusive"

2022-08-05T09:51:51,552+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x3c5ca9be, L:/127.0.0.1:50312 - R:localhost/127.0.0.1:6650]] Connected to server 2022-08-05T09:51:51,742+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":null,"subscriptionName":"consumer-test","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"45064","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"keySharedPolicy":null,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"maxPendingChuckedMessage":10} 2022-08-05T09:51:51,767+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: 
{"serviceUrl":"pulsar://localhost:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"*****","tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null} 2022-08-05T09:51:51,836+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x7c1a2d4d, L:/172.30.24.142:58760 - R:172.30.23.153/172.30.23.153:6650]] Connected to server 2022-08-05T09:51:51,860+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/test-partition-0][consumer-test] Subscribing to topic on cnx [id: 0x7c1a2d4d, L:/172.30.24.142:58760 - R:172.30.23.153/172.30.23.153:6650], consumerId 0 2022-08-05T09:51:52,437+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/test-partition-0][consumer-test] Subscribed to topic on 172.30.23.153/172.30.23.153:6650 -- consumer: 0 2022-08-05T09:51:52,440+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.MultiTopicsConsumerImpl - [persistent://public/default/test] [consumer-test] Success subscribe 
new topic persistent://public/default/test in topics consumer, partitions: 1, allTopicPartitionsNumber: 1 2022-08-05T09:52:15,540+0000 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized ----- got message ----- key:[null], properties:[], content:Hello Fordeal 2022-08-05T09:52:51,770+0000 [pulsar-timer-5-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [persistent://public/default/test-partition-0] [consumer-test] [45064] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0

Demogorgon314 commented 2 years ago

Can you provide the complete broker log? One more question: does NOT_LEADER_FOR_PARTITION always happen, and does the producer never recover?

magicsilence commented 2 years ago

Hi @Demogorgon314.
I tried restarting all brokers and it still does not work; the problem always happens. For testing, I changed the KoP config entryFormat from kafka to mixed_kafka. Could this change have caused the problem?

Producer:

[duola@aws-ir1-bigdata-kafka-binlog-prod-020027 kafka_2.12-2.0.0]$ ./bin/kafka-console-producer.sh --broker-list 172.30.23.153:9092,172.30.24.142:9092,172.30.30.148:9092 --topic test_4

1[2022-08-08 07:18:15,987] WARN [Producer clientId=console-producer] Got error produce response with correlation id 5 on topic-partition test_4-0, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2022-08-08 07:18:15,987] WARN [Producer clientId=console-producer] Received invalid metadata error in produce request on partition test_4-0 due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)

[2022-08-08 07:18:16,089] WARN [Producer clientId=console-producer] Got error produce response with correlation id 7 on topic-partition test_4-0, retrying (1 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2022-08-08 07:18:16,089] WARN [Producer clientId=console-producer] Received invalid metadata error in produce request on partition test_4-0 due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2022-08-08 07:18:16,194] WARN [Producer clientId=console-producer] Got error produce response with correlation id 9 on topic-partition test_4-0, retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2022-08-08 07:18:16,194] WARN [Producer clientId=console-producer] Received invalid metadata error in produce request on partition test_4-0 due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2022-08-08 07:18:16,300] ERROR Error when sending message to topic test_4 with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
[2022-08-08 07:18:16,303] WARN [Producer clientId=console-producer] Received invalid metadata error in produce request on partition test_4-0 due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2022-08-08 07:18:17,080] WARN [Producer clientId=console-producer] Got error produce response with correlation id 13 on topic-partition test_4-0, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2022-08-08 07:18:17,080] WARN [Producer clientId=console-producer] Received invalid metadata error in produce request on partition test_4-0 due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2022-08-08 07:18:17,185] WARN [Producer clientId=console-producer] Got error produce response with correlation id 15 on topic-partition test_4-0, retrying (1 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2022-08-08 07:18:17,185] WARN [Producer clientId=console-producer] Received invalid metadata error in produce request on partition test_4-0 due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2022-08-08 07:18:17,289] WARN [Producer clientId=console-producer] Got error produce response with correlation id 17 on topic-partition test_4-0, retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2022-08-08 07:18:17,289] WARN [Producer clientId=console-producer] Received invalid metadata error in produce request on partition test_4-0 due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2022-08-08 07:18:17,392] ERROR Error when sending message to topic test_4 with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
[2022-08-08 07:18:17,392] WARN [Producer clientId=console-producer] Received invalid metadata error in produce request on partition test_4-0 due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
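The producer output above shows each send exhausting its retries (2, 1, 0 attempts left) before failing. As a rough aid for reading longer captures, here is a small Python sketch that extracts the retry warnings. The name summarize_retries and the regular expression are illustrative and only match the message format shown in this log.

```python
import re

# Matches the Sender retry warning format seen in the producer output above.
RETRY = re.compile(
    r"correlation id (?P<cid>\d+) on topic-partition (?P<tp>\S+), "
    r"retrying \((?P<left>\d+) attempts left\)\. Error: (?P<error>\w+)"
)

def summarize_retries(log_text):
    """Return (correlation_id, topic_partition, attempts_left, error) tuples."""
    return [
        (int(m.group("cid")), m.group("tp"), int(m.group("left")), m.group("error"))
        for m in RETRY.finditer(log_text)
    ]

sample_log = (
    "[2022-08-08 07:18:16,089] WARN [Producer clientId=console-producer] Got error "
    "produce response with correlation id 7 on topic-partition test_4-0, retrying "
    "(1 attempts left). Error: NOT_LEADER_FOR_PARTITION "
    "(org.apache.kafka.clients.producer.internals.Sender) "
    "[2022-08-08 07:18:16,194] WARN [Producer clientId=console-producer] Got error "
    "produce response with correlation id 9 on topic-partition test_4-0, retrying "
    "(0 attempts left). Error: NOT_LEADER_FOR_PARTITION "
    "(org.apache.kafka.clients.producer.internals.Sender)"
)

for cid, tp, left, error in summarize_retries(sample_log):
    print(f"correlation id {cid}: {tp} -> {error}, {left} attempt(s) left")
```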

broker logs:

2022-08-08T07:18:14,725+0000 [pulsar-io-4-3] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /172.30.30.148:42692
2022-08-08T07:18:14,748+0000 [pulsar-web-36-8] INFO org.eclipse.jetty.server.RequestLog - 172.30.23.153 - - [08/Aug/2022:07:18:14 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false&getEarliestTimeInBacklog=false HTTP/1.1" 200 3350 "-" "Pulsar-Java-v2.10.1" 5
2022-08-08T07:18:14,776+0000 [pulsar-web-36-8] INFO org.eclipse.jetty.server.RequestLog - 172.30.23.153 - - [08/Aug/2022:07:18:14 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false&getEarliestTimeInBacklog=false HTTP/1.1" 200 3350 "-" "Pulsar-Java-v2.10.1" 6
2022-08-08T07:18:14,809+0000 [pulsar-2-1] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - 3 brokers being considered for assignment of public/default/0xe0000000_0xf0000000
2022-08-08T07:18:15,976+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4-partition-0) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:15,977+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:16,009+0000 [pulsar-web-36-8] INFO org.eclipse.jetty.server.RequestLog - 172.30.23.153 - - [08/Aug/2022:07:18:16 +0000] "GET /admin/v2/persistent/public/default/test_4/partitions HTTP/1.1" 200 16 "-" "Pulsar-Java-v2.10.1" 5
2022-08-08T07:18:16,020+0000 [pulsar-client-io-41-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x01eba0ea, L:/172.30.23.153:34728 - R:172.30.23.153/172.30.23.153:6650]] Connected to server
2022-08-08T07:18:16,022+0000 [pulsar-io-4-2] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /172.30.23.153:34728
2022-08-08T07:18:16,087+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4-partition-0) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:16,087+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:16,136+0000 [pulsar-web-36-1] INFO org.eclipse.jetty.server.RequestLog - 172.30.23.153 - - [08/Aug/2022:07:18:16 +0000] "GET /admin/v2/persistent/public/default/test_4/partitions HTTP/1.1" 200 16 "-" "Pulsar-Java-v2.10.1" 3
2022-08-08T07:18:16,189+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4-partition-0) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:16,190+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:16,248+0000 [pulsar-web-36-5] INFO org.eclipse.jetty.server.RequestLog - 172.30.23.153 - - [08/Aug/2022:07:18:16 +0000] "GET /admin/v2/persistent/public/default/test_4/partitions HTTP/1.1" 200 16 "-" "Pulsar-Java-v2.10.1" 4
2022-08-08T07:18:16,297+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4-partition-0) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:16,297+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:16,359+0000 [pulsar-web-36-6] INFO org.eclipse.jetty.server.RequestLog - 172.30.23.153 - - [08/Aug/2022:07:18:16 +0000] "GET /admin/v2/persistent/public/default/test_4/partitions HTTP/1.1" 200 16 "-" "Pulsar-Java-v2.10.1" 3
2022-08-08T07:18:17,078+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4-partition-0) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:17,079+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:17,090+0000 [pulsar-web-36-6] INFO org.eclipse.jetty.server.RequestLog - 172.30.23.153 - - [08/Aug/2022:07:18:17 +0000] "GET /admin/v2/persistent/public/default/test_4/partitions HTTP/1.1" 200 16 "-" "Pulsar-Java-v2.10.1" 6
2022-08-08T07:18:17,184+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4-partition-0) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:17,184+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:17,206+0000 [pulsar-web-36-4] INFO org.eclipse.jetty.server.RequestLog - 172.30.23.153 - - [08/Aug/2022:07:18:17 +0000] "GET /admin/v2/persistent/public/default/test_4/partitions HTTP/1.1" 200 16 "-" "Pulsar-Java-v2.10.1" 6
2022-08-08T07:18:17,286+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4-partition-0) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:17,287+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:17,337+0000 [pulsar-web-36-7] INFO org.eclipse.jetty.server.RequestLog - 172.30.23.153 - - [08/Aug/2022:07:18:17 +0000] "GET /admin/v2/persistent/public/default/test_4/partitions HTTP/1.1" 200 16 "-" "Pulsar-Java-v2.10.1" 7
2022-08-08T07:18:17,391+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4-partition-0) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default
2022-08-08T07:18:17,391+0000 [pulsar-ph-kafka-59-2] WARN org.apache.pulsar.broker.service.BrokerService - Namespace bundle for topic (persistent://public/default/test_4) not served by this instance. Please redo the lookup. Request is denied: namespace=public/default

Demogorgon314 commented 2 years ago

Can you provide each broker's configuration? Alternatively, check the kafkaAdvertisedListeners and kafkaListeners settings on each broker. Here is an example setting:

broker-01

kafkaListeners=PLAINTEXT://pulsar-broker-01-ip:9092
kafkaAdvertisedListeners=PLAINTEXT://pulsar-broker-01-ip:9092

broker-02

kafkaListeners=PLAINTEXT://pulsar-broker-02-ip:9092
kafkaAdvertisedListeners=PLAINTEXT://pulsar-broker-02-ip:9092
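
A quick way to sanity-check settings like the ones above is sketched below in Python. It is only an illustration: the function names, the broker labels, and the rules it applies (the advertised listener is set, is not a wildcard or loopback address, and is unique per broker) are assumptions made for this example, not KoP's own validation logic.

```python
def parse_properties(text):
    """Parse key=value lines, ignoring blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


# Hosts that remote clients cannot use to reach a broker (assumed rule).
UNROUTABLE_HOSTS = {"0.0.0.0", "localhost", "127.0.0.1"}


def check_advertised_listeners(broker_confs):
    """Return a list of problems found; an empty list means none.

    broker_confs maps a broker label (hypothetical, e.g. "broker-01")
    to the text of that broker's configuration.
    """
    problems = []
    owner = {}  # advertised endpoint -> first broker that declared it
    for name, text in broker_confs.items():
        advertised = parse_properties(text).get("kafkaAdvertisedListeners")
        if not advertised:
            problems.append(f"{name}: kafkaAdvertisedListeners is not set")
            continue
        for endpoint in advertised.split(","):
            endpoint = endpoint.strip()
            # "PLAINTEXT://host:9092" -> "host"
            host = endpoint.split("://", 1)[-1].rsplit(":", 1)[0]
            if host in UNROUTABLE_HOSTS:
                problems.append(f"{name}: {endpoint} is not reachable by clients")
            if endpoint in owner:
                problems.append(
                    f"{name}: {endpoint} is also advertised by {owner[endpoint]}"
                )
            else:
                owner[endpoint] = name
    return problems


confs = {
    "broker-01": (
        "kafkaListeners=PLAINTEXT://pulsar-broker-01-ip:9092\n"
        "kafkaAdvertisedListeners=PLAINTEXT://pulsar-broker-01-ip:9092\n"
    ),
    "broker-02": (
        "kafkaListeners=PLAINTEXT://pulsar-broker-02-ip:9092\n"
        "kafkaAdvertisedListeners=PLAINTEXT://pulsar-broker-02-ip:9092\n"
    ),
}
print(check_advertised_listeners(confs))  # prints []
```

Binding kafkaListeners to 0.0.0.0 is not flagged here; only the advertised address is checked, since that is the one clients are told to connect to.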

magicsilence commented 2 years ago

@Demogorgon314

broker-01(172.30.23.153)

kafkaListeners=PLAINTEXT://0.0.0.0:9092
kafkaAdvertisedListeners=PLAINTEXT://172.30.23.153:9092

broker-02(172.30.24.142)

kafkaListeners=PLAINTEXT://0.0.0.0:9092
kafkaAdvertisedListeners=PLAINTEXT://172.30.24.142:9092

broker-03(172.30.30.148)

kafkaListeners=PLAINTEXT://0.0.0.0:9092
kafkaAdvertisedListeners=PLAINTEXT://172.30.30.148:9092

Demogorgon314 commented 2 years ago

@magicsilence The configuration looks good to me. Can you provide the full broker logs? Debug-level logs would be better.

oguzhantortop commented 2 years ago

Hi, I am also facing the same problem. I deleted the topic using the Pulsar CLI, and then tried to recreate the same topic by simply connecting a producer and sending a message to it via the Kafka CLI. This might help you reproduce the problem. My question: is there a workaround that would let me use the same topic again?

Demogorgon314 commented 2 years ago

@oguzhantortop Thanks for providing the reproducing step! I'll try it later.

oguzhantortop commented 2 years ago

> @oguzhantortop Thanks for providing the reproducing step! I'll try it later.

Sorry, I figured out that when I use KOP it creates a partitioned topic, and from the pulsar-admin CLI I had deleted the topic using topics delete instead of the delete-partitioned-topic command. Maybe delete shouldn't delete a partitioned topic, or maybe it is a design choice that lets you delete a single partition of a topic. Anyway, after deleting the topic with the delete-partitioned-topic command, I was able to produce from the Kafka CLI. But this still lets you reproduce the problem.
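
To make the naming concrete, here is a small Python sketch of how the names line up. The helper names are hypothetical; the public/default tenant and namespace and the -partition-N suffix are taken from the broker logs earlier in this thread, and the two pulsar-admin subcommands are the ones mentioned above. The sketch only builds the command line and does not run it.

```python
def pulsar_topic(topic, tenant="public", namespace="default"):
    """Full persistent topic name for a short Kafka-style topic name."""
    return f"persistent://{tenant}/{namespace}/{topic}"


def partition_topics(topic, num_partitions, **scope):
    """Names of the per-partition topics behind one partitioned topic."""
    base = pulsar_topic(topic, **scope)
    return [f"{base}-partition-{i}" for i in range(num_partitions)]


def delete_argv(topic, partitioned=True, **scope):
    """Build (but do not execute) the pulsar-admin command for a deletion."""
    subcommand = "delete-partitioned-topic" if partitioned else "delete"
    return ["pulsar-admin", "topics", subcommand, pulsar_topic(topic, **scope)]


# Kafka partition test_4-0 corresponds to this Pulsar topic:
print(partition_topics("test_4", 1))
# ['persistent://public/default/test_4-partition-0']

# The command that let me produce again:
print(" ".join(delete_argv("test_4")))
# pulsar-admin topics delete-partitioned-topic persistent://public/default/test_4
```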