elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

Automatic index creation fails with Elasticsearch >= 8.6.0 #2214

Closed: mlyczek closed this issue 2 months ago

mlyczek commented 2 months ago

What kind of issue is this?

Bug report.

Issue description

The automatic index creation feature in elasticsearch-spark has stopped working with Elasticsearch clusters starting from version 8.6.0.

When using the default saving mode, which writes documents directly to Elasticsearch shards, and when Spark creates more than one task to save those documents, at least one task fails with the exception shown in the "Stack trace" section below.

My colleagues and I have spent some time investigating this issue, and our findings are described below.

Steps to reproduce

  1. Try to save some documents into a non-existent index, with both settings es.nodes.wan.only and es.nodes.client.only set to false (the default)

Code:

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class Main {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("Example")
                .setMaster("local[*]")
                .set("es.nodes", "localhost:9200");

        try (JavaSparkContext sparkContext = new JavaSparkContext(sparkConf)) {
            List<Map<String, String>> data = Arrays.asList(
                    Map.of("propertyA", "valueA1", "propertyB", "valueB1"),
                    Map.of("propertyA", "valueA2", "propertyB", "valueB2"),
                    Map.of("propertyA", "valueA3", "propertyB", "valueB3")
            );
            JavaRDD<Map<String, String>> stringJavaRDD = sparkContext.parallelize(data);

            // "test-index" does not exist yet, so saving triggers automatic index creation
            JavaEsSpark.saveToEs(stringJavaRDD, "test-index");
        }
    }
}

See https://github.com/mlyczek/elasticsearch-hadoop-concurrency-issue for full project ready to run.

Stack trace

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot determine write shards for [test-index]; likely its format is incorrect (maybe it contains illegal characters? or all shards failed?)
    at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
    at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:689)
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:634)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:71)
    at org.elasticsearch.spark.rdd.EsSpark$.$anonfun$doSaveToEs$1(EsSpark.scala:108)
    at org.elasticsearch.spark.rdd.EsSpark$.$anonfun$doSaveToEs$1$adapted(EsSpark.scala:108)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

Full log: full-log.txt

Further investigation

I have prepared a small Python script to demonstrate the inconsistent behaviour of Elasticsearch. It more or less mimics the behaviour of RestService.initSingleIndex(): it checks whether the index exists; if it does not, it creates the index, waits for YELLOW status and then gets the shards; if the index does exist, it gets the shards right away.

import logging
import threading
import time
import requests
from requests.auth import HTTPBasicAuth
from urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

es_url = "https://localhost:9201/"
index_name = "text-index"

my_auth = None

def run():
    while True:
        # Mimic RestService.initSingleIndex(): first check whether the index already exists.
        exists_response = requests.head(f"{es_url}/{index_name}", auth=my_auth, verify=False)
        if (exists_response.status_code != requests.codes.ok):
            logging.info("Creating test-index")
            response = requests.put(f"{es_url}/{index_name}", auth=my_auth, verify=False)
            logging.info("index create response %s", response)

            logging.info("Waiting for YELLOW")
            health_response = requests.get(f"{es_url}/_cluster/health/{index_name}?wait_for_status=YELLOW&timeout=10s", auth=my_auth, verify=False)
            logging.info("health response %s", health_response)
        else:
            logging.info("index exists")

        logging.info("getting shards")
        search_shards_response = requests.get(f"{es_url}/{index_name}/_search_shards", auth=my_auth, verify=False)
        shards = search_shards_response.json()['shards']
        logging.info(shards)
        # If the shard list is still empty, wait a moment and try again; otherwise we are done.
        if (shards[0] == []):
            time.sleep(0.01)
        else:
            break

if __name__ == "__main__":
    format = "%(asctime)s.%(msecs)03d [%(threadName)s]: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Deleting test-index")
    response = requests.delete(f"{es_url}/{index_name}", auth=my_auth, verify=False)
    logging.info(response)

    # Start two threads slightly apart to hit the window while the index is still being created.
    x = threading.Thread(name="X ", target=run)
    y = threading.Thread(name=" X", target=run)
    x.start()
    time.sleep(0.025)
    y.start()

Below is one of the outputs I got after running the above script (the sleep time between starting the two threads needs to be adjusted per machine to catch the short window during which the issue occurs):

15:38:18.130 [MainThread]: Deleting test-index
15:38:18.159 [MainThread]: <Response [200]>
15:38:18.170 [X ]: Creating test-index
15:38:18.197 [ X]: index exists
15:38:18.198 [ X]: getting shards
15:38:18.210 [ X]: [[]]
15:38:18.233 [ X]: index exists
15:38:18.233 [ X]: getting shards
15:38:18.241 [X ]: index create response <Response [200]>
15:38:18.241 [X ]: Waiting for YELLOW
15:38:18.245 [ X]: [[{'state': 'STARTED', 'primary': True, 'node': 'dUGB49_fRcGcVonQgXvB_g', 'relocating_node': None, 'shard': 0, 'index': 'text-index', 'allocation_id': {'id': 'Q6Ya3noUTQuN5BLXWvXhcQ'}, 'relocation_failure_info': {'failed_attempts': 0}}]]
15:38:18.254 [X ]: health response <Response [200]>
15:38:18.254 [X ]: getting shards
15:38:18.266 [X ]: [[{'state': 'STARTED', 'primary': True, 'node': 'dUGB49_fRcGcVonQgXvB_g', 'relocating_node': None, 'shard': 0, 'index': 'text-index', 'allocation_id': {'id': 'Q6Ya3noUTQuN5BLXWvXhcQ'}, 'relocation_failure_info': {'failed_attempts': 0}}]]

The above log shows that while one thread is still waiting for Elasticsearch to finish creating the index, the other thread is told that the index exists but receives an empty list of shards; after sleeping 0.01 seconds, the shard list is returned correctly.

Version Info

OS: Linux
JVM: OpenJDK 17.0.9
Spark: 3.3.2
ES-Spark: 8.13.1
ES: >= 8.6.0

masseyke commented 2 months ago

Thanks for all of the work in narrowing this down! Do you know if the problem happens with es-hadoop versions older than 8.6 with elasticsearch versions 8.6 or newer? That is, given that es-hadoop has not changed much lately, I'm assuming that the bug is actually in elasticsearch (rather than es-hadoop). But it would be good to have confirmation of that.

masseyke commented 2 months ago

Do you know if the problem happens with es-hadoop versions older than 8.6 with elasticsearch versions 8.6 or newer?

Actually you already answered that question by providing a python script that hits elasticsearch directly, so disregard my question!

mlyczek commented 2 months ago

Do you know if the problem happens with es-hadoop versions older than 8.6 with elasticsearch versions 8.6 or newer?

I've just run the check using es-hadoop 8.0.0 against Elasticsearch 8.6.0 and the issue also occurs. We noticed it while upgrading our Elasticsearch clusters to 8.12.1.

As for where the bug is: to be honest, I considered also creating an issue in Elasticsearch's repository, but I thought it would be better to report it here first, so that you are aware of this change in Elasticsearch behaviour and can decide where it is best fixed (in Elasticsearch or in es-hadoop).

I'm happy to provide any further details that might help, or to report this issue to the main Elasticsearch repository as well.

masseyke commented 2 months ago

It looks like the bug is a race condition in es-hadoop that has been there for years, and was probably made more likely when the desired-balance allocator (or some change related to it) was introduced in 8.6.0. Here's what es-hadoop does when indexing a new document:

  1. Checks if the index exists with a HEAD request
  2. If not, does a PUT to create it, and then calls _cluster/health with wait_for_status=YELLOW
  3. Makes a query to get the shards for the index

The problem is that if two Spark tasks are inserting documents into a nonexistent index at the same time, the first task does all of the steps above. The second task comes in and sees that the index exists, so it skips the second step (critically, the wait_for_status=YELLOW check). It then immediately queries for the shards, which might not exist yet because the index is still being created. It looks like we're only using the shards to get a node to write to, and there's no guarantee that the shards returned by _search_shards will even still be there when we actually go to write. So the best change here might be to just pick a node at random if the list of shards is empty.
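
For illustration, here is a minimal sketch of that fallback idea. This is not the actual es-hadoop change; ShardInfo, NodeInfo and WriteTargetResolver are hypothetical stand-ins for the shard and node metadata that RestService.initSingleIndex() works with internally.

import java.util.List;
import java.util.Random;

// Hypothetical stand-ins for the shard/node metadata used by RestService.
record ShardInfo(int shardId, String nodeId) {}
record NodeInfo(String id, String httpAddress) {}

final class WriteTargetResolver {

    private static final Random RANDOM = new Random();

    // Prefer a node hosting one of the target index's shards, but fall back to an
    // arbitrary data node when the shard list is still empty (e.g. while another
    // task is concurrently creating the index); Elasticsearch routes the request.
    static NodeInfo resolveWriteNode(List<ShardInfo> shards, List<NodeInfo> dataNodes) {
        if (shards == null || shards.isEmpty()) {
            return dataNodes.get(RANDOM.nextInt(dataNodes.size()));
        }
        String preferredNodeId = shards.get(0).nodeId();
        return dataNodes.stream()
                .filter(node -> node.id().equals(preferredNodeId))
                .findFirst()
                .orElseGet(() -> dataNodes.get(RANDOM.nextInt(dataNodes.size())));
    }
}

The point is that an empty shard list stops being fatal: the write goes to an arbitrary data node and Elasticsearch takes care of routing it to the correct shard.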

Jordan-Szwejda commented 2 months ago

If I can recommend a solution, I would vote for:

  1. if the set of shards is empty, wait for YELLOW and get the shards again - now it should pass
  2. throw an exception if the second attempt at getting the shards fails

masseyke commented 2 months ago

if the set of shards is empty, wait for YELLOW and get the shards again - now it should pass

There's no guarantee that shards won't move immediately after we fetch them.

throw an exception if the second attempt at getting the shards fails

But there's also no real harm in letting elasticsearch deal with this. If there is some real problem allocating shards at all, then elasticsearch is going to let us know anyway.

For those reasons, I'm just returning a random node if the shard check fails. This will probably almost always only be hit when two splits are trying to create an index at the same time.