apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Functions] Unable to create/update functions and Admin operations often fail #10293

Open devinbost opened 3 years ago

devinbost commented 3 years ago

I created a build based on master (2.8.0-SNAPSHOT), and I'm getting a number of issues.

When I try to update, start, or stop a function, or to get function status or stats, I get this exception (or something similar) about 90% of the time:

2021-04-21T01:43:37,293 [pulsar-web-46-14] ERROR org.apache.pulsar.functions.worker.rest.api.ComponentImpl - Update Failed
org.apache.pulsar.client.admin.PulsarAdminException: java.util.concurrent.CompletionException: org.asynchttpclient.exception.RemotelyClosedException: Remotely closed
    at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:229) ~[org.apache.pulsar-pulsar-client-admin-original-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
    at org.apache.pulsar.client.admin.internal.FunctionsImpl.lambda$updateOnWorkerLeaderAsync$13(FunctionsImpl.java:1078) ~[org.apache.pulsar-pulsar-client-admin-original-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_282]
    at org.asynchttpclient.netty.NettyResponseFuture.abort(NettyResponseFuture.java:273) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
    at org.asynchttpclient.netty.request.NettyRequestSender.abort(NettyRequestSender.java:473) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
    at org.asynchttpclient.netty.request.NettyRequestSender.handleUnexpectedClosedChannel(NettyRequestSender.java:484) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
    at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelInactive(AsyncHttpClientHandler.java:145) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.stream.ChunkedWriteHandler.channelInactive(ChunkedWriteHandler.java:138) ~[io.netty-netty-handler-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.codec.http.HttpContentDecoder.channelInactive(HttpContentDecoder.java:235) ~[io.netty-netty-codec-http-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389) ~[io.netty-netty-codec-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354) ~[io.netty-netty-codec-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:311) ~[io.netty-netty-codec-http-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:831) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
Caused by: java.util.concurrent.CompletionException: org.asynchttpclient.exception.RemotelyClosedException: Remotely closed
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:661) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_282]
    ... 38 more
Caused by: org.asynchttpclient.exception.RemotelyClosedException: Remotely closed
    at org.asynchttpclient.exception.RemotelyClosedException.INSTANCE(Unknown Source) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]

500 Internal Server Error HTTP/1.1

I suspect this is related to #10291

Steps to reproduce:

Create a build server to build Pulsar from apache/master via these commands:

# Install docker and git
sudo apt-get update
sudo apt-get install \
    apt-transport-https \
    ca-certificates \
    curl \
    gnupg \
    lsb-release -y
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
echo \
  "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu \
  $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get update
sudo apt-get install docker-ce docker-ce-cli containerd.io -y
# download my Pulsar fork
git clone https://github.com/devinbost/pulsar.git

# download and install act
curl https://raw.githubusercontent.com/nektos/act/master/install.sh | sudo bash
# checkout my Pulsar branch
cd pulsar
git checkout disable-function-batching

# Fix docker permissions
sudo groupadd docker
sudo usermod -aG docker ${USER}
sudo chmod 666 /var/run/docker.sock
sudo chmod g+rwx "$HOME/.docker" -R
sudo chown "$USER":"$USER" /home/"$USER"/.docker -R
# May need to run: sudo systemctl restart docker

# Verify these commands can be run:
docker pull gcr.io/google-samples/hello-app:1.0
docker run hello-world

# Install Java JDK 8
sudo apt-get update
sudo apt-get install openjdk-8-jdk -y

# Setup maven
wget https://archive.apache.org/dist/maven/maven-3/3.6.1/binaries/apache-maven-3.6.1-bin.tar.gz
sudo tar xf ./apache-maven-*.tar.gz -C /opt
sudo ln -s /opt/apache-maven-3.6.1/bin/mvn /usr/local/bin/mvn

# install more dependencies
sudo apt-get install unzip zip -y

# Build
mvn install -DskipTests
cd docker
mvn package -Pdocker -DskipTests

# Tag and push image to GCP Container Registry (or an equivalent container registry):
docker tag apachepulsar/pulsar-all gcr.io/$PROJECT_ID/$NAME

docker push gcr.io/$PROJECT_ID/$NAME
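
The tag-and-push step assumes $PROJECT_ID and $NAME are already exported (the broker start script further below also uses $TAG). A minimal sketch with placeholder values, assuming gcloud is installed to authenticate docker against GCR:

# Placeholder values - substitute your own GCP project and image name/tag
export PROJECT_ID=my-gcp-project
export NAME=pulsar-all
export TAG=latest
# Let docker push to gcr.io with gcloud credentials
gcloud auth configure-docker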

Spin up a Pulsar cluster using this broker config:

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Original content brought from version 2.3.0 release

### --- General broker settings --- ###

# Zookeeper quorum connection string
zookeeperServers=fab08.xyz.com:2181,fab09.xyz.com:2181,fab10.xyz.com:2181

# Configuration Store connection string
configurationStoreServers=fab08.xyz.com:2181,fab09.xyz.com:2181,fab10.xyz.com:2181
# Broker data port
brokerServicePort=6650

# Port to use to serve HTTP requests
webServicePort=8080

# Broker data port for TLS - By default TLS is disabled
brokerServicePortTls=6651

# Port to use to serve HTTPS requests - By default TLS is disabled
webServicePortTls=8443
# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0

# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=

# Number of threads to use for HTTP requests processing. Default is set to Runtime.getRuntime().availableProcessors()
numHttpServerThreads=

# Flag to control features that are meant to be used when running in standalone mode
isRunningStandalone=

# Name of the cluster to which this broker belongs
clusterName=pulsar-pcdc1-green-test

# Enable cluster's failure-domain which can distribute brokers into logical region
failureDomainsEnabled=false

# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000
# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000

# Enable backlog quota check. Enforces action on topic when the quota is reached
backlogQuotaCheckEnabled=true

# How often to check for topics that have reached the quota
backlogQuotaCheckIntervalInSeconds=60

# Default per-topic backlog quota limit
backlogQuotaDefaultLimitGB=10

# Default backlog quota retention policy. Default is producer_request_hold
# 'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out)
# 'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer
# 'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog
backlogQuotaDefaultRetentionPolicy=producer_request_hold
# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=true

# How often to check for inactive topics
brokerDeleteInactiveTopicsFrequencySeconds=60

# How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5

# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000

# How long after the last consumed message before an inactive subscription is deleted
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0

# Enable subscription message redelivery tracker to send redelivery count to consumer (default is enabled)
subscriptionRedeliveryTrackerEnabled=true

# How frequently to proactively check and purge expired subscription
subscriptionExpiryCheckIntervalInMinutes=5
# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
brokerDeduplicationEnabled=false

# Maximum number of producers whose information is going to be
# persisted for deduplication purposes
brokerDeduplicationMaxNumberOfProducers=10000

# Number of entries after which a dedup info snapshot is taken.
# A larger interval will lead to fewer snapshots being taken, though it would
# increase the topic recovery time when the entries published after the
# snapshot need to be replayed.
brokerDeduplicationEntriesInterval=1000

# Time of inactivity after which the broker will discard the deduplication information
# relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=360

# When a namespace is created without specifying the number of bundles, this
# value will be used as the default
defaultNumberOfNamespaceBundles=64

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

# Path for the file used to determine the rotation status for the broker when responding
# to service discovery health checks
statusFilePath=

# If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to
# use only brokers running the latest software version (to minimize impact to bundles)
preferLaterVersions=false

# Max number of unacknowledged messages allowed for a consumer on a shared subscription. The broker will stop sending
# messages to a consumer once this limit is reached, until the consumer starts acknowledging messages back.
# A value of 0 disables the unacked-message limit check, and the consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=0

# Max number of unacknowledged messages allowed per shared subscription. The broker will stop dispatching messages to
# all consumers of the subscription once this limit is reached, until consumers start acknowledging messages back and
# the unacked count drops to limit/2. A value of 0 disables the unacked-message limit
# check and the dispatcher can dispatch messages without any restriction
maxUnackedMessagesPerSubscription=0

# Max number of unacknowledged messages allowed per broker. Once this limit is reached, the broker will stop dispatching
# messages to all shared subscriptions that have a higher number of unacked messages, until those subscriptions start
# acknowledging messages back and the unacked count drops to limit/2. A value of 0 disables the
# unacked-message limit check and the broker doesn't block dispatchers
maxUnackedMessagesPerBroker=0

# Once the broker reaches the maxUnackedMessagesPerBroker limit, it blocks subscriptions whose unacked messages exceed
# this percentage limit; a blocked subscription will not receive any new messages until it acks back
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
# Too many subscribe requests from a consumer can cause the broker to rewind consumer cursors and load data from bookies,
# hence causing high network bandwidth usage
# When a positive value is set, the broker will throttle the subscribe requests for one consumer.
# Otherwise, throttling is disabled. The default value of this setting is 0 - throttling is disabled.
subscribeThrottlingRatePerConsumer=0

# Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s.
subscribeRatePeriodPerConsumerInSecond=30

# Default messages-per-second dispatch throttling limit for every topic. A value of 0 disables default
# message dispatch-throttling
dispatchThrottlingRatePerTopicInMsg=0

# Default bytes-per-second dispatch throttling limit for every topic. A value of 0 disables
# default message-byte dispatch-throttling
dispatchThrottlingRatePerTopicInByte=0

# Default message dispatch throttling limit for a subscription.
# A value of 0 disables default message dispatch-throttling.
dispatchThrottlingRatePerSubscriptionInMsg=0

# Default message-byte dispatch throttling limit for a subscription.
# A value of 0 disables default message-byte dispatch-throttling.
# Devin Bost: I think the line below was a typo in a config... But, I'm leaving it in case it's required.
dispatchThrottlingRatePerSubscribeInByte=0

# By default we enable dispatch-throttling for both caught-up consumers and consumers that have
# a backlog.
dispatchThrottlingOnNonBacklogConsumerEnabled=true
# Max number of concurrent lookup requests the broker allows, to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000

# Max number of concurrent topic loading requests the broker allows, to control the number of zk-operations
maxConcurrentTopicLoadRequest=5000

# Max number of concurrent non-persistent messages that can be processed per connection
maxConcurrentNonPersistentMessagePerConnection=1000

# Number of worker threads to serve non-persistent topics
numWorkerThreadsForNonPersistentTopic=8

# Enable broker to load persistent topics
enablePersistentTopics=true

# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

# Enable running a bookie along with the broker
enableRunBookieTogether=false

# Enable running bookie autorecovery along with the broker
enableRunBookieAutoRecoveryTogether=false

# Max number of producers allowed to connect to a topic. Once this limit is reached, the broker will reject new producers
# until the number of connected producers decreases.
# A value of 0 disables the maxProducersPerTopic limit check.
maxProducersPerTopic=0

# Max number of consumers allowed to connect to a topic. Once this limit is reached, the broker will reject new consumers
# until the number of connected consumers decreases.
# A value of 0 disables the maxConsumersPerTopic limit check.
maxConsumersPerTopic=0

# Max number of consumers allowed to connect to a subscription. Once this limit is reached, the broker will reject new consumers
# until the number of connected consumers decreases.
# A value of 0 disables the maxConsumersPerSubscription limit check.
maxConsumersPerSubscription=0
# Interval between checks to see if topics with compaction policies need to be compacted
brokerServiceCompactionMonitorIntervalInSeconds=60
### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
# role as proxyRoles - it will demand to see a valid original principal.
proxyRoles=

# If this flag is set then the broker authenticates the original Auth data
# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=false

# Deprecated - Use webServicePortTls and brokerServicePortTls instead
#
#   * We have the same logic above for the way of enabling TLS through port assignments
#
tlsEnabled=true
# Path for the TLS certificate file
tlsCertificateFilePath=/pulsar/conf/auth/broker.cert.pem

# Path for the TLS private key file
tlsKeyFilePath=/pulsar/conf/auth/broker.key-pk8.pem

# Path for the trusted TLS certificate file.
# This cert is used to verify that any certs presented by connecting clients
# are signed by a certificate authority. If this verification
# fails, then the certs are untrusted and the connections are dropped.
tlsTrustCertsFilePath=/pulsar/conf/auth/ca.cert.pem

# Accept untrusted TLS certificate from client.
# If true, a client with a cert which cannot be verified with the
# 'tlsTrustCertsFilePath' cert will be allowed to connect to the server,
# though the cert will not be used for client authentication.
tlsAllowInsecureConnection=false

# Specify the tls protocols the broker will use to negotiate during TLS handshake
# (a comma-separated list of protocol names).
# Examples:- [TLSv1.2, TLSv1.1, TLSv1]
tlsProtocols=TLSv1.2,TLSv1.1,TLSv1

# Specify the tls cipher the broker will use to negotiate during TLS Handshake
# (a comma-separated list of ciphers).
# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
tlsCiphers=ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,ECDHE-PSK-CHACHA20-POLY1305,AES256-GCM-SHA384,AES128-GCM-SHA256,ECDHE-PSK-AES256-CBC-SHA,AES256-SHA,PSK-AES256-CBC-SHA,ECDHE-PSK-AES128-CBC-SHA,AES128-SHA,PSK-AES128-CBC-SHA

# Trusted client certificates are required to connect over TLS.
# Reject the connection if the client certificate is not trusted.
# In effect, this requires that all connecting clients perform TLS client
# authentication.
tlsRequireTrustedClientCertOnConnect=false

### --- Authentication --- ###

# Enable authentication
authenticationEnabled=true

# Authentication provider name list, which is a comma-separated list of class names
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
# Enforce authorization
authorizationEnabled=true

# Authorization provider fully qualified class-name
authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider

# Allow wildcard matching in authorization
# (wildcard matching only applicable if wildcard-char:
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
authorizationAllowWildcardsMatching=false

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
superUserRoles=admin,superuser

# Authentication settings of the broker itself. Used when the broker connects to other brokers,
# either in same or other clusters
brokerClientTlsEnabled=false
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters=token:eyJhbGc...xyx
brokerClientTrustCertsFilePath=/pulsar/conf/auth/ca.cert.pem

# brokerClientTlsEnabled=false
# brokerClientAuthenticationPlugin=
# brokerClientAuthenticationParameters=
# brokerClientTrustCertsFilePath=

# Supported Athenz provider domain names(comma separated) for authentication
athenzDomainNames=

# When this parameter is not empty, unauthenticated users act as anonymousUserRole
anonymousUserRole=anonymous

### --- Token Authentication Provider --- ###

## Symmetric key
# Configure the secret key to be used to validate auth tokens
# The key can be specified like:
# tokenSecretKey=data:base64,xxxxxxxxx
# tokenSecretKey=file:///my/secret.key
# tokenSecretKey=

## Asymmetric public/private key pair
# Configure the public key to be used to validate auth tokens
# The key can be specified like:
# tokenPublicKey=data:base64,xxxxxxxxx
# tokenPublicKey=file:///my/public.key
tokenPublicKey=file:///pulsar/conf/auth/public.key
### --- BookKeeper Client --- ###

# Authentication plugin to use when connecting to bookies
bookkeeperClientAuthenticationPlugin=

# BookKeeper auth plugin implementation specific parameters: name and values
bookkeeperClientAuthenticationParametersName=
bookkeeperClientAuthenticationParameters=

# Timeout for BK add / read operations
bookkeeperClientTimeoutInSeconds=30

# Speculative reads are initiated if a read request doesn't complete within a certain time
# A value of 0 disables speculative reads
bookkeeperClientSpeculativeReadTimeoutInMillis=0

# Use older Bookkeeper wire protocol with bookie
bookkeeperUseV2WireProtocol=true

# Enable bookies health check. Bookies that have more than the configured number of failures within
# the interval will be quarantined for some time. During this period, new ledgers won't be created
# on these bookies
bookkeeperClientHealthCheckEnabled=true
bookkeeperClientHealthCheckIntervalSeconds=60
bookkeeperClientHealthCheckErrorThresholdPerInterval=5
bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800

# Enable rack-aware bookie selection policy. BK will choose bookies from different racks when
# forming a new bookie ensemble
bookkeeperClientRackawarePolicyEnabled=true

# Enable region-aware bookie selection policy. BK will choose bookies from
# different regions and racks when forming a new bookie ensemble
# If enabled, the value of bookkeeperClientRackawarePolicyEnabled is ignored
bookkeeperClientRegionawarePolicyEnabled=false

# Enable/disable reordering read sequence on reading entries.
bookkeeperClientReorderReadSequenceEnabled=false

# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie
# outside the specified groups will not be used by the broker
bookkeeperClientIsolationGroups=
# Enable/disable having read operations for a ledger to be sticky to a single bookie.
# If this flag is enabled, the client will use one single bookie (by preference) to read
# all entries for a ledger.
bookkeeperEnableStickyReads=true

### --- Managed Ledger --- ###

# Number of bookies to use when creating a ledger
managedLedgerDefaultEnsembleSize=2

# Number of copies to store for each message
managedLedgerDefaultWriteQuorum=2

# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum=2

# Default type of checksum to use when writing to BookKeeper. Default is "CRC32C"
# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
managedLedgerDigestType=CRC32C

# Number of threads to be used for managed ledger tasks dispatching
managedLedgerNumWorkerThreads=8

# Number of threads to be used for managed ledger scheduled tasks
managedLedgerNumSchedulerThreads=8

# Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the topics
# running  in the same broker. By default, uses 1/5th of available direct memory
managedLedgerCacheSizeMB=
# Threshold to which the cache level is brought down when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9
# Rate limit the amount of writes per second generated by consumer acking the messages
managedLedgerDefaultMarkDeleteRateLimit=1.0

# Max number of entries to append to a ledger before triggering a rollover
# A ledger rollover is triggered on these conditions
#  * Either the max rollover time has been reached
#  * or max entries have been written to the ledger and at least min-time
#    has passed   (50000)
managedLedgerMaxEntriesPerLedger=10

managedLedgerMinLedgerRolloverTimeMinutes=0

# Maximum time before forcing a ledger rollover for a topic
managedLedgerMaxLedgerRolloverTimeMinutes=240

# Delay between a ledger being successfully offloaded to long term storage
# and the ledger being deleted from bookkeeper (default is 4 hours)
managedLedgerOffloadDeletionLagMs=14400000
# Max number of entries to append to a cursor ledger
managedLedgerCursorMaxEntriesPerLedger=50000

# Max time before triggering a rollover on a cursor ledger
managedLedgerCursorRolloverTimeInSeconds=14400

# Max number of "acknowledgment holes" that are going to be persistently stored.
# When acknowledging out of order, a consumer will leave holes that are supposed
# to be quickly filled by acking all the messages. The information of which
# messages are acknowledged is persisted by compressing in "ranges" of messages
# that were acknowledged. After the max number of ranges is reached, the information
# will only be tracked in memory and messages will be redelivered in case of
# crashes.
managedLedgerMaxUnackedRangesToPersist=10000

# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
# zookeeper.
managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000

# Skip reading non-recoverable/unreadable data-ledgers in the managed-ledger's list. It helps when data-ledgers get
# corrupted at bookkeeper and the managed-cursor is stuck at that ledger.
autoSkipNonRecoverableData=false

# operation timeout while updating managed-ledger metadata.
managedLedgerMetadataOperationsTimeoutSeconds=60
# Read entries timeout when broker tries to read messages from bookkeeper.
managedLedgerReadEntryTimeoutSeconds=120

# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
managedLedgerAddEntryTimeoutSeconds=120

### --- Load balancer --- ###

# Enable load balancer
loadBalancerEnabled=true

# Percentage of change to trigger load report update
loadBalancerReportUpdateThresholdPercentage=10

# maximum interval to update load report
loadBalancerReportUpdateMaxIntervalMinutes=15

# Frequency of report to collect
loadBalancerHostUsageCheckIntervalMinutes=1

# Enable/disable automatic bundle unloading for load-shedding
loadBalancerSheddingEnabled=true

# Load shedding interval. The broker periodically checks whether some traffic should be offloaded from
# an over-loaded broker to other under-loaded brokers
loadBalancerSheddingIntervalMinutes=1

# Prevent the same topics from being shed and moved to another broker more than once within this timeframe
loadBalancerSheddingGracePeriodMinutes=30

# Usage threshold to allocate max number of topics to broker
loadBalancerBrokerMaxTopics=50000

# Usage threshold to determine a broker as over-loaded
loadBalancerBrokerOverloadedThresholdPercentage=85

# Interval to flush dynamic resource quota to ZooKeeper
loadBalancerResourceQuotaUpdateIntervalMinutes=15

# enable/disable namespace bundle auto split
loadBalancerAutoBundleSplitEnabled=true

# enable/disable automatic unloading of split bundles
loadBalancerAutoUnloadSplitBundlesEnabled=true

# maximum topics in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxTopics=1000

# maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxSessions=1000

# maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxMsgRate=30000

# maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxBandwidthMbytes=100

# maximum number of bundles in a namespace
loadBalancerNamespaceMaximumBundles=128

# Override the auto-detection of the network interfaces max speed.
# This option is useful in some environments (eg: EC2 VMs) where the max speed
# reported by Linux is not reflecting the real bandwidth available to the broker.
# Since the network usage is employed by the load manager to decide when a broker
# is overloaded, it is important to make sure the info is correct or override it
# with the right value here. The configured value can be a double (eg: 0.8) and that
# can be used to trigger load-shedding even before hitting on NIC limits.
loadBalancerOverrideBrokerNicSpeedGbps=

# Name of load manager to use
loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl
### --- Replication --- ###

# Enable replication metrics
replicationMetricsEnabled=true

# Max number of connections to open for each broker in a remote cluster
# More connections host-to-host lead to better throughput over high-latency
# links.
replicationConnectionsPerBroker=16

# Replicator producer queue size
replicationProducerQueueSize=1000

# Replicator prefix used for replicator producer name and cursor name
replicatorPrefix=pulsar.repl

# Default message retention time
defaultRetentionTimeInMinutes=0

# Default retention size
defaultRetentionSizeInMB=0

# How often to check whether the connections are still alive
keepAliveIntervalSeconds=30

# bootstrap namespaces
bootstrapNamespaces=

### --- WebSocket --- ###

# Enable the WebSocket API service in broker
webSocketServiceEnabled=false

# Number of IO threads in Pulsar Client used in WebSocket proxy
webSocketNumIoThreads=8

# Number of connections per Broker in Pulsar Client used in WebSocket proxy
webSocketConnectionsPerBroker=8

# Time in milliseconds after which an idle WebSocket session times out
webSocketSessionIdleTimeoutMillis=300000

### --- Metrics --- ###

# Enable topic level metrics
exposeTopicLevelMetricsInPrometheus=true

# Enable consumer level metrics. default is false
exposeConsumerLevelMetricsInPrometheus=false

# Classname of Pluggable JVM GC metrics logger that can log GC specific metrics
# jvmGCMetricsLoggerClassName=

### --- Functions --- ###

# Enable Functions Worker Service in Broker
functionsWorkerEnabled=true

### --- Broker Web Stats --- ###

# Enable topic level metrics
exposePublisherStats=true
statsUpdateFrequencyInSecs=60
statsUpdateInitialDelayInSecs=60
### --- Schema storage --- ###
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory

# Enforce schema validation in the following cases:
#
# - if a producer without a schema attempts to produce to a topic with a schema, the producer will
#   fail to connect. PLEASE be careful when using this, since non-java clients don't support schema;
#   if you enable this setting, non-java clients will fail to produce.
isSchemaValidationEnforced=false

### --- Ledger Offloading --- ###

# The directory for all the offloader implementations
offloadersDirectory=./offloaders

# Driver to use to offload old data to long term storage (Possible values: S3, aws-s3, google-cloud-storage)
# When using google-cloud-storage, Make sure both Google Cloud Storage and Google Cloud Storage JSON API are enabled for
# the project (check from Developers Console -> Api&auth -> APIs).
### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
globalZookeeperServers=

# Deprecated - Enable TLS when talking with other clusters to replicate messages
replicationTlsEnabled=false

# Deprecated. Use brokerDeleteInactiveTopicsFrequencySeconds for Pulsar 2.5.2+
brokerServicePurgeInactiveFrequencyInSeconds=60
advertisedAddress=

# Added as per request for debugging freezing topics
acknowledgmentAtBatchIndexLevelEnabled=true
unblockStuckSubscriptionEnabled=true
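
Note that this config enables token authentication and TLS on 8443, so any pulsar-admin calls against the cluster need matching credentials. A minimal sketch of such a call (the host name and token path are placeholders; the global flags are standard pulsar-admin options):

# Hypothetical host and token file - substitute your own
bin/pulsar-admin \
  --admin-url https://broker.example.com:8443 \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params file:///pulsar/conf/auth/admin.token \
  tenants list

The same values can instead be set once in conf/client.conf (webServiceUrl, authPlugin, authParams).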

Start the brokers using this script:

#!/bin/bash

. /usr/local/bin/docker-functions
export SERVICE_USER=pulsar

pre_start () {
  echo "Broker pre-start"
  app="pulsar-broker"
  docker-standalone ps -aq --filter "name=pulsar-broker" | grep -q . && docker-standalone stop $app && docker-standalone rm -fv $app
  echo "stoped and removed"
}

ETC=/etc/pulsar
LOG=/var/log/pulsar/
DATA=/data/broker
APP_HOME=/

export DOCKER_RUN=" -d \
--memory-swappiness=0 \
--stop-timeout=300 \
--restart=unless-stopped \
--net=host \
-v $LOG:/pulsar/logs \
-v $DATA:/pulsar/data \
-v $ETC:/pulsar/conf:ro \
--user 0 \
gcr.io/$PROJECT_ID/$NAME:$TAG /pulsar/bin/pulsar broker "

run "$@"

It can be started via: pulsar-broker start

Then, connect to one of the brokers and issue: bin/pulsar-admin functions [anything...]

That should cause the issue.
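
For example, any of these admin calls should hit the failure (the tenant, namespace, and function names here are placeholders):

bin/pulsar-admin functions list --tenant public --namespace default
bin/pulsar-admin functions status --tenant public --namespace default --name my-function
bin/pulsar-admin functions stats --tenant public --namespace default --name my-function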

devinbost commented 3 years ago

I'm also getting bookie timeouts (in the broker logs), like:

2021-04-21T02:25:14,625 [BookKeeperClientWorker-OrderedExecutor-24-0] WARN org.apache.bookkeeper.client.PendingAddOp - Failed to write entry (1211784, 9): Bookie operation timeout

devinbost commented 3 years ago

Note: The slow performance was resolved after switching from debug log level to info log level, but the Admin operations are still failing.
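
For reference, a minimal sketch of that log-level switch, assuming the stock bin/pulsar launcher scripts (which map these environment variables onto the log4j2 properties pulsar.log.level and pulsar.log.root.level); with the docker-based start script above they would be passed as -e flags on docker run:

# Assumes the stock launcher scripts read these env vars
export PULSAR_LOG_LEVEL=info
export PULSAR_LOG_ROOT_LEVEL=info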

codelipenghui commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.