GoogleCloudPlatform / appengine-mapreduce

A library for running MapReduce jobs on App Engine
https://github.com/GoogleCloudPlatform/appengine-mapreduce/wiki/1-MapReduce
Apache License 2.0

"The datastore operation timed out" with many shards #109

Open · Erfa opened this issue 7 years ago

Erfa commented 7 years ago

I tried starting a mapper job with the DatastoreInputReader on an entity kind that has about 30 million entities. I specified 30,000 shards, which caused the /mapreduce/kickoffjob_callback handler to fail with a Datastore timeout.
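
For reference, a minimal sketch of how a job like this is typically kicked off through the library's control.start_map entry point; the kind name, handler path, and job name below are placeholders, not the actual configuration from this report:

from mapreduce import control

# Minimal sketch; "models.MyEntity" and "main.process_entity" are
# hypothetical placeholders for the real kind and mapper function.
control.start_map(
    name="touch-all-entities",
    handler_spec="main.process_entity",
    reader_spec="mapreduce.input_readers.DatastoreInputReader",
    mapper_parameters={"entity_kind": "models.MyEntity"},
    shard_count=30000)  # the shard count that triggers the timeout below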

It might be ridiculous to use a shard count that high, but I thought I'd share it anyway. Here's the full stack trace:

The datastore operation timed out, or the data was temporarily unavailable.
Traceback (most recent call last):
  File "/base/data/home/apps/e~appid/module:version/lib/mapreduce/base_handler.py", line 135, in post
    self.handle()
  File "/base/data/home/apps/e~appid/module:version/lib/mapreduce/handlers.py", line 1387, in handle
    readers, serialized_readers_entity = self._get_input_readers(state)
  File "/base/data/home/apps/e~appid/module:version/lib/mapreduce/handlers.py", line 1459, in _get_input_readers
    readers = input_reader_class.split_input(split_param)
  File "/base/data/home/apps/e~appid/module:version/lib/mapreduce/input_readers.py", line 722, in split_input
    return super(DatastoreInputReader, cls).split_input(mapper_spec)
  File "/base/data/home/apps/e~appid/module:version/lib/mapreduce/input_readers.py", line 355, in split_input
    query_spec.app, namespaces, shard_count, query_spec)
  File "/base/data/home/apps/e~appid/module:version/lib/mapreduce/input_readers.py", line 395, in _to_key_ranges_by_shard
    app)
  File "/base/data/home/apps/e~appid/module:version/lib/mapreduce/input_readers.py", line 464, in _split_ns_by_scatter
    random_keys = ds_query.Get(shard_count * oversampling_factor)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/api/datastore.py", line 1722, in Get
    return list(self.Run(limit=limit, offset=offset, **kwargs))
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 3321, in next
    next_batch = self.__batcher.next_batch(Batcher.AT_LEAST_OFFSET)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 3207, in next_batch
    batch = self.__next_batch.get_result()
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/api/apiproxy_stub_map.py", line 613, in get_result
    return self.__get_result_hook(self)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 2906, in __query_result_hook
    self._batch_shared.conn.check_rpc_success(rpc)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 1371, in check_rpc_success
    raise _ToDatastoreError(err)
Timeout: The datastore operation timed out, or the data was temporarily unavailable.
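
For context on the failing frame: per the traceback, _split_ns_by_scatter orders a keys-only query on the reserved __scatter__ property and fetches shard_count times an oversampling factor of keys in one synchronous Get. A simplified sketch of that call, assuming an oversampling factor of 32 (the value in the library's input_readers.py at the time; treat the exact figure as an assumption) and a placeholder kind name:

from google.appengine.api import datastore

# Simplified sketch of what _split_ns_by_scatter does, per the
# traceback above; not the library code verbatim.
ds_query = datastore.Query("MyEntity", keys_only=True)
ds_query.Order("__scatter__")
# With 30,000 shards and an assumed oversampling factor of 32, this
# single synchronous Get requests ~960,000 keys, enough to exceed the
# datastore RPC deadline and raise the Timeout above.
random_keys = ds_query.Get(30000 * 32)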
speedplane commented 7 years ago

Unless you plan on running 30,000 shards in parallel, you don't need that many. I start getting timeouts like the one you mention at around 1,000 shards.
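
One rough way to act on that advice, sketched under the assumption that ~1,000 shards is the practical ceiling; the per-shard batch size is illustrative, not a documented library limit:

def pick_shard_count(entity_count, entities_per_shard=50000, max_shards=1000):
    # One shard per ~50k entities (hypothetical target), capped where
    # the split step starts timing out in practice.
    return max(1, min(max_shards, entity_count // entities_per_shard))

# For the 30M-entity kind above: pick_shard_count(30 * 10**6) -> 600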