huggingface / datasets

🤗 The largest hub of ready-to-use datasets for ML models with fast, easy-to-use and efficient data manipulation tools
https://huggingface.co/docs/datasets
Apache License 2.0

Downloading via Apache Pipeline, client cancelled (org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException) #4524

Open dan-the-meme-man opened 2 years ago

dan-the-meme-man commented 2 years ago

Describe the bug

When downloading some Wikipedia languages (in particular, Spanish, Cebuano, and Russian) via the FlinkRunner, I encounter the exception in the title. I have experimented with many package versions, because the dependencies these packages require appear to have conflicting version constraints (dill and requests, for instance). Note that the code below runs for several hours inside load_dataset() without issue before the exception occurs.

Steps to reproduce the bug

# bash commands
!pip install datasets
!pip install apache-beam[interactive]
!pip install mwparserfromhell
!pip install dill==0.3.5.1
!pip install requests==2.23.0

# imports
import os
from datasets import load_dataset
import apache_beam as beam
import mwparserfromhell
from google.colab import drive
import dill
import requests

# mount drive
drive_dir = os.path.join(os.getcwd(), 'drive')
drive.mount(drive_dir)

# confirm that the installed versions of these two packages match the ones requested above
print(dill.__version__)
print(requests.__version__)

lang = 'es' # or 'ru' or 'ceb' - these are the ones causing the issue
lang_dir = os.path.join(drive_dir, 'path/to/my/folder', lang)

if not os.path.exists(lang_dir):
  x = None
  x = load_dataset('wikipedia', '20220301.' + lang, beam_runner='Flink',
                   split='train')
  x.save_to_disk(lang_dir)
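
# For completeness, a minimal sketch of the same call with explicit Beam pipeline
# options. It assumes load_dataset forwards a beam_options kwarg to the underlying
# Beam-based wikipedia builder (as datasets 2.x did); the --parallelism flag is
# illustrative and only meaningful for the Flink runner.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(['--parallelism=1'])  # fewer concurrent workers -> lower peak memory
x = load_dataset('wikipedia', '20220301.' + lang,
                 beam_runner='Flink',
                 beam_options=options,  # assumption: passed through to the BeamBasedBuilder
                 split='train')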

Expected results

Although this code (run in a Colab notebook) generally produces some warnings, most languages I've tried have downloaded successfully. The download should simply complete without issue, but for these three languages I keep hitting this error.

Actual results

Traceback below:

Exception in thread run_worker_3-1:
Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 234, in run
    for work_request in self._control_stub.Control(get_responses()):
  File "/usr/local/lib/python3.7/dist-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/dist-packages/grpc/_channel.py", line 826, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "Socket closed"
    debug_error_string = "{"created":"@1655593643.871830638","description":"Error received from peer ipv4:127.0.0.1:44441","file":"src/core/lib/surface/call.cc","file_line":952,"grpc_message":"Socket closed","grpc_status":14}"
>

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 426, in __getitem__
    self._cache[target_window] = self._side_input_data.view_fn(raw_view)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pvalue.py", line 391, in <lambda>
    lambda iterable: from_runtime_iterable(iterable, view_options))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pvalue.py", line 512, in _from_runtime_iterable
    head = list(itertools.islice(it, 2))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1228, in _lazy_iterator
    self._underlying.get_raw(state_key, continuation_token))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1019, in get_raw
    continuation_token=continuation_token)))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1060, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: Unknown process bundle instruction id '26'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
    response = task()
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 340, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
    element.data)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1281, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 426, in __getitem__
    self._cache[target_window] = self._side_input_data.view_fn(raw_view)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pvalue.py", line 391, in <lambda>
    lambda iterable: from_runtime_iterable(iterable, view_options))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pvalue.py", line 512, in _from_runtime_iterable
    head = list(itertools.islice(it, 2))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1228, in _lazy_iterator
    self._underlying.get_raw(state_key, continuation_token))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1019, in get_raw
    continuation_token=continuation_token)))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1060, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: Unknown process bundle instruction id '26' [while running 'train/Save to parquet/Write/WriteImpl/WriteBundles']

ERROR:apache_beam.runners.worker.sdk_worker:Error processing instruction 26. Original traceback is
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 426, in __getitem__
    self._cache[target_window] = self._side_input_data.view_fn(raw_view)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pvalue.py", line 391, in <lambda>
    lambda iterable: from_runtime_iterable(iterable, view_options))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pvalue.py", line 512, in _from_runtime_iterable
    head = list(itertools.islice(it, 2))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1228, in _lazy_iterator
    self._underlying.get_raw(state_key, continuation_token))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1019, in get_raw
    continuation_token=continuation_token)))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1060, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: Unknown process bundle instruction id '26'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
    response = task()
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 340, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
    element.data)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1281, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 426, in __getitem__
    self._cache[target_window] = self._side_input_data.view_fn(raw_view)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pvalue.py", line 391, in <lambda>
    lambda iterable: from_runtime_iterable(iterable, view_options))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pvalue.py", line 512, in _from_runtime_iterable
    head = list(itertools.islice(it, 2))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1228, in _lazy_iterator
    self._underlying.get_raw(state_key, continuation_token))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1019, in get_raw
    continuation_token=continuation_token)))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1060, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: Unknown process bundle instruction id '26' [while running 'train/Save to parquet/Write/WriteImpl/WriteBundles']

ERROR:root:org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
ERROR:apache_beam.runners.worker.data_plane:Failed to read inputs in the data plane.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/data_plane.py", line 634, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.7/dist-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/dist-packages/grpc/_channel.py", line 826, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.CANCELLED
    details = "Multiplexer hanging up"
    debug_error_string = "{"created":"@1655593654.436885887","description":"Error received from peer ipv4:127.0.0.1:43263","file":"src/core/lib/surface/call.cc","file_line":952,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
>
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/data_plane.py", line 651, in <lambda>
    target=lambda: self._read_inputs(elements_iterator),
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/worker/data_plane.py", line 634, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.7/dist-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/dist-packages/grpc/_channel.py", line 826, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.CANCELLED
    details = "Multiplexer hanging up"
    debug_error_string = "{"created":"@1655593654.436885887","description":"Error received from peer ipv4:127.0.0.1:43263","file":"src/core/lib/surface/call.cc","file_line":952,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
>

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
[/tmp/ipykernel_219/3869142325.py](https://localhost:8080/#) in <module>
     18   x = None
     19   x = load_dataset('wikipedia', '20220301.' + lang, beam_runner='Flink',
---> 20                    split='train')
     21   x.save_to_disk(lang_dir)

3 frames
[/usr/local/lib/python3.7/dist-packages/apache_beam/runners/portability/portable_runner.py](https://localhost:8080/#) in wait_until_finish(self, duration)
    604 
    605     if self._runtime_exception:
--> 606       raise self._runtime_exception
    607 
    608     return self._state

RuntimeError: Pipeline BeamApp-root-0618220708-b3b59a0e_d8efcf67-9119-4f76-b013-70de7b29b54d failed in state FAILED: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: CANCELLED: client cancelled

Environment info

albertvillanova commented 2 years ago

Hi @dan-the-meme-man, thanks for reporting.

We are investigating a similar issue but with Beam+Dataflow (instead of Beam+Flink):

In order to go deeper into the root cause, we need as much information as possible: logs from both the main process and the workers are very informative.

In the case of the issue with Beam+Dataflow, the logs from the workers report an out of memory issue.
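
For reference, a quick way to surface more detailed logs in the notebook output is to raise Python's logging level before calling load_dataset. This is a generic sketch, not specific to any runner:

# capture more detail from the Beam Python SDK in the notebook output
import logging

logging.basicConfig(level=logging.INFO)                   # root logger
logging.getLogger('apache_beam').setLevel(logging.DEBUG)  # Beam SDK / worker-side logs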

dan-the-meme-man commented 2 years ago

As I continued working on this today, I came to suspect that it is indeed an out-of-memory issue. I have a few more notebooks still running, and if they produce the same error I will try to collect the logs. In the meantime, if there is any chance that a repo exists with those three languages already available as .arrow files, or if you know roughly how much memory would be needed to actually download those sets, please let me know!
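
A simple way to check whether RAM is the bottleneck in the Colab runtime is to watch available memory while the pipeline runs. A minimal sketch using psutil, which Colab ships with:

# rough memory check to run periodically while the pipeline is executing
import psutil

mem = psutil.virtual_memory()
print(f'total: {mem.total / 1e9:.1f} GB, available: {mem.available / 1e9:.1f} GB')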