Netflix / conductor

Conductor is a microservices orchestration engine.
Apache License 2.0

Scaling up Conductor #160

Closed clari-akhilesh closed 7 years ago

clari-akhilesh commented 7 years ago

The documentation says that scaling out Conductor servers improves scalability and performance. In my test cluster, every node runs Elasticsearch, Dynomite and a Conductor server. When I scaled from 1 to 2 nodes, performance dropped (measured as the time taken to run 'x' parallel workflows). Is this the right way to scale Conductor? If not, could you share the best practices? Thanks.

v1r3n commented 7 years ago

@clari-akhilesh what does your Dynomite setup look like? When you deploy Conductor, do you deploy across multiple availability zones or within the same AZ (EC2_AVAILABILITY_ZONE in the property file)?

clari-akhilesh commented 7 years ago

@v1r3n There was a mistake in the Dynomite deployment which was causing the performance degradation.

In my test deployment, when adding nodes to the conductor cluster, each node has its own instance of elasticsearch, dynomite and conductor server. elasticsearch and dynomite are part of the entire cluster, i.e. they talk to the other elasticsearch and dynomite instances running on the cluster. Each conductor server communicates with elasticsearch and dynomite on the local node. All the conductor servers are behind a load balancer. Here's an example configuration for the conductor server:

db=dynomite
EC2_AVAILABILITY_ZONE=us-east-1d
queues.dynomite.nonQuorum.port=65534
workflow.dynomite.cluster.name=conductor-dynomite
workflow.dynomite.cluster.hosts=localhost:65532:us-east-1d
workflow.namespace.prefix=conductor
workflow.namespace.queue.prefix=conductor_queues
workflow.elasticsearch.url=localhost:9300
workflow.elasticsearch.index.name=conductor

Is this a good way of deploying Conductor servers? Also, when deploying Dynomite across multiple AZs, what should the value of EC2_AVAILABILITY_ZONE be?

v1r3n commented 7 years ago

Conductor nodes should share the elasticsearch and dynomite cluster and should be deployed on separate instances (machines, containers, VMs).

workflow.dynomite.cluster.hosts will then point to the semicolon-separated list of dynomite hosts (host:port:rack entries) and workflow.elasticsearch.url will point to one of the elasticsearch hostnames (or a load balancer).

EC2_AVAILABILITY_ZONE should be the same as the AZ/rack name you provide for the instance. This should match one of the racks in dynomite.
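
As a rough sketch of the layout described above, the snippet below renders the properties for one Conductor node so that every node points at the same Dynomite host list and only the availability zone differs. The host addresses, ports and property names are the ones posted later in this thread, and the `;` separator follows those configurations. The helper names (`cluster_hosts`, `conductor_properties`) are made up for illustration and are not part of Conductor.

```python
# Sketch: render per-node Conductor properties that all point at one shared
# Dynomite cluster. Hosts and ports are copied from this thread; the helper
# names are hypothetical.

DYNOMITE_HOSTS = [
    ("10.128.5.78", 65532, "us-east-1c"),
    ("10.128.5.112", 65532, "us-east-1d"),
    ("10.128.5.135", 65532, "us-east-1e"),
]


def cluster_hosts(hosts):
    """Join host:port:rack entries with ';' as in the thread's configs."""
    return ";".join(f"{host}:{port}:{rack}" for host, port, rack in hosts)


def conductor_properties(availability_zone, elasticsearch_url, hosts=DYNOMITE_HOSTS):
    """Render the properties for one Conductor node (illustrative helper)."""
    racks = {rack for _, _, rack in hosts}
    if availability_zone not in racks:
        # The AZ of the Conductor node should match one of the Dynomite racks.
        raise ValueError(f"{availability_zone} is not one of the racks {sorted(racks)}")
    lines = [
        "db=dynomite",
        f"EC2_AVAILABILITY_ZONE={availability_zone}",
        "workflow.dynomite.cluster.name=conductor-dynomite",
        f"workflow.dynomite.cluster.hosts={cluster_hosts(hosts)}",
        f"workflow.elasticsearch.url={elasticsearch_url}",
        "workflow.elasticsearch.index.name=conductor",
    ]
    return "\n".join(lines)


if __name__ == "__main__":
    print(conductor_properties("us-east-1d", "10.128.5.91:9300"))
```

Only `EC2_AVAILABILITY_ZONE` changes from node to node here; the Dynomite host list stays identical, which is the point of sharing one backend cluster.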

Hope this helps.

clari-akhilesh commented 7 years ago

My example workflow has 124 tasks, and a single run of the workflow takes about 11 seconds. Is there a guide to the number of Dynomite, Elasticsearch and Conductor servers needed to run 1000 parallel workflows in under a minute?
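
To put the target in numbers, here is a back-of-the-envelope calculation using only the figures above. It gives the aggregate task throughput the cluster would have to sustain; it does not say how many servers that takes, which would have to be measured. The variable names are illustrative.

```python
# Back-of-the-envelope load estimate from the numbers in the question.
TASKS_PER_WORKFLOW = 124
SINGLE_RUN_SECONDS = 11
PARALLEL_WORKFLOWS = 1000
TARGET_SECONDS = 60

# Total task completions needed for the whole batch.
total_tasks = PARALLEL_WORKFLOWS * TASKS_PER_WORKFLOW

# Aggregate rate required to finish the batch within the target time.
required_tasks_per_second = total_tasks / TARGET_SECONDS

# Rate observed when a single workflow runs on its own.
single_run_tasks_per_second = TASKS_PER_WORKFLOW / SINGLE_RUN_SECONDS

# How much more throughput is needed compared with the single run.
speedup_needed = required_tasks_per_second / single_run_tasks_per_second

if __name__ == "__main__":
    print(f"total tasks:           {total_tasks}")
    print(f"required tasks/second: {required_tasks_per_second:.1f}")
    print(f"single-run tasks/sec:  {single_run_tasks_per_second:.1f}")
    print(f"speedup needed:        {speedup_needed:.1f}x")
```

So the target works out to 124,000 task completions in 60 seconds, roughly 2,067 per second, or about 183 times the rate of the single 11-second run.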

clari-akhilesh commented 7 years ago

I separated out dynomite, elasticsearch and conductor and here's my config for conductor servers:

#server-c1 region us-east-1c

port=65530
db=dynomite
environment=test
EC2_REGION=us-east-1
EC2_AVAILABILITY_ZONE=us-east-1c
queues.dynomite.nonQuorum.port=65534
workflow.dynomite.cluster.name=conductor-dynomite
workflow.dynomite.cluster.hosts=10.128.5.78:65532:us-east-1c;10.128.5.112:65532:us-east-1d;10.128.5.135:65532:us-east-1e
workflow.namespace.prefix=conductor
workflow.namespace.queue.prefix=conductor_queues
workflow.elasticsearch.url=10.128.5.91:9300
workflow.elasticsearch.index.name=conductor

#server-d1 region us-east-1d

port=65530
db=dynomite
environment=test
EC2_REGION=us-east-1
EC2_AVAILABILITY_ZONE=us-east-1d
queues.dynomite.nonQuorum.port=65534
workflow.dynomite.cluster.name=conductor-dynomite
workflow.dynomite.cluster.hosts=10.128.5.78:65532:us-east-1c;10.128.5.112:65532:us-east-1d;10.128.5.135:65532:us-east-1e
workflow.namespace.prefix=conductor
workflow.namespace.queue.prefix=conductor_queues
workflow.elasticsearch.url=10.128.5.118:9300
workflow.elasticsearch.index.name=conductor

#server-e1 region us-east-1e

port=65530
db=dynomite
environment=test
EC2_REGION=us-east-1
EC2_AVAILABILITY_ZONE=us-east-1e
queues.dynomite.nonQuorum.port=65534
workflow.dynomite.cluster.name=conductor-dynomite
workflow.dynomite.cluster.hosts=10.128.5.78:65532:us-east-1c;10.128.5.112:65532:us-east-1d;10.128.5.135:65532:us-east-1e
workflow.namespace.prefix=conductor
workflow.namespace.queue.prefix=conductor_queues
workflow.elasticsearch.url=10.128.5.143:9300
workflow.elasticsearch.index.name=conductor

Dynomite configuration is as follows:

#dynomite-c1

dyn_o_mite:
  datacenter: us-east-1
  dyn_listen: 0.0.0.0:65533
  dyn_port: 65533
  dyn_seed_provider: simple_provider
  dyn_seeds:
  - ec2-34-205-75-59.compute-1.amazonaws.com:65533:us-east-1d:us-east-1:4294967295
  - ec2-54-85-9-73.compute-1.amazonaws.com:65533:us-east-1e:us-east-1:4294967295
  listen: 0.0.0.0:65532
  max_msgs: 200000
  mbuf_size: 65536
  rack: us-east-1c
  secure_server_option: all
  servers:
  - 127.0.0.1:65534:1
  stats_listen: 0.0.0.0:65531
  timeout: 150000
  tokens: 4294967295

#dynomite-d1

dyn_o_mite:
  datacenter: us-east-1
  dyn_listen: 0.0.0.0:65533
  dyn_port: 65533
  dyn_seed_provider: simple_provider
  dyn_seeds:
  - ec2-52-55-26-49.compute-1.amazonaws.com:65533:us-east-1c:us-east-1:4294967295
  - ec2-54-85-9-73.compute-1.amazonaws.com:65533:us-east-1e:us-east-1:4294967295
  listen: 0.0.0.0:65532
  max_msgs: 200000
  mbuf_size: 65536
  rack: us-east-1d
  secure_server_option: all
  servers:
  - 127.0.0.1:65534:1
  stats_listen: 0.0.0.0:65531
  timeout: 150000
  tokens: 4294967295

#dynomite-e1

dyn_o_mite:
  datacenter: us-east-1
  dyn_listen: 0.0.0.0:65533
  dyn_port: 65533
  dyn_seed_provider: simple_provider
  dyn_seeds:
  - ec2-52-55-26-49.compute-1.amazonaws.com:65533:us-east-1c:us-east-1:4294967295
  - ec2-34-205-75-59.compute-1.amazonaws.com:65533:us-east-1d:us-east-1:4294967295
  listen: 0.0.0.0:65532
  max_msgs: 200000
  mbuf_size: 65536
  rack: us-east-1e
  secure_server_option: all
  servers:
  - 127.0.0.1:65534:1
  stats_listen: 0.0.0.0:65531
  timeout: 150000
  tokens: 4294967295

On Conductor Server nodes, server-d1 and server-e1, I see a lot of these exceptions:

198993 [qtp1031775150-47] ERROR com.netflix.conductor.server.resources.GenericExceptionMapper  - NoAvailableHostsException: [host=Host [hostname=UNKNOWN, ipAddress=UNKNOWN, port=0, rack: UNKNOWN, datacenter: UNKNOW, status: Down], latency=0(0), attempts=0]Token not found for key hash: 1437569046
com.netflix.dyno.connectionpool.exception.NoAvailableHostsException: NoAvailableHostsException: [host=Host [hostname=UNKNOWN, ipAddress=UNKNOWN, port=0, rack: UNKNOWN, datacenter: UNKNOW, status: Down], latency=0(0), attempts=0]Token not found for key hash: 1437569046
    at com.netflix.dyno.connectionpool.impl.hash.BinarySearchTokenMapper.getToken(BinarySearchTokenMapper.java:68)
    at com.netflix.dyno.connectionpool.impl.lb.TokenAwareSelection.getTokenForKey(TokenAwareSelection.java:110)
    at com.netflix.dyno.connectionpool.impl.lb.TokenAwareSelection.getPoolForOperation(TokenAwareSelection.java:73)
    at com.netflix.dyno.connectionpool.impl.lb.HostSelectionWithFallback.getFallbackHostPool(HostSelectionWithFallback.java:209)
    at com.netflix.dyno.connectionpool.impl.lb.HostSelectionWithFallback.getConnection(HostSelectionWithFallback.java:153)
    at com.netflix.dyno.connectionpool.impl.lb.HostSelectionWithFallback.getConnectionUsingRetryPolicy(HostSelectionWithFallback.java:120)
    at com.netflix.dyno.connectionpool.impl.ConnectionPoolImpl.executeWithFailover(ConnectionPoolImpl.java:292)
    at com.netflix.dyno.jedis.DynoJedisClient.d_get(DynoJedisClient.java:340)
    at com.netflix.dyno.jedis.DynoJedisClient.get(DynoJedisClient.java:334)
    at com.netflix.conductor.dao.dynomite.DynoProxy.get(DynoProxy.java:89)
    at com.netflix.conductor.dao.dynomite.RedisExecutionDAO.getTasks(RedisExecutionDAO.java:203)
    at com.netflix.conductor.dao.dynomite.RedisExecutionDAO.getTasksForWorkflow(RedisExecutionDAO.java:217)
    at com.netflix.conductor.dao.dynomite.RedisExecutionDAO.getWorkflow(RedisExecutionDAO.java:283)
    at com.netflix.conductor.dao.dynomite.RedisExecutionDAO.getWorkflow(RedisExecutionDAO.java:269)
    at com.netflix.conductor.core.execution.WorkflowExecutor.updateTask(WorkflowExecutor.java:396)
    at com.netflix.conductor.service.ExecutionService.updateTask(ExecutionService.java:134)
    at com.netflix.conductor.server.resources.TaskResource.updateTask(TaskResource.java:116)
    at sun.reflect.GeneratedMethodAccessor89.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
    at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$TypeOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:185)
    at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
    at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302)
    at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
    at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
    at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
    at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542)
    at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473)
    at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419)
    at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409)
    at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:409)
    at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:558)
    at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:733)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at com.google.inject.servlet.ServletDefinition.doServiceImpl(ServletDefinition.java:286)
    at com.google.inject.servlet.ServletDefinition.doService(ServletDefinition.java:276)
    at com.google.inject.servlet.ServletDefinition.service(ServletDefinition.java:181)
    at com.google.inject.servlet.ManagedServletPipeline.service(ManagedServletPipeline.java:91)
    at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:85)
    at com.netflix.conductor.server.JerseyModule$1.doFilter(JerseyModule.java:99)
    at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:82)
    at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:120)
    at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:135)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1174)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1106)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.eclipse.jetty.server.Server.handle(Server.java:524)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:319)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95)
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
    at java.lang.Thread.run(Thread.java:745)

v1r3n commented 7 years ago

Adding the Dynomite subject-matter experts for the errors. @ipapapa @shailesh33

ipapapa commented 7 years ago

Are you seeing the Dyno exception constantly or only sometimes? It sounds to me like the topology is not properly reported on the client side, so Dyno cannot find the node that owns the key's hash.
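
To illustrate the failure mode described here, below is a simplified sketch of a token-aware lookup. It is not Dyno's actual implementation: `find_owner` and `TokenNotFound` are hypothetical names, and the "first token at or above the hash" rule is an assumption made for the example. The token value and key hash are taken from the configuration and stack trace in this thread.

```python
import bisect


class TokenNotFound(Exception):
    """Raised when no token in the known topology covers a key hash."""


def find_owner(tokens_to_host, key_hash):
    """Return the host whose token is the first one >= key_hash.

    tokens_to_host maps each node's token to its host for a single rack.
    Simplified illustration only; not Dyno's actual algorithm.
    """
    tokens = sorted(tokens_to_host)
    if not tokens:
        # With an empty topology there is nothing to search.
        raise TokenNotFound(f"Token not found for key hash: {key_hash}")
    index = bisect.bisect_left(tokens, key_hash)
    if index == len(tokens):
        index = 0  # wrap around the ring
    return tokens_to_host[tokens[index]]


if __name__ == "__main__":
    rack_c = {4294967295: "10.128.5.78"}   # one node per rack, as configured
    print(find_owner(rack_c, 1437569046))  # resolves to the only node
    try:
        find_owner({}, 1437569046)         # client knows no nodes for the rack
    except TokenNotFound as error:
        print(error)
```

With one node per rack holding token 4294967295, every 32-bit hash resolves to that node, so in this sketch the lookup can only fail when the client has no token information for the rack it is searching. That is one plausible reading of the exception; the thread does not confirm the actual cause.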

A couple of other details about Dynomite's YAML: you do not need 65536 for mbuf_size; something smaller like 16384 is enough. You should also remove dyn_port: 65533, since dyn_listen does the job. Before you start Conductor, make sure that you can send data with redis-cli.
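
redis-cli is the simplest way to run that check. For anyone who wants to script it, the sketch below does a comparable SET/GET round trip from Python by speaking the Redis wire protocol (RESP) over a plain socket. The default port 65532 is the `listen` port from the Dynomite YAML in this thread; the function names and the `conductor_smoke_test` key are made up for the example, and each reply is read with a single `recv`, which is only adequate for small replies like these.

```python
import socket


def encode_command(*parts):
    """Encode a command as a RESP array of bulk strings."""
    out = [b"*%d\r\n" % len(parts)]
    for part in parts:
        data = part.encode("utf-8")
        out.append(b"$%d\r\n%s\r\n" % (len(data), data))
    return b"".join(out)


def send_command(host, port, *parts, timeout=2.0):
    """Send one command and return the raw reply bytes (first read only)."""
    with socket.create_connection((host, port), timeout=timeout) as conn:
        conn.sendall(encode_command(*parts))
        return conn.recv(4096)


def smoke_test(host="127.0.0.1", port=65532):
    """Write a key and read it back; True if both replies look right."""
    set_reply = send_command(host, port, "SET", "conductor_smoke_test", "ok")
    get_reply = send_command(host, port, "GET", "conductor_smoke_test")
    return set_reply.startswith(b"+OK") and get_reply == b"$2\r\nok\r\n"


if __name__ == "__main__":
    # Shows the bytes that would be sent; call smoke_test() against a live node.
    print(encode_command("SET", "conductor_smoke_test", "ok"))
```

Running `smoke_test()` against each Dynomite node before starting Conductor would show whether the node accepts reads and writes on its client port.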

amoolya19 commented 7 years ago

@clari-akhilesh / @v1r3n,

I'm new to clustering, so could you please explain how you set this up? I am running Conductor from the jar. Do I need any other clustering tools to form a Conductor cluster, or does Conductor have a cluster mode of its own? Please let me know if there are any other reference docs.

smkb80 commented 7 years ago

@v1r3n, how is load balancing among Conductor servers achieved? Is a load balancer a must? Or are the Conductor servers within a cluster (talking to the same underlying Dynomite) capable of load balancing by themselves, given that the deployment follows a peer-to-peer architecture?

djk29a commented 7 years ago

Conductor itself is stateless, so n nodes can be put behind a load balancer and every Conductor instance is functionally interchangeable with the others. With horizontal scaling and availability of Conductor solved, this leaves the backend. Each Conductor node is configured in its configuration file to talk to m Dynomite nodes, and internally Dynomite will potentially forward requests to other Dynomite nodes (a total of p Dynomite nodes, where p >= m) that contain the desired data. Dynomite also performs replication and redistribution of data transparently to clients. So Dynomite can be treated functionally like a multi-master database with multiple writers, somewhat similar to Redis 3 (or Memcached) cluster equivalents.
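
A toy model of that arrangement, as a sketch only: the classes below are stand-ins invented for this example (a dict plays the shared Dynomite cluster, and `itertools.cycle` plays a round-robin load balancer). None of it is Conductor code. It shows that any node can handle any request because the workflow state is read from the shared store every time.

```python
import itertools


class SharedStore:
    """Stand-in for the shared Dynomite cluster (a plain dict here)."""

    def __init__(self):
        self.workflows = {}


class ConductorNode:
    """Holds no workflow state of its own; everything lives in the store."""

    def __init__(self, name, store):
        self.name = name
        self.store = store

    def complete_next_task(self, workflow_id):
        state = self.store.workflows[workflow_id]  # read on every request
        state["completed"] += 1
        state["handled_by"].append(self.name)
        return state["completed"] >= state["total"]


def run(workflow_id, total_tasks, node_names):
    """Drive one workflow to completion through round-robin dispatch."""
    store = SharedStore()
    store.workflows[workflow_id] = {
        "completed": 0,
        "total": total_tasks,
        "handled_by": [],
    }
    nodes = itertools.cycle([ConductorNode(name, store) for name in node_names])
    done = False
    while not done:
        done = next(nodes).complete_next_task(workflow_id)
    return store.workflows[workflow_id]


if __name__ == "__main__":
    result = run("wf-1", 4, ["server-c1", "server-d1", "server-e1"])
    print(result["handled_by"])
```

Each task update lands on a different node, yet the workflow still completes, because no node depends on anything it remembered from an earlier request.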

For ElasticSearch configuration for Conductor, the architecture is different and looks more like a traditional single endpoint service.

For a deeper dive on Dynomite and how it works with Redis / Memcached, please see https://medium.com/netflix-techblog/introducing-dynomite-making-non-distributed-databases-distributed-c7bce3d89404

smkb80 commented 7 years ago

Thanks @djk29a for clarifying the load balancing. Couple of quick questions:

Somewhere I read that Conductor Servers follow peer-to-peer deployment architecture. So, what kind of information is exchanged among the Conductor Servers in a cluster? What is the communication mechanism or protocol?

v1r3n commented 7 years ago

Conductor servers do not communicate with each other, as there is no persistent state maintained on the servers. They are essentially stateless machines. The state of the workflows is maintained in the Dynomite database and is read each time a workflow execution is evaluated.

Hope this clarifies.

smkb80 commented 7 years ago

Thanks @v1r3n for the clarification.