elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

[META] Centralized State / Shared Work #8682

Open andrewvc opened 6 years ago

andrewvc commented 6 years ago

We need some way to share state and distribute work among a number of coordinating logstash workers. This should work with both a remote store and a local one. This is a meta issue to scope the work.

original-brownbear commented 6 years ago

From yesterday's presentation:

Introduction - Execution Flow Overview

See below Diagram for a design that involves P2P orchestration, using ES as the persistence layer.

[Diagram: P2P orchestration between coordinating Logstash nodes, with Elasticsearch as the persistence layer]

3 Things Need Implementation

Task Partitioning

This is heavily dependent on the plugin implementation. Logically, we can identify three categories of plugins:

  1. Can’t be partitioned (TCP input, UDP input, HTTP input). I would exclude file input from this category, since we could still document and implement this trivially on top of NFS.
  2. Doesn’t require partitioning (just supervision) in some configurations
    • Examples:
      • Kafka
      • RabbitMQ
      • HTTP Poller
    • Leader assigns partitions to workers after partitioning once (repartition only on node topology changes); see the sketch after this list
    • Workers report state back to leader (implicit heartbeat)
    • Leader stops execution, repartitions and restarts execution on node join/leave
  3. Requires partitioning and windowing (at least in some configurations)
    • Examples:
      • Kafka (manual partition assignment), HTTP Poller (set specific metadata)
      • S3
      • (NFS)
    • Leader continuously assigns tasks to workers
    • Workers leaving trigger reassignment of their task after a timeout
    • Workers joining are assigned new tasks as they (new tasks) become available
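To make the category-2 assignment above concrete, here is a minimal sketch of a leader partitioning once over the live workers. All names are illustrative; nothing like this exists in Logstash today:

import java.util.*;

// Hypothetical leader-side helper for category 2: partition once,
// repartition only when the node topology changes.
final class StaticPartitioner {

    // Round-robin assignment of partition ids to the currently live workers.
    static Map<String, List<Integer>> assign(List<String> workers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String worker : workers) {
            assignment.put(worker, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(workers.get(p % workers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // E.g. 8 Kafka partitions spread over 3 live workers:
        System.out.println(assign(List.of("w1", "w2", "w3"), 8));
        // {w1=[0, 3, 6], w2=[1, 4, 7], w3=[2, 5]}
    }
}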

Task Serialization

Example Code Executed on Leader

S3

while (true) {
    workers = getIdleWorkers(); // Blocks until at least one worker becomes available
    workerCount = workers.length;
    files = getOutstandingS3Files(workerCount); // Blocks & returns at most workerCount files
    i = 0;
    context = getContext(); // Holds short-lived state and is serializable
    for (file : files) {
        worker = workers[i++ % workerCount];
        worker.submit(
            () -> { downloadFileToLsQueue(context, file); commitToEs(file); } // Runs on Worker
        ).onFailureOrTimeout(
            () -> {
                worker.markDead(); // Runs on Leader and persists the information in ES
                reEnqueue(file);   // Same here: the Leader persists the state to ES
            }
        );
    }
}
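The submit(...).onFailureOrTimeout(...) chain above assumes roughly the following worker-side API. This is a sketch of what core would need to provide; none of these types exist today:

import java.io.Serializable;

// Closures shipped to workers must be serializable.
interface SerializableRunnable extends Runnable, Serializable {}

// Handle returned by submit(); the callback runs on the Leader if the
// worker reports an error or its heartbeat times out.
interface TaskHandle {
    void onFailureOrTimeout(Runnable leaderSideCallback);
}

// Hypothetical handle the Leader holds for each remote worker.
interface Worker {
    TaskHandle submit(SerializableRunnable task); // serializes the closure and ships it to the worker
    void markDead(); // marks the worker dead in the Leader's view and persists that to ES
}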

Kafka

while (true) {
    context = getContext(); // Holds short-lived state and is serializable
    for (worker : getIdleWorkers()) { // Runs on Leader
        worker.submit(
            () -> runKafkaInput(context) // Runs on Worker
        ).onFailureOrTimeout(
            () -> worker.markDead() // Runs on Leader
        );
    }
}

Distributed State

Example Distributed Set for storing already processed files for S3
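The slide content isn't reproduced here, but the idea would be roughly the following sketch; the interface and its ES backing are assumptions, not an agreed design:

// Hypothetical Leader-side set of already processed S3 keys, persisted in ES.
// Only the Leader writes, which serializes all access to the backing index.
interface DistributedSet {
    boolean contains(String s3Key); // e.g. a GET by id against the backing ES index
    void add(String s3Key);         // e.g. indexes a document whose id is the S3 key
}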

Extremal Single Node Case

Q & A

Why RAFT?

Why not Handle Orchestration via ES?

original-brownbear commented 6 years ago

@andrewvc @colinsurprenant @jordansissel added more details above (and a diagram :P) in case anyone wants to take a look before tomorrow :)

yaauie commented 6 years ago

@original-brownbear 🎉 thanks for the helpful state diagrams -- RAFT appears to be a really useful protocol for this use-case.

I'd rather avoid introducing "master/slave" terminology to describe the roles if possible, largely because slavery is a horrific reality and describing this cluster in those terms doesn't add any more clarity than using other words; it's easy to let old terminology leak into new things, but if we could find descriptive names that make the underlying concepts easier to understand, I think we'll be better off. Maybe:

andrewvc commented 6 years ago

+1 on the state diagrams, great work there @original-brownbear. I agree that we should move away from the problematic terminology of master / slave. Well put @yaauie

jordansissel commented 6 years ago

Leader assigns partitions to followers

The pseudocode examples call workers what elsewhere is called followers. Let's pick one -- how about worker?

Example Code Executed on Leader

S3

files = getOutstandingS3Files(workerCount); // Blocks & returns at most workerCount files

I think this work should be executed on followers as a means to distribute the act of "finding work to be done".

If there is only 1 leader, this becomes a possible bottleneck. As an example with S3, on a large S3 bucket, when calling ListObjects, I can list approximately 1000 objects per second; at that rate, a bucket with 100 million objects takes roughly 28 hours to list from a single node. For a large system with many pipelines and many inputs, it seems like distributing the "find work" (prospecting, in filebeat terminology) process would be beneficial.

The leader's implementation may be simplified by having it solely responsible for pairing work with followers:

while (true) {
    workers = getIdleWorkers(); // Blocks until worker becomes available
    workerCount = workers.length;
    jobs = getOutstandingJobs(workerCount); // any jobs: S3 files, etc.
    // ... pair jobs with idle workers as in the S3 example ...
}

In a way, moving prospecting to the followers/workers kind of neatly aligns with the "server" input model where you have a permanently-running task: beats input, tcp input, kafka input, etc. In S3, the "server" job is the one that finds S3 files and publishes them to the leader as tasks to be executed.

S3 input, then, would be a permanently-running task (unless we define some terminating condition) that is always watching (prospecting) for new files to process. When new files are found, it submits them to the leader as a job needing to be done.

This model of a prospector on inputs can be used for S3 (find files), HTTP Poller (cron scheduler), JDBC input (cron scheduler), etc.

My main thought around having a follower assigned to prospecting is that this frees the leader from this responsibility and provides some isolation (and maybe security) that the leader won't need to run arbitrary code that could crash. It also would hopefully allow for distribution of prospecting-load.
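If prospecting moved to workers like this, the contract could look roughly as follows; the interfaces and names are purely illustrative, not an agreed design:

import java.io.Serializable;

// A unit of work a prospector discovers: an S3 key, a fired cron schedule, etc.
interface Job extends Serializable {}

// Leader-side endpoint the prospector reports newly found work to.
interface LeaderClient {
    void submitJob(Job job);
}

// A permanently-running task on a worker that watches for new work.
// For S3 this would poll ListObjects; for HTTP Poller / JDBC it would
// be driven by the cron schedule.
interface Prospector {
    void run(LeaderClient leader);
}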

All writes and reads to ES are executed by the (RAFT-)leader if a healthy leader is available

Using ES for a state store makes a lot of sense. Is there any prior art where folks are using ES to replace, for example, ZooKeeper? Are you thinking the RAFT election is done by communicating through Elasticsearch? Or is this peer-to-peer network communication?

Kafka

for (worker : getIdleWorkers()) { // Runs on Leader
    worker.submit(
        () -> runKafkaInput(context) // Runs on Worker

This code, the way I read it, would take all idle workers and make them permanently busy doing Kafka work.

How is "idle" determined?

jordansissel commented 6 years ago

@original-brownbear what you describe in this ticket is well aligned with what I had in mind when I presented this in Berlin - I think we are mostly on the same page here. :)

original-brownbear commented 6 years ago

@jordansissel

The pseudocode examples call workers what elsewhere is called followers. Let's pick one -- how about worker ?

Fixed the text to use worker :)

I think this work should be executed on followers as a means to distribute the act of "finding work to be done".

Not so sure about that in practice. In theory it sounds like it may save some time if you partition things in a smart way, and you do get a somewhat higher degree of parallelism on the index operation, but you also have to do more network roundtrips between workers to synchronize on who does what. Especially failures on workers assigned to indexing are super hard to detect unless they sync back to the Leader (in which case the Leader needs to handle the global state of all the files anyhow).
Also, I don't really see a good way to shuffle the work in S3 without a sort. You can't really hash-partition by filename; that gives you bad behaviour if either the file or the worker count is low, and it makes handling workers joining and leaving super hard. In the end, it's really hard to imagine a single LS node not being able to build an index of all files in a bucket in a small amount of time when multi-threading, right?

My main thought around having a follower assigned to prospecting is that this frees the leader from this responsibility and provides some isolation (and maybe security) that the leader won't need to run arbitrary code that could crash. It also would hopefully allow for distribution of prospecting-load.

The issue I see with distributing prospecting load is in the above point. As for running arbitrary code, I think we simply have to secure things at the network layer here. From the single-node perspective there isn't really any new security issue in serializing and passing around closures, since the code that is serialized still comes from the plugins. The only new attack vector is the network, so that needs some optional TLS wrapping imo. Spark does the same thing for serializing standard jobs, and so far the approach seems to work out pretty well in practice. Implementing a general API for prospecting seems like an incredibly hard task and carries the huge risk of the API eventually becoming outdated and not applicable to new plugins (kinda like MapReduce ... you can't really find a 100% abstraction of partitioning and mapping that fits all or even most use cases easily).
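For reference, the Spark-style closure shipping mentioned here hinges on the lambda itself being serializable; in Java it is enough for the target functional interface to extend Serializable, and the receiving side only needs the defining class (i.e. the plugin code) on its classpath. A toy sketch, unrelated to any existing Logstash code:

import java.io.*;

public class ClosureShipping {

    interface SerializableRunnable extends Runnable, Serializable {}

    public static void main(String[] args) throws Exception {
        String file = "some-s3-key"; // captured state must be serializable too
        SerializableRunnable task = () -> System.out.println("processing " + file);

        // Round-trip through bytes, standing in for the network hop to a worker.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(task);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            ((Runnable) in.readObject()).run(); // prints: processing some-s3-key
        }
    }
}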

Using ES for a state store makes a lot of sense. Is there any prior art where folks are using ES to replace, for example, ZooKeeper? Are you thinking the RAFT election is done by communicating through Elasticsearch? or is this peer-to-peer network communication?

As shown in the diagram, all leader election, work assignment etc. should be done through a P2P layer. Using ES for this is just way too error-prone and hard to implement, without any benefit to it: in fact you get the downside of a lot more network roundtrips if everyone needs to communicate their messages through ES and poll ES for new messages, while standing connections are much easier to do using RAFT. Standing connections also allow for simply synchronizing access to ES by only letting the leader write to it. Then all data structures necessary to bootstrap a single node as well as the complete cluster can be backed by ES lookups: that gives us persistence of the state beyond all nodes dying, easy discovery via ES coordinates, and, if necessary, performance (though I'm not seeing a practical example of that performance need yet). Communication with ES being polling-only simply makes leader election etc. way too hard without a P2P layer on top of it imo. Also, detecting a node going away is really hard without a standing connection; you'd have to keep a log of heartbeats for that in ES and constantly poll and update it, which is not really a good use case for ES imo.

This code, the way I read it, would take all idle workers and make them permanently busy doing Kafka work.

Yup.

How is "idle" determined?

Leader has access to the global state of available nodes and tasks/partitions to be worked on. If some node isn't assigned any task but is live => it's idle :)
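A minimal sketch of that check over the Leader's global view (names are illustrative):

import java.util.*;

final class IdleCheck {
    // Idle = live in the Leader's global view and not currently assigned a task.
    static List<String> idleWorkers(Set<String> liveNodes, Map<String, String> taskByNode) {
        List<String> idle = new ArrayList<>();
        for (String node : liveNodes) {
            if (!taskByNode.containsKey(node)) {
                idle.add(node);
            }
        }
        return idle;
    }
}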

original-brownbear commented 6 years ago

@jordansissel

That said ... if we have a strong closure serialization and writable shared state implementation, I don't see what would prevent us from trying to distribute prospecting tasks as well. It should be close to as easy as writing multi-threaded code once we have a clean framework for those two things up and running. I'm not so optimistic about this when it comes to S3, but experimentation should be cheap if we do a good job up front in core :)

original-brownbear commented 6 years ago

@andrewvc how about moving this to a PR with the design document from the above comment as a .md in it? I have a few points I'd like some written feedback on now, and if we do it in a single-threaded way here it's gonna be a huge pain to follow, right?

andrewvc commented 6 years ago

I think that's a great idea
