apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0
14.27k stars 3.59k forks source link

Setting topic permission under load #2952

Open HaraldGustafsson opened 6 years ago

HaraldGustafsson commented 6 years ago

Expected behavior

Do a REST call to admin interface for setting topic permissions like:

https://localhost:8081/admin/v2/persistent/tt1/testns1/t4%5Cdata/permissions/testrole

body:

{'testrole': ['consume', 'produce']}

Should work every time and return status 204.

Actual behavior

When this is done with heavy load setting up permission for many topics in parallel. Some will result in the below logged exception and status code 500.

Steps to reproduce

Set up tenant and namespace, then run the set permission for many different topics in that namespace in parallel. I use the same role and actions for all of them.

System configuration

pulsar: 2.2.0

This error was when running the broker in standalone mode in a docker container supplied on docker hub, with authorisation and authentication via client certs. Have not tested on our full pulsar setup yet.

Logs

12:27:31.613 [pulsar-web-53-28] INFO  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [superUser] Successfully granted access for role testrole: [produce, consume] - topic persistent://tt1/testns1/t5\meta
12:27:31.587 [pulsar-web-53-30] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [superUser] Failed to grant permissions for topic persistent://tt1/testns1/t0\data
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/policies/tt1/testns1
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:118) ~[org.apache.pulsar-pulsar-zookeeper-2.2.0.jar:2.2.0]
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:54) ~[org.apache.pulsar-pulsar-zookeeper-2.2.0.jar:2.2.0]
    at org.apache.zookeeper.ZooKeeper.setData(ZooKeeper.java:1336) ~[org.apache.pulsar-pulsar-zookeeper-2.2.0.jar:2.2.0]
    at org.apache.bookkeeper.zookeeper.ZooKeeperClient.access$3101(ZooKeeperClient.java:70) ~[org.apache.bookkeeper-bookkeeper-server-4.7.2.jar:4.7.2]
    at org.apache.bookkeeper.zookeeper.ZooKeeperClient$21.call(ZooKeeperClient.java:1065) ~[org.apache.bookkeeper-bookkeeper-server-4.7.2.jar:4.7.2]
    at org.apache.bookkeeper.zookeeper.ZooKeeperClient$21.call(ZooKeeperClient.java:1059) ~[org.apache.bookkeeper-bookkeeper-server-4.7.2.jar:4.7.2]
    at org.apache.bookkeeper.zookeeper.ZooWorker.syncCallWithRetries(ZooWorker.java:140) ~[org.apache.bookkeeper-bookkeeper-server-4.7.2.jar:4.7.2]
    at org.apache.bookkeeper.zookeeper.ZooKeeperClient.setData(ZooKeeperClient.java:1059) ~[org.apache.bookkeeper-bookkeeper-server-4.7.2.jar:4.7.2]
    at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalGrantPermissionsOnTopic(PersistentTopicsBase.java:274) [org.apache.pulsar-pulsar-broker-2.2.0.jar:2.2.0]
    at org.apache.pulsar.broker.admin.v2.PersistentTopics.grantPermissionsOnTopic(PersistentTopics.java:107) [org.apache.pulsar-pulsar-broker-2.2.0.jar:2.2.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_171]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_171]
    at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81) [org.glassfish.jersey.core-jersey-server-2.25.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144) [org.glassfish.jersey.core-jersey-server-2.25.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161) [org.glassfish.jersey.core-jersey-server-2.25.jar:?]
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:143) [org.glassfish.jersey.core-jersey-server-2.25.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99) [org.glassfish.jersey.core-jersey-server-2.25.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389) [org.glassfish.jersey.core-jersey-server-2.25.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347) [org.glassfish.jersey.core-jersey-server-2.25.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102) [org.glassfish.jersey.core-jersey-server-2.25.jar:?]
    at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:326) [org.glassfish.jersey.core-jersey-server-2.25.jar:?]
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271) [org.glassfish.jersey.core-jersey-common-2.25.jar:?]
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267) [org.glassfish.jersey.core-jersey-common-2.25.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:315) [org.glassfish.jersey.core-jersey-common-2.25.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:297) [org.glassfish.jersey.core-jersey-common-2.25.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:267) [org.glassfish.jersey.core-jersey-common-2.25.jar:?]
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317) [org.glassfish.jersey.core-jersey-common-2.25.jar:?]
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:305) [org.glassfish.jersey.core-jersey-server-2.25.jar:?]
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1154) [org.glassfish.jersey.core-jersey-server-2.25.jar:?]
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:473) [org.glassfish.jersey.containers-jersey-container-servlet-core-2.25.jar:?]
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427) [org.glassfish.jersey.containers-jersey-container-servlet-core-2.25.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388) [org.glassfish.jersey.containers-jersey-container-servlet-core-2.25.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341) [org.glassfish.jersey.containers-jersey-container-servlet-core-2.25.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228) [org.glassfish.jersey.containers-jersey-container-servlet-core-2.25.jar:?]
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:845) [org.eclipse.jetty-jetty-servlet-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1689) [org.eclipse.jetty-jetty-servlet-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.apache.pulsar.broker.web.ResponseHandlerFilter.doFilter(ResponseHandlerFilter.java:53) [org.apache.pulsar-pulsar-broker-2.2.0.jar:2.2.0]
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676) [org.eclipse.jetty-jetty-servlet-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.apache.pulsar.broker.web.AuthenticationFilter.doFilter(AuthenticationFilter.java:74) [org.apache.pulsar-pulsar-broker-common-2.2.0.jar:2.2.0]
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676) [org.eclipse.jetty-jetty-servlet-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581) [org.eclipse.jetty-jetty-servlet-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:224) [org.eclipse.jetty-jetty-server-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180) [org.eclipse.jetty-jetty-server-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511) [org.eclipse.jetty-jetty-servlet-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185) [org.eclipse.jetty-jetty-server-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112) [org.eclipse.jetty-jetty-server-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) [org.eclipse.jetty-jetty-server-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213) [org.eclipse.jetty-jetty-server-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:119) [org.eclipse.jetty-jetty-server-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134) [org.eclipse.jetty-jetty-server-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.server.Server.handle(Server.java:524) [org.eclipse.jetty-jetty-server-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:319) [org.eclipse.jetty-jetty-server-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:253) [org.eclipse.jetty-jetty-server-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273) [org.eclipse.jetty-jetty-io-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95) [org.eclipse.jetty-jetty-io-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:202) [org.eclipse.jetty-jetty-io-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273) [org.eclipse.jetty-jetty-io-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95) [org.eclipse.jetty-jetty-io-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93) [org.eclipse.jetty-jetty-io-9.3.11.v20160721.jar:9.3.11.v20160721]
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-all-4.1.22.Final.jar:4.1.22.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
12:27:31.669 [pulsar-web-53-27] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.1 - - [07/Nov/2018:12:27:31 +0000] "POST https://localhost:8081/admin/v2/persistent/tt1/testns1/t6%5Cdata/permissions/testrole HTTP/1.1" 500 6238 "-" "Python/3.7 aiohttp/3.4.2" 385
12:27:31.669 [pulsar-web-53-31] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.1 - - [07/Nov/2018:12:27:31 +0000] "POST https://localhost:8081/admin/v2/persistent/tt1/testns1/t7%5Cmeta/permissions/testrole HTTP/1.1" 500 5652 "-" "Python/3.7 aiohttp/3.4.2" 402
12:27:31.668 [pulsar-web-53-29] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.1 - - [07/Nov/2018:12:27:31 +0000] "POST https://localhost:8081/admin/v2/persistent/tt1/testns1/t8%5Cdata/permissions/testrole HTTP/1.1" 500 6238 "-" "Python/3.7 aiohttp/3.4.2" 371
12:27:31.643 [pulsar-web-53-28] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.1 - - [07/Nov/2018:12:27:31 +0000] "POST https://localhost:8081/admin/v2/persistent/tt1/testns1/t5%5Cmeta/permissions/testrole HTTP/1.1" 204 0 "-" "Python/3.7 aiohttp/3.4.2" 173
12:27:31.642 [pulsar-web-53-24] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.1 - - [07/Nov/2018:12:27:31 +0000] "POST https://localhost:8081/admin/v2/persistent/tt1/testns1/t4%5Cdata/permissions/testrole HTTP/1.1" 500 6238 "-" "Python/3.7 aiohttp/3.4.2" 341
HaraldGustafsson commented 6 years ago

Now verified same issue on a full Pulsar setup.

sijie commented 6 years ago

I think the problem might be related to concurrent updates on namespace policy. we can probably handle the bad version and retry, or put all the updates on same namespace in ordered executor.

HaraldGustafsson commented 6 years ago

Yes, the zookeeper bad version implies that pulsar have concurrent updates. At least a retry is needed. Don't know if a queue would help, do you only have one entry point? Otherwise, you would still have potential concurrent updates. Harald

HaraldGustafsson commented 5 years ago

Hi, Just a comment is it not normal to give status service unavailable 503, for a retry scenario. Status conflict implies that the client has sent a faulty request, which is not the case, and sometimes it could be hard to distinguish this conflict from an actual conflict, and a need to still parse response text. Or is this zookeeper conflict handled inside pulsar with a retry. Harald

On Fri, 28 Dec 2018, 19:15 Sijie Guo <notifications@github.com wrote:

Closed #2952 https://github.com/apache/pulsar/issues/2952 via 1fa81ca https://github.com/apache/pulsar/commit/1fa81ca54f688938c83a0be3c98bebc7589be668 .

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/apache/pulsar/issues/2952#event-2047561931, or mute the thread https://github.com/notifications/unsubscribe-auth/ADKqXSU-BC9QxLxFu9KkjppUfPNwcDxzks5u9l_dgaJpZM4YSXhe .

sijie commented 5 years ago

@HaraldGustafsson At the time of bad version happening, it means there are concurrent updates to namespace policy. It is hard to determine what is the real conflict and what conflict it is. so returning CONFLICT status code sounds a right approach to me. IMO, Pulsar applications which are using pulsar admin api should be able to catch this exception and retries, because pulsar applications are better places to resolve the conflict. Does that align with what you are thinking?

HaraldGustafsson commented 5 years ago

I understand the reasoning, but from the api it is not obvious that when you change permissions on different topics in the same namespace that you could get this concurrent conflict (and should retry), if you don't know that all topics permissions are stored in the namespace. At least I did not know it before digging a bit deeper. When realising this I needed to queue such operations on the client side and add retry since multiple workers might attempt to do changes. So when setting up many topics, with several configurations like retention, ttl, permissions, etc, they all need to be queued, and get rtt delay. In my setup, for each admin request we get we need to queue around 10 admin requests to pulsar, with retries even more. Would it be possible to introduce a larger namespace policy update method to change multiple things simultaneously, since in my case the conflict was often due to changing several things in the ns policy.

On Fri, 28 Dec 2018, 19:49 Sijie Guo <notifications@github.com wrote:

@HaraldGustafsson https://github.com/HaraldGustafsson At the time of bad version happening, it means there are concurrent updates to namespace policy. It is hard to determine what is the real conflict and what conflict it is. so returning CONFLICT status code sounds a right approach to me. IMO, Pulsar applications which are using pulsar admin api should be able to catch this exception and retries, because pulsar applications are better places to resolve the conflict. Does that align with what you are thinking?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/apache/pulsar/issues/2952#issuecomment-450408555, or mute the thread https://github.com/notifications/unsubscribe-auth/ADKqXf-Nb3u2i7X88mY23dJtaugbUCD_ks5u9metgaJpZM4YSXhe .

sijie commented 5 years ago

@HaraldGustafsson I see your point now. Let me reopen this issue and think through a better solution on addressing such conflicts.

foreversunyao commented 5 years ago

@HaraldGustafsson for my point of view, retry could not a good solution, due to retry also could fail again. Maybe a queue/or only one request at one time is a better idea, what I know for kafka-manager to change policy does like this .

@sijie what do you think about this ?

sijie commented 5 years ago

@foreversunyao -

Maybe a queue/or only one request at one time is a better idea, what I know for kafka-manager to change policy does like this .

I was thinking about same thing. that sgtm

foreversunyao commented 5 years ago

@sijie I'll try to add one global write lock for this request first, I think it could be enough for change policy.

sijie commented 5 years ago

@foreversunyao sgtm

foreversunyao commented 5 years ago

@sijie for this issue, I am considering create a persistent zknode in zk if not exists, zknode name is the namespace name . by this way to implement a lock for change permission. But I have a problem which path I should use, what do you think about this ?

current path: [zookeeper, counters, ledgers, managed-ledgers, schemas, namespace, admin, loadbalance]

foreversunyao commented 5 years ago

@HaraldGustafsson @sijie @merlimat , Hi, I made a pr https://github.com/apache/pulsar/pull/3398/ for this issue , tried to add a zknode for mutex(to be honest, it's only a short way like queue size is one, could not a best way).

Right now, what I think we could have 2 other ways to do retry thing except this one:

any ideas ? thanks

jaak-pruulmann-sympower commented 2 years ago

Any news here? I have topics where there are ~10 different roles with different permissions. I manage them with pulsar terraform provider which basically does 10 POSTs in a row. In a new cluster with no load it was almost certain that already the second POST would fail. Also, the terraform provider uses internally pulsarctl which does not have any retry mechanism, although it could, at least while the internals of the server are not fixed?

ckreiling commented 1 year ago

I am in the exact same situation as @jaak-pruulmann-sympower - have you found any workaround? I just terraform apply repeatedly until all the changes go through, which under many circumstances takes a long time.

I'm interested in contributing a fix to this since it's a pretty frequent pain-point for me; I like the idea of handling the BadVersionException by retrying with a back-off. For consistency-oriented systems like ZooKeeper I'm not sure there's another workaround besides removing the point of contention entirely. Maybe a lock would work too but that's effectively the same thing as retrying on BadVersionExceptions (i.e. you can still hit a timeout acquiring the lock). This'd be my first contribution so any guidance would be appreciated

edit: just noticed a zk sequential node was mentioned; this still needs a timeout since the caller needs to wait for the queue item to be processed and if the queue gets stuck then the caller will need to timeout. Still a fan of just retrying on BadVersionException to handle majority of cases where maybe 10s or hundreds of updates occur concurrently