elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

Implement 'partition by shard' algorithm and 'best effort routing' of bulk requests #1130

Open jbaiera opened 6 years ago

jbaiera commented 6 years ago

replaces #745

Bulk rejections are a constant pain point when running very large Hadoop/Spark jobs. Each outgoing bulk request is split on the Elasticsearch network thread and sent to each target node's bulk queue as a transport bulk request. This means that for 300 Spark nodes sending parallel requests to 30 ES nodes/shards, each ES node could potentially still see a maximum of 300 transport bulk actions on its queue after splitting, assuming the worst case scenario of each request splitting its docs evenly over each shard. Our advice in the past has been to lower the number of concurrent writers to Elasticsearch to keep the bulk queues from overflowing; increasing the bulk queue size or retrying indefinitely can lead to even worse congestion and longer job times.
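
As a back-of-the-envelope illustration of that worst case (the numbers below are only the ones from this example, not measurements):

```java
public class BulkFanOut {
    public static void main(String[] args) {
        int writers = 300;  // concurrent Spark tasks/nodes issuing bulk requests
        int shards = 30;    // ES nodes / primary shards the index is spread over

        // Worst case: every client bulk request contains docs for every shard, so after
        // splitting, each ES node can have one transport bulk action per writer queued up.
        int transportActionsPerNode = writers;
        // Cluster-wide, that is writers * shards split sub-requests in flight.
        int totalSplitRequests = writers * shards;

        System.out.println("transport bulk actions per node: " + transportActionsPerNode);
        System.out.println("total split sub-requests:        " + totalSplitRequests);
    }
}
```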

One way to smooth this out would be to provide a "best-effort" routing mechanism, and the ability to partition requests by the primary shard they would be sent to. This could help limit the impact of a Hadoop/Spark job on bulk request queues overflowing. The prerequisites and steps to take would be as follows:

Approach:

  1. Add a best effort routing table to the job. At startup, when we determine the locations of the primary shards, we could read from the cluster state endpoint to determine the best way to route each request at that point in time. When the total size of the queues reaches the bulk flushing threshold, we launch a bulk request to each node (preferably async if possible). If the routing changes during the job, each node will simply forward the bulk request on to the correct node as usual.
  2. Sporadically update the routing table. For streaming technologies like Storm, where we launch the job and never update anything, we might be able to have each task register a periodic best effort routing table update request.
  3. Partitioner Implementation. Adding best effort routing is not enough to get bulk queue overflow down; it simply removes the need for an ES node to forward the request in most cases. To actually get the bulk queue overflow under control, we should introduce partitioning functionality in the execution framework that routes documents to writers based on the shards they are expected to land on (see the sketch after this list). This, coupled with the best-effort routing, should handle most issues pertaining to out-of-date routing information (should the cluster state change during the job run).
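
As a rough illustration of how step 3 could look from the Spark side, here is a sketch of a shard-aware partitioner. Everything in it is hypothetical: the class name, the shard-to-partition map, and shardFor() do not exist in the connector, and a real version would need to mirror Elasticsearch's routing hash exactly for documents to line up with their target shards.

```java
import java.util.Map;

import org.apache.spark.Partitioner;

// Hypothetical sketch only: neither ShardAwarePartitioner nor the shard-to-partition map
// exist in elasticsearch-hadoop today.
public class ShardAwarePartitioner extends Partitioner {

    private final int numPartitions;                        // e.g. one Spark partition per primary shard
    private final Map<Integer, Integer> shardToPartition;   // primary shard id -> Spark partition id

    public ShardAwarePartitioner(int numPartitions, Map<Integer, Integer> shardToPartition) {
        this.numPartitions = numPartitions;
        this.shardToPartition = shardToPartition;
    }

    @Override
    public int numPartitions() {
        return numPartitions;
    }

    @Override
    public int getPartition(Object key) {
        // The key is assumed to be the document's routing value (its id or explicit routing).
        int shard = shardFor(key.toString());
        return shardToPartition.getOrDefault(shard, 0);
    }

    private int shardFor(String routing) {
        // Placeholder: a stable hash modulo the number of primaries. A real implementation
        // would have to reproduce Elasticsearch's own routing hash (murmur3 of the routing
        // value) for the mapping to match actual shard placement.
        return Math.floorMod(routing.hashCode(), shardToPartition.size());
    }
}
```

Pairing something like this with `JavaPairRDD.partitionBy(...)` and per-partition writers pinned to the matching node is the "partition by shard" half of the proposal.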

Requirements:

  1. Requires effective IDs. For every data element that passes through the partitioner/writer there needs to be a document ID, a routing ID, or both. Without these, the routing cannot be determined. We will not be extracting Elasticsearch's ID generation code. We just won't.
  2. Requires static indices for writes. If an index does not exist at the start of the job, then it is impossible to know which shard (yet to be created and allocated) a data element should go to.
jbaiera commented 6 years ago

Regarding Requires static indices for writes :

A potential solution to this problem is to partition documents by their regular hashcode instead, write them out using an "unknown" queue that sends to the pinned node and relies on that node to route the data, and update the routing table periodically with new index data.

On the other hand, this feels like it violates a rule for most partitioning algorithms, in that the partitioning algorithm should produce stable results over the entire execution for the same data, and updating the routing table with newly created indices may route documents to Hadoop/Spark tasks differently over time. Additionally, should a job hit a point in time where there is a high amount of data that cannot be routed (e.g. for daily indices: just after midnight when events roll over to the next date), the unknown writing queue would get most if not all of the documents. The partitioning step would then be wasted processing and network time, and the bulk queues would be subjected to the same flood of bulk requests as without this optimization.

nerdynick commented 6 years ago

Our Spark jobs are at a point where this feature would be very useful to have, and we're willing to start working on making it happen, but we want to get an idea of what changes are being considered to make this so.

I've looked a bunch into the BulkCommand and AbstractBulkFactory as well as the FieldExtractors. I feel the FieldExtractors may need to change to not emit JSON directly but instead return a Map. Otherwise we'll have to do a post-extraction JSON walk of the metadata, similar to the JsonFieldExtractors, to figure out the _routing and/or _id values along with _index and _type. What are the thoughts around this?

The BulkCommand.write method may have to change to return both the ByteRef and a new routing hash to the RestRepository, which would then likely hand them off to the BulkProcessor. I feel the BulkProcessor is probably the best place for the partitioning logic, but the RestClient could be as well.

As for keeping the partition table updated, I think we can do it two-fold. First, have a timer refresh the cluster information, allowing long-running streams (Storm) to keep up to date with node failures and shard movement. The second part would be for dealing with dynamic indices: trip a call within the partitioner to request state on an index, and trigger a routing table create call if no state is known. That way, by the time the bulk requests for that index go out, we already know where those shards are. We could also skip the second part for the first go and just rely on the refresher to grab this info on the next poll. Thoughts?
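
For the timer-based refresh part, a minimal sketch, assuming the routing data is produced by some supplier that hits the cluster state / search shards APIs (RefreshingRoutingTable and the generic value type are hypothetical names):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Illustrative sketch of the "timer refresh" idea; nothing like this exists in es-hadoop today.
public class RefreshingRoutingTable<T> implements AutoCloseable {

    private final AtomicReference<T> current = new AtomicReference<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "es-routing-table-refresh");
                t.setDaemon(true);
                return t;
            });

    public RefreshingRoutingTable(Supplier<T> loader, long refreshSeconds) {
        current.set(loader.get());                   // initial load at job startup
        scheduler.scheduleWithFixedDelay(
                () -> current.set(loader.get()),     // periodic best-effort refresh
                refreshSeconds, refreshSeconds, TimeUnit.SECONDS);
    }

    public T get() {
        return current.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

A long-running Storm task could hold one of these for the lifetime of the topology and consult get() on every emit.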

jbaiera commented 6 years ago

I've looked a bunch into the BulkCommand and AbstractBulkFactory as well as the FieldExtractors. I feel the FieldExtractors may need to change to not emit JSON directly but instead return a Map. Otherwise we'll have to do a post-extraction JSON walk of the metadata, similar to the JsonFieldExtractors, to figure out the _routing and/or _id values along with _index and _type. What are the thoughts around this?

The plan was indeed to instantiate the field extractors in the same way that the bulk factories do, in order to extract the index and id/routing per document. For the partitioner portion, this becomes very straightforward, as we only have to create the integration-specific field extractors to pull the requisite data to check against the routing table.

The BulkCommand.write method may have to change to return both the ByteRef and a new routing hash to the RestRepository, which would then likely hand them off to the BulkProcessor. I feel the BulkProcessor is probably the best place for the partitioning logic, but the RestClient could be as well.

Building off my comments above, you are right: this becomes a bit more tricky with the actual bulk request building portion, as the current client is built assuming that only one node is a "pinned" node for request affinity, and only one byte buffer is allocated for bulk requests. There will need to be a large number of changes to the current client code in order to make it possible to "pin" to different nodes based on the requests being made, and to maintain a separate ongoing bulk body per node (sketched loosely below). Frankly, this will be the heaviest lift in the initiative, and it will most likely take multiple PRs of work to get us there.
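
Purely as an illustration of the "one ongoing bulk body per node" idea, a sketch might look like the following. PerNodeBulkBuffers and flushTo() are hypothetical and are not the existing ByteRef/RestRepository machinery.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: one in-progress bulk body per target node instead of a single
// pinned-node buffer.
public class PerNodeBulkBuffers {

    private final Map<String, ByteArrayOutputStream> bodies = new HashMap<>();
    private final long flushThresholdBytes;

    public PerNodeBulkBuffers(long flushThresholdBytes) {
        this.flushThresholdBytes = flushThresholdBytes;
    }

    public void add(String nodeId, String bulkEntryJson) {
        ByteArrayOutputStream body =
                bodies.computeIfAbsent(nodeId, n -> new ByteArrayOutputStream());
        byte[] bytes = bulkEntryJson.getBytes(StandardCharsets.UTF_8);
        body.write(bytes, 0, bytes.length);   // action/metadata + source lines for this node
        body.write('\n');
        if (totalSize() >= flushThresholdBytes) {
            flushAll();                       // combined size tipped over the flush limit
        }
    }

    private long totalSize() {
        return bodies.values().stream().mapToLong(ByteArrayOutputStream::size).sum();
    }

    private void flushAll() {
        // In a real client this would be one (ideally async) bulk request per node.
        bodies.forEach((node, body) -> flushTo(node, body.toByteArray()));
        bodies.clear();
    }

    private void flushTo(String nodeId, byte[] bulkBody) {
        // placeholder for the per-node HTTP _bulk call
    }
}
```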

As for keeping the partition table updated, I think we can do it two-fold. First, have a timer refresh the cluster information, allowing long-running streams (Storm) to keep up to date with node failures and shard movement. The second part would be for dealing with dynamic indices: trip a call within the partitioner to request state on an index, and trigger a routing table create call if no state is known. That way, by the time the bulk requests for that index go out, we already know where those shards are. We could also skip the second part for the first go and just rely on the refresher to grab this info on the next poll. Thoughts?

I have some notes somewhere that have a few options for handling these outliers. It seems that you've lined up pretty much the same solutions. I neglected to add them to the issue originally because I wanted to avoid scope creep on an already large initiative.

The first thought was to have an "unknown" queue that would collect any requests that did not have a routing yet. The problem with this is that if you are streaming data and the index name you are writing to is date based, the minute you cross that date boundary you step right back into that N*M problem.

This led to the idea of having a protocol for refreshing the routing table on a routing miss, then making an index creation call, letting Elasticsearch handle the index creation, waiting on the index to become yellow, and refreshing the routing data again to gain the correct route (a rough sketch of this flow is below). This obviously has a lot of moving parts, and can be a place where errors collect: What happens if the index shards cannot be allocated for any reason? We could fall back to an unknown queue, but then we'd violate the invariants of a stable(ish) partitioning function, etc... I felt it was easier to just cross that bridge when we get to it, since it would be easy to contain it within the routing table implementation.
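
A loose sketch of that refresh-on-miss protocol, with every Elasticsearch interaction left as a placeholder the connector would have to implement against the REST API:

```java
// Illustrative control flow only; resolveShard, refreshRoutingTable, createIndex and
// waitForYellow are placeholders, not existing es-hadoop methods.
public class RoutingMissHandler {

    public int resolveWithRecovery(String index, String routing) {
        Integer shard = resolveShard(index, routing);
        if (shard != null) {
            return shard;
        }
        // Routing miss: maybe the index simply has not been created yet.
        refreshRoutingTable();
        shard = resolveShard(index, routing);
        if (shard == null) {
            createIndex(index);        // let Elasticsearch create and allocate the index
            waitForYellow(index);      // wait until primaries are assigned
            refreshRoutingTable();     // pick up the new shard locations
            shard = resolveShard(index, routing);
        }
        if (shard == null) {
            // Allocation failed or timed out; falling back to an "unknown" queue would
            // break the stable-partitioning assumption, so surface the error instead.
            throw new IllegalStateException("Could not resolve routing for index " + index);
        }
        return shard;
    }

    private Integer resolveShard(String index, String routing) { return null; /* placeholder */ }
    private void refreshRoutingTable() { /* placeholder */ }
    private void createIndex(String index) { /* placeholder */ }
    private void waitForYellow(String index) { /* placeholder */ }
}
```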

jbaiera commented 6 years ago

@nerdynick Also, another point here on this issue - I'm still doing some investigation on how to perform this sort of operation on all of our integrations. This includes making sure users of Hive and Pig can enjoy the same optimization that users of Spark and Storm will have.

To comment on the priority for this work: It is currently on my roadmap, but it's a bit far out at the moment, as I have a few other items that I would like to get done first. If you know anyone who would be positively affected by this, please feel free to have them +1 the issue. It makes prioritizing things quite a bit easier.

nerdynick commented 6 years ago

The plan was indeed to instantiate the field extractors in the same way that the bulk factories do, in order to extract the index and id/routing per document. For the partitioner portion, this becomes very straightforward, as we only have to create the integration-specific field extractors to pull the requisite data to check against the routing table.

So you're saying we should just re-walk the bulk template line when doing routing, instead of maintaining it in an easily accessed object such as a Map?

I have some notes somewhere that have a few options for handling these outliers. It seems that you've lined up pretty much the same solutions. I neglected to add them to the issue originally because I wanted to avoid scope creep on an already large initiative.

I'd like to see what notes you have here, because one of the other areas I've noticed that would benefit from a cluster state refresh, besides Storm, is Spark. If you have more tasks than executors/cores (Kafka reading tends to do this), then for each task ES-Hadoop will recreate the EsRDDWriter, which in turn has to refresh cluster state and rebuild all the extractor/ByteRef machinery. Instead, that information could remain in the executor's memory for the duration of the job/stream and just be periodically refreshed. This could heavily improve job times, as only the first task on an executor would pay that cost instead of every task. Right now I have to do a local shuffle to get the task count down to one per executor/core.
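
A minimal sketch of that executor-scoped caching, assuming a hypothetical generic holder shared by all tasks in the same executor JVM (nothing like this exists in ES-Hadoop today):

```java
import java.util.function.Supplier;

// Illustrative sketch: keep cluster/routing state per executor JVM rather than per task.
public final class ExecutorScopedState<T> {

    private final Supplier<T> loader;
    private volatile T state;           // shared by all tasks running in this executor JVM

    public ExecutorScopedState(Supplier<T> loader) {
        this.loader = loader;
    }

    public T get() {
        T local = state;
        if (local == null) {
            synchronized (this) {
                if (state == null) {
                    state = loader.get();   // only the first task on the executor pays this cost
                }
                local = state;
            }
        }
        return local;
    }

    public synchronized void refresh() {
        state = loader.get();               // a periodic refresher can call this
    }
}
```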

The first thought was to have an "unknown" queue that would collect any requests that did not have a routing yet. The problem with this is that if you are streaming data and the index name you are writing to is date based, the minute you cross that date boundary you step right back into that N*M problem.

I really don't think there is a way around the unknown queue. I think we can do work to mitigate its usage, like having a dedicated config for that queue to send data sooner than we would for others.

What happens if the index shards cannot be allocated for any reason? We could fall back to an unknown queue, but then we'd violate the invariants of a stable(ish) partitioning function, etc... I felt it was easier to just cross that bridge when we get to it, since it would be easy to contain it within the routing table implementation.

I think in this case we could still maintain partitions: when a partition goes to submit, if its shard/node is unavailable, we send to the last known good node instead and let ES handle the routing. It'd keep the partitioning logic consistent and only affect the buffer overflow issue. Maybe if that partition is down we could also switch to the unknown-queue logic above and send sooner rather than later to help mitigate that issue.

nerdynick commented 6 years ago

Also, another point here on this issue - I'm still doing some investigation on how to perform this sort of operation on all of our integrations. This includes making sure users of Hive and Pig can enjoy the same optimization that users of Spark and Storm will have.

I'm not sure how the others would be affected if we keep this logic isolated to the hadoop.rest.* code. If you can elaborate on where you see issues, that would help me make sure we keep them up to date with the changes as well.

To comment on the priority for this work: It is currently on my roadmap, but it's a bit far out at the moment, as I have a few other items that I would like to get done first. If you know anyone who would be positively affected by this, please feel free to have them +1 the issue. It makes prioritizing things quite a bit easier.

I'm not sure of anyone outside of us and our customers/clients needing this. However, this is also why I'm offering to perform the work needed to get this out there, as we could really use it with our current volume of data (100k+ records/sec) and with our plans going forward (10x that).

jbaiera commented 6 years ago

I'm not sure how the others would be affected if we keep this logic isolated to the hadoop.rest.* code. If you can elaborate on where you see issues, that would help me make sure we keep them up to date with the changes as well.

I mostly mean that for this optimization to work, we need to make sure that there is a guide for each integration explaining how to configure and run it with the partitioner for loading data. I know that Hive requires you to write your query very differently in the case of custom partitioning. We just need to make sure all integrations are supported as best as possible.

jbaiera commented 6 years ago

Sorry, this is a lot of questions to go through, so I apologize if not all the answers are complete. This is a very high level description of the work needed, and not all the work that needs to be done is broken into discrete work items yet.

So you're saying we should just re-walk the bulk template line when doing routing, instead of maintaining it in an easily accessed object such as a Map?

I'm saying that when we write the record, we pass it to a piece of code that uses the field extractors to pull out the id/routing/index names, then checks them against the routing table to determine the shard/nodes to send it to, and then, after the routing is determined, THEN serializes the record into the JSON bulk entry. The bulk processing code would then be handed both the routing info and the raw JSON bulk entry to marshal into the correct bulk bodies per shard, and when the total size of all bulk bodies tips over the flush limit, we would determine the node to send each shard request to and fire the requests off.
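
Spelled out as a sketch, with every type below (FieldExtractors, RoutingTable, BulkBuffers, ShardRoutedWriter) being an illustrative stand-in rather than an existing class, that write path would be roughly:

```java
// Illustrative write-path sketch only; none of these interfaces exist in es-hadoop.
public class ShardRoutedWriter {

    interface FieldExtractors {
        String index(Object record);
        String idOrRouting(Object record);
        String toBulkEntryJson(Object record);   // serialization happens after routing
    }

    interface RoutingTable {
        int shardFor(String index, String routing);
        String nodeForShard(String index, int shard);
    }

    interface BulkBuffers {
        void add(String node, int shard, String bulkEntryJson);
        boolean overFlushLimit();
        void flushPerNode();                     // one request per node once the limit is hit
    }

    private final FieldExtractors extractors;
    private final RoutingTable routingTable;
    private final BulkBuffers buffers;

    public ShardRoutedWriter(FieldExtractors e, RoutingTable r, BulkBuffers b) {
        this.extractors = e;
        this.routingTable = r;
        this.buffers = b;
    }

    public void write(Object record) {
        // 1. Pull out only the routing-relevant fields.
        String index = extractors.index(record);
        String routing = extractors.idOrRouting(record);
        // 2. Resolve the target shard/node from the best-effort routing table.
        int shard = routingTable.shardFor(index, routing);
        String node = routingTable.nodeForShard(index, shard);
        // 3. Only now serialize the record into its bulk entry.
        buffers.add(node, shard, extractors.toBulkEntryJson(record));
        // 4. When the combined size of all bodies tips over the flush limit, fire per-node requests.
        if (buffers.overFlushLimit()) {
            buffers.flushPerNode();
        }
    }
}
```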

nerdynick commented 6 years ago

I'm saying that when we write the record, we pass it to a piece of code that uses the field extractors to pull out the id/routing/index names, then checks them against the routing table to determine the shard/nodes to send it to, and then, after the routing is determined, THEN serializes the record into the JSON bulk entry.

So if I'm reading this right, we'd then need to separate the FieldExtraction logic from the FieldWriting layer. Right now they work as a stack, where the bottom gets a value and a writer ends up wrapping it to add the JSON keys and wrappings before that information is returned to the BulkCommand. Otherwise we'd be doing field lookups twice, which with JSON data could decrease performance.

The thought I had was to refactor the extraction layer to manipulate a reusable EnumMap instead of writing the JSON directly to the ByteRef. We should be able to save on memory and object creation/deletion like we do with the ByteRef. Then within the BulkCommand we can perform the serialization of that information via the FieldWriters, or something else (a loose sketch of the idea is below). I'm not sure how this would play out with other users' custom implementations of IndexExtractors, as that seems to be the predominant use case for writing out the full JSON header.
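
A loose sketch of that reusable EnumMap idea; the Field enum and ReusableMetadata class here are illustrative only, not the connector's actual metadata handling:

```java
import java.util.EnumMap;

// Illustrative sketch: extractors fill in metadata on a single reused map, and the
// bulk action/metadata line is only serialized later, once routing is known.
public class ReusableMetadata {

    public enum Field { INDEX, TYPE, ID, ROUTING }

    private final EnumMap<Field, Object> values = new EnumMap<>(Field.class);

    public void reset() {
        values.clear();              // reused per record to avoid allocations
    }

    public void set(Field field, Object value) {
        values.put(field, value);
    }

    public Object get(Field field) {
        return values.get(field);
    }

    // Deferred serialization of the action/metadata header, e.g. {"index":{"_index":"...","_id":"..."}}
    // (no JSON escaping here; this is only a sketch).
    public String toBulkHeader(String operation) {
        StringBuilder sb = new StringBuilder().append("{\"").append(operation).append("\":{");
        boolean first = true;
        for (Field f : values.keySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append("\"_").append(f.name().toLowerCase()).append("\":\"")
              .append(values.get(f)).append('"');
            first = false;
        }
        return sb.append("}}").toString();
    }
}
```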

jbaiera commented 6 years ago

I think that's a fairly reasonable idea. At this phase, I'm not so much worried about the potential performance hit that reading the index/id/routing fields a second time might carry with it. Since there are so many moving parts that need to be working correctly here between the routing tables and the multiple open requests, I figure that is the kind of fix that can be enacted after the fact if there are any serious performance drawbacks attached to it. I don't have any strong feelings about the extraction code as it is right now.

jbaiera commented 3 years ago

A number of improvements have gone into Elasticsearch in the last year or so around managing the load on the write queue. For instance, the write queue now supports up to 10,000 entries (https://github.com/elastic/elasticsearch/pull/59559), which greatly eases the burden on the cluster when accepting a surge of bulk requests from very many clients. This might still be a useful feature, but perhaps it can be done in a more pared-down way.