elastic / elasticsearch

Free and Open Source, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch

Parallel/concurrent reads #13494

Closed costin closed 8 years ago

costin commented 9 years ago

This is a generalization of https://github.com/elastic/elasticsearch/issues/9483

Problem

For libraries that want to read data from Elasticsearch in a parallel/concurrent/multi-threaded way, one needs to split the target (aka data in ES) into independent chunks which then can be consumed individually.

Current approach

ES doesn't offer a proper API to achieve this. The current approach requires understanding the shard topology (which can be tricky and may give the wrong granularity) to determine the chunks, followed by a hacky attempt to lock a read down to a certain chunk. That is unfortunately problematic when reading aliases or multiple indices across the same shards (which is what issue https://github.com/elastic/elasticsearch/issues/9483 tried to fix).

Desired behavior

The user ideally would simply request the number of splits/chunks for a given query and get back the number of possible partitions/readers ids which then the consumer can start using to get back the results. So for example, the user could ask for 10 chunks for a given index.

ES in turn would decide whether it can and how to best create these splits: likely the chunks requested will not match the index topology (such as shards) so a split might spread across multiple shards while another might read only a subset of a shard.

This would be similar to the scan/scroll approach where the initial query returns a scroll_id which in turn, returns the results.

Going a step further, the user could specify the granularity needed such as: NODES or SHARDS to bound the parallelism based on the actual topology of the target index/indices.

The user should not have to deal with the actual partition or follow-up connection; rather it would have the relevant information already computed by ES. The chunk should contain locality information - on what node/ip is the data retrieved from.

costin commented 9 years ago

@boaz @imotov please add your thoughts to the issue. @clintongormley I'm going to close #9483 in favour of this issue; hopefully it won't send the issue to the back of the queue, planning wise :)

imotov commented 9 years ago

@costin - just to clarify. I don't really see much sense in creating more slices than we have shards in the index. So, if a client asks for 10 slices and index has only 5 shards, it wouldn't make much sense to return more than 5 slices. On the other side, if the client asks for 2 slices, but we have 5 shards running on 3 different nodes, there is really no good split either. So, maybe client shouldn't really ask for the number of chunks but instead have parallelism mode: nodes or shards which will cause elasticsearch to give one scroll_id per shard or one scroll_id per node. What do you think?

petchema commented 9 years ago

We had this issue with Elasticsearch-hadoop. The map/reduce client is a relatively large Hadoop cluster (compared to our ES cluster), and since ES-hadoop can only start one mapper per shard we had to create indices with several hundred shards (more than we usually do) so it doesn't limit the mapping parallelism. That requires some planning, and also makes those indices less efficient for other operations, like indexing or standard search queries.

apatrida commented 8 years ago

@imotov it does make sense to divide a shard into slices if you can read one slice from a primary, one slice from a replica, another from yet another replica, etc.

But there definitely needs to be a plan that allows parallel reading and divides the load across shards, nodes, machines, and the rack or racks. And then yes, you need something like a per-node ID so you can read individually.

I think a lot of us have done this in the same way. Take an alias, convert to list of indexes, look at the shard plan, solve the plan for evenly dividing work across the highest number of nodes that have data, query from each node using shard/node hints, either locally (if our extraction is on the node or in a plugin) or remotely ensure more parallelism.

This is also something that might cause the desire for I/O, CPU or memory throttling for these queries because you start to have larger batch reads at the same time you don't want to crush the cluster. And similar circuit breakers (throttles are likely better).

imotov commented 8 years ago

@apatrida what I meant is there is no good practical way to "slice" a primary shard and a replica shard into the same slices. You can do things like scanning all records and selecting some records based on hash of id, but it's not very practical and it's something that you can do on the client side already.
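
The client-side approach mentioned above (scan all records, keep the ones whose id hashes into your slice) can be sketched roughly as follows. This is only an illustration: the function names `slice_of` and `select_slice` are hypothetical, and MD5 is an arbitrary choice of a hash that is stable across processes, not what Elasticsearch uses.

```python
import hashlib

def slice_of(doc_id: str, num_slices: int) -> int:
    """Map a document id to a slice number in [0, num_slices)."""
    # Python's built-in hash() is randomized per process for strings,
    # so a digest is used to keep the mapping stable across workers.
    digest = hashlib.md5(doc_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_slices

def select_slice(doc_ids, slice_id: int, num_slices: int):
    """Scan every id and keep only those that hash into slice_id."""
    return [d for d in doc_ids if slice_of(d, num_slices) == slice_id]

# Hypothetical ids standing in for the documents of one shard.
ids = [f"doc-{i}" for i in range(1000)]
slices = [select_slice(ids, s, 4) for s in range(4)]
```

Every worker still has to look at every record, which is the impracticality pointed out in the comment: the work per worker does not shrink, only the output does.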

clintongormley commented 8 years ago

Now that search requests are parsed on the coordinating node, we can add support for breaking a single scroll request down into multiple scrolls, which can be pulled in parallel. The way it could work is as follows:

This request starts a search against all indices starting with my_index and requests 10 scroll IDs in response:

GET my_index*/_search?scroll=1m&scroll_ids=10

Elasticsearch finds all matching shards and determines that there are 8 shards involved, which means we need two more scroll_ids than we have shards. This means that it needs to split two shards into two.

It sorts the shards by number of docs in each and chooses to split the two largest shards. The field stats API returns the min/max UID which we split into two ranges, and apply those ranges as a filter on those shards, eg:

{ "range": { "_uid": { "gte": "a000000", "lt": "m000000" }}}

The response returns an array of scroll_ids with the total hits, but without returning the first tranche of hits:

{
  "scroll_ids": [
    {
      "scroll_id": "cXVlcnlUaGVuRmV0Y2g7NTs3NzpWMzZBQTJXb1JEMlFNcGZkZTFMQ3FnOzc2OlYzNkFBMldvUkQyUU1wZmRlMUxDcWc7Nzg6VjM2QUEyV29SRDJRTXBmZGUxTENxZzs3OTpWMzZBQTJXb1JEMlFNcGZkZTFMQ3FnOzgwOlYzNkFBMldvUkQyUU1wZmRlMUxDcWc7MDs=",
      "hits": {
        "total": 10000,
        "hits": []
      }
    },
    ...
  ]
}

We assume UIDs are well distributed but that may not be the case. There is no guarantee that each scroll ID will return a similar number of results.

NOTE: If we have more shards than we requested, we just return one scroll id per shard.
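
The planning step described in this comment might look roughly like the sketch below. It is an illustration of the idea only, not Elasticsearch code: `plan_scrolls` and the shard names are hypothetical, and the sketch assumes each shard is split into at most two parts, so it never returns more than twice the number of shards (the comment does not say what should happen beyond that).

```python
def plan_scrolls(shard_docs, requested):
    """Return (shard, part, parts) tuples, one per scroll to open.

    shard_docs maps a shard name to its document count.
    """
    # Largest shards first, since those are the ones chosen for splitting.
    shards = sorted(shard_docs, key=lambda s: shard_docs[s], reverse=True)
    # How many scrolls are missing if every shard gets exactly one.
    extra = max(0, requested - len(shards))
    to_split = set(shards[:extra])  # slicing caps this at len(shards)
    plan = []
    for shard in shards:
        parts = 2 if shard in to_split else 1
        for part in range(parts):
            plan.append((shard, part, parts))
    return plan

# 8 shards and 10 requested scrolls: the two largest shards are split in two.
docs = {f"shard{i}": (i + 1) * 1000 for i in range(8)}
plan = plan_scrolls(docs, 10)
```

With fewer requested scrolls than shards, the function falls back to one entry per shard, matching the NOTE above.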

bleskes commented 8 years ago

imo, and @costin can confirm, but I believe that reading shards in parallel is good enough and we don't need the complexity of sub-shard split. I think we can start with figuring that part out (and if people ask for 10 readers and we only have 8 shards, we'll give them two empty readers).

costin commented 8 years ago

The sub-shard splitting is real and already a problem. This occurs for example in Spark where the reader tries to load the data in memory but it can only access a max of 2 GB (during shuffling, related to ByteBuffers and Integer.MAX_VALUE). The solution in Spark is to increase the number of workers so the memory is spread in chunks of 2GBs so many times a reader will have way more partitions than shards. Unless the shard is actually split, this problem still remains.

clintongormley commented 8 years ago

@costin and @imotov just had a chat about this and came up with a better idea:

Return one scroll ID per shard. Each scroll ID can be used in parallel processes, which means that requests to the same search context on each shard should be executed serially to avoid overlaps. Once no more hits are returned, scrolling is done.

This API could also return info about which node hosts each shard, which can be used by spark/hadoop to choose local workers.

s1monw commented 8 years ago

Return one scroll ID per shard. Each scroll ID can be used in parallel processes, which means that requests to the same search context on each shard should be executed serially to avoid overlaps. Once no more hits are returned, scrolling is done.

I personally think a solution to this problem should have some more properties than serialization. I think we need to make sure we don't depend on the state of a shard, to ensure we can recover from failures without starting all over again. For instance, if one worker needs to restart we should be able to resume only that one worker, not all the others. I also think that all state should be on the client, except for the information we are already maintaining, i.e. the point-in-time snapshot (index reader). For this to work, the user should be in charge of partitioning the document space. For instance, a user with N consumers must specify the partition and the number of consumers when the scroll is initialized, i.e. users must provide:

For every request we allow to provide the worker ID (0 - N) and we return only the slice of the data for the given worker ID. Even further we can allow to provide a document offset of some sort to resume (that would be nice). Implementation wise we can just fork a search context per worker and return all context IDs with the scroll ID once per worker.

that way we can keep everything with the same concurrency patterns and we can consume documents in parallel if the user wants to.

I hope this makes sense?

costin commented 8 years ago

Danke for the in-depth reply!

we need to make sure we don't depend on the state of a shard to ensure we can recover from failures without starting all over again

Agreed. This is already a problem (if a shard fails, one cannot resume the scroll/read) so to not make this feature heavier, during the discussion we ignored this aspect. Having this functionality built-in would be of course great since cluster changes would not break clients performing reading.

Even further we can allow to provide a document offset of some sort to resume (that would be nice).

That would be indeed nice. There are two scenarios here and it would be great if we could support both:

Wdyt?

clintongormley commented 8 years ago

Resuming a scroll request could only be done reliably if:

s1monw commented 8 years ago

the reading task gets restarted but without an offset (it got lost) can simply ask ES for the most recent offset in that scroll and continue. This might imply that the last request might have been lost by the consuming task but that's not ES concern.

I think for simplicity we can only allow this for the restart from where you are which means basically just reusing the scroll ID. For this to work we have to ensure that the search context is not closed until the worker is restarted which is 5 min by default.

jimczi commented 8 years ago

For every request we allow to provide the worker ID (0 - N) and we return only the slice of the data for the given worker ID.

There are some points we need to discuss before going further on this. First of all, if we allow parallel reads on the same shard we'll need to ensure that the searcher is the same among the parallel readers. For instance, in the Hadoop case I assume that each worker/mapper is independent: they start a scroll at the beginning of each map and we have no guarantee that the searcher is the same, but it's not a problem because each scroll is independent (they'll see different documents from different shards). With parallel readers we need to ensure that the searcher is the same. The only way to achieve this is to create the scroll context at the beginning of the job, and we also need to ensure that those contexts won't go away during the lifetime of the job. Assuming that the job could take hours, I don't see how we could set the TTL on the context beforehand. Now what can we do if a replica is stopped during the job? In the current implementation it's not a problem; we can start a new scroll on another replica. With parallel readers it's impossible: we still need to ensure that we read from the same replica/searcher, which means that if a replica used in a parallel scroll is not reachable anymore the whole job should fail.

costin commented 8 years ago

Not sure whether it helps, but in the Hadoop/Spark case the workers do not communicate with each other. At the start of the job, there is some light configuration (in ES-Hadoop that performs a search_shards and each task is instructed to connect against one) but that's about it. The configuration is very small and easily transferable. I mention this since a context sounds heavyweight and not something that can easily be serialized.

jimczi commented 8 years ago

New proposal based on Simon’s idea:

To slice the data we rely on _id or on a field provided by the user. API-wise it’s really simple and concise:

GET _search?scroll=1m
{
    "slice": {
        "field": "_id",
        "id": 0,
        "max": 2
    },
    "query": {
        "match_all": {}
    }
}

If the field is a string we use the hash of the string to perform the slicing; if it’s a number we use the value. Out of the box, _uid is used, read from the fielddata. It’s safe because we are sure that the content of the field is constant over time, which is the guarantee needed to make the slicing consistent, and it’s unique per index, so each slice should have the same number of documents even if the number of slices is big. That said, _id is not the most efficient way to do it, especially when the number of slices is small. To address that we can simply advise in the documentation to use an auto-generated field populated with the hash of the id and with docvalues enabled. It can be even more efficient to populate a field with a random number generated when the document is created (and that is never updated). This random number should be in the range [0, max_splits_per_query] to maximize the compression in the docvalues.

The main benefit I see with this approach is that each slice can be independent, they don’t need to use the same searcher/replica.
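
To illustrate that independence, here is a small sketch that builds one request body per slice, mirroring the JSON shape of the proposal above. The helper name `slice_bodies` is hypothetical, and the sketch only constructs the bodies; actually sending them to a cluster is left out.

```python
import json

def slice_bodies(max_slices, query=None, field="_id"):
    """Build one search body per slice id, following the proposed shape."""
    query = query if query is not None else {"match_all": {}}
    return [
        {"slice": {"field": field, "id": i, "max": max_slices}, "query": query}
        for i in range(max_slices)
    ]

# Two slices, as in the example request; each body can be sent by a
# different worker as its own `GET _search?scroll=1m` request.
bodies = slice_bodies(2)
print(json.dumps(bodies[0], indent=4))
```

Each body carries everything a worker needs (slice id, number of slices, query), so nothing has to be shared between workers beyond this small configuration.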

IMO in the Hadoop/Yarn/Spark case we should slice the data based on the desired number of documents per task/map and default to something like 10,000. With Spark and even with Hadoop it’s difficult to handle shards with millions of documents that produce long-running tasks. So instead of having one task per shard we could have one task per slice_id, and each slice_id would hit every shard but get only the slice it is responsible for. So the only states that a split has are the split_id, the number of splits and the query. This approach uses the entire cluster in a nice and fair way, as each slice_id can theoretically use a different replica of the same shard. The drawback is that we’ll need to handle jobs with thousands of maps/tasks, but I don’t think it’s a problem because most map/reduce systems allow capping the number of maps/tasks that are allowed to run concurrently. In fact this number should be derived from the number of nodes available in the cluster.
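
The sizing arithmetic in this paragraph can be sketched as below. The function names and the `tasks_per_node` parameter are assumptions made for illustration; the comment only says that the concurrency cap should be derived from the number of nodes, not how.

```python
import math

def num_slices(total_docs, docs_per_task=10_000):
    """Number of slices so that each task handles about docs_per_task docs."""
    return max(1, math.ceil(total_docs / docs_per_task))

def concurrent_tasks(slices, num_nodes, tasks_per_node=1):
    """Cap on tasks running at once, derived from the cluster size.

    tasks_per_node is a hypothetical tuning knob, not part of the proposal.
    """
    return min(slices, num_nodes * tasks_per_node)

slices = num_slices(1_000_000)                  # 1M docs at 10,000 per task
running = concurrent_tasks(slices, num_nodes=3)
```

For an index of 1M documents this yields 100 slices, of which only a handful run at any moment on a 3-node cluster.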

As a high-level API it can be used for parallel scrolls of the same query (parallel readers) but also as a way to resume a long-running task. For instance, the reindex could divide the input data into multiple slices and then process each slice sequentially. Then to resume a reindex task we would just need to restart after the last successfully processed slice.
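
A minimal sketch of that resume pattern, assuming the only checkpoint kept is the id of the last successfully processed slice. `run_slices` and `flaky` are hypothetical names, and the failure is simulated; this is not how reindex is implemented.

```python
def run_slices(num_slices, process_slice, checkpoint=-1):
    """Process slices in order and return the last slice id that succeeded.

    checkpoint is the last slice id already processed (-1 means none).
    """
    for slice_id in range(checkpoint + 1, num_slices):
        try:
            process_slice(slice_id)
        except Exception:
            # Stop here; the caller can retry later from the checkpoint.
            return checkpoint
        checkpoint = slice_id
    return checkpoint

processed = []
failed_once = set()

def flaky(slice_id):
    """Stand-in for real work: fails the first time slice 3 is attempted."""
    if slice_id == 3 and slice_id not in failed_once:
        failed_once.add(slice_id)
        raise RuntimeError("simulated worker failure")
    processed.append(slice_id)

last = run_slices(5, flaky)         # stops after slice 2
last = run_slices(5, flaky, last)   # resumes at slice 3
```

Only the failed slice is replayed in full, which stays cheap as long as slices are small.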

clintongormley commented 8 years ago

I love the proposal. Wondering if we should add support for from to sorted scroll requests?

jimczi commented 8 years ago

Thanks @clintongormley. I think it could be dangerous to use from in a sorted scroll request. The sort might be consistent, but if the sort field is updatable we might miss documents. Moreover, I think that replaying the whole scroll request with the slice id is enough. For the Hadoop case, having the slice feature would mean that we can create splits based on the desired number of documents for each. This number should be set based on the time it takes to process all the documents. Tasks that last 30s to 1 minute should be the goal, and in that case replaying the failing task should not be a problem (vs replaying from where we failed in the task). What do you think?

clintongormley commented 8 years ago

Sounds good to me

s1monw commented 8 years ago

I like it @jimferenczi, let's start with this and get it going!!

jimczi commented 8 years ago

I did some testing of the slice feature. The following table shows the results for a full scroll of an index of 1M random documents on 1 shard (no replica) with different scroll configurations. The configuration contains the number of slices in the scroll, the number of documents per slice, the size of each request within the slice and the number of threads. The "elapsed time" is the total time for the scroll to complete in seconds. For instance the first line is for a scroll with 1 slice (current behavior), 1M documents in that slice, 10,000 documents per scroll request and 1 thread. The elapsed time is 23s. The slicing is done on the _uid (only when the number of slices is greater than 1) and the query cache is cleared after each run. I've kept the worst run out of 10 for each configuration:

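| Number of slices | Number of documents per slice | Fetch size | Number of threads | Elapsed time |
|---|---|---|---|---|
| 1 | 1000000 | 10000 | 1 | 23s |
| 1 | 1000000 | 1000 | 1 | 25s |
| 1 | 1000000 | 100 | 1 | 40s |
| 10 | 100000 | 10000 | 4 | 8s |
| 10 | 100000 | 10000 | 3 | 10s |
| 10 | 100000 | 10000 | 2 | 12s |
| 10 | 100000 | 10000 | 1 | 25s |
| 10 | 100000 | 1000 | 4 | 8s |
| 10 | 100000 | 1000 | 3 | 10s |
| 10 | 100000 | 1000 | 2 | 13s |
| 10 | 100000 | 1000 | 1 | 27s |
| 10 | 100000 | 100 | 4 | 11s |
| 10 | 100000 | 100 | 3 | 15s |
| 10 | 100000 | 100 | 2 | 19s |
| 10 | 100000 | 100 | 1 | 38s |
| 100 | 10000 | 10000 | 4 | 11s |
| 100 | 10000 | 10000 | 3 | 14s |
| 100 | 10000 | 10000 | 2 | 21s |
| 100 | 10000 | 10000 | 1 | 43s |
| 100 | 10000 | 1000 | 4 | 12s |
| 100 | 10000 | 1000 | 3 | 15s |
| 100 | 10000 | 1000 | 2 | 23s |
| 100 | 10000 | 1000 | 1 | 46s |
| 100 | 10000 | 100 | 4 | 13s |
| 100 | 10000 | 100 | 3 | 17s |
| 100 | 10000 | 100 | 2 | 26s |
| 100 | 10000 | 100 | 1 | 54s |
| 1000 | 1000 | 1000 | 4 | 44s |
| 1000 | 1000 | 1000 | 3 | 57s |
| 1000 | 1000 | 1000 | 2 | 85s |
| 1000 | 1000 | 1000 | 1 | 175s |
| 1000 | 1000 | 100 | 4 | 54s |
| 1000 | 1000 | 100 | 3 | 71s |
| 1000 | 1000 | 100 | 2 | 106s |
| 1000 | 1000 | 100 | 1 | 212s |

garyelephant commented 8 years ago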

I'd love this feature for the Spark SQL on Elasticsearch case. For now, the bottleneck I see in my environment is that scroll is too slow with one shard per task. This seriously limits concurrency. I'm looking forward to it.

karansinghkjs346 commented 6 years ago

@costin - I am running a Spark Streaming job that dumps data into ES. When a parallel batch job is running and reading data from ES, the streaming job gets queued. Why is ES not able to handle the load?

costin commented 6 years ago

@karansinghkjs346 For questions, please use the dedicated forum not the issue tracker.

Thanks,