ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[streaming]Can not run streaming example and test #12842

Closed DoraTiger closed 1 year ago

DoraTiger commented 3 years ago

What is the problem?

Ray version and other system information (Python version, TensorFlow version, OS):
- Ray: ray-1.1.0.dev0 (master branch 35f7d84dbe0c4c2bfe8117727eeda5256fe2c3a2)
- Python: 3.8.5 with conda
- Java: 11.0.8 with Alibaba Dragonwell
- OS: Ubuntu 20.04 with WSL2

Reproduction (REQUIRED)

I tried to use the Ray streaming API and wrote an example following test_word_count.py:

import ray
from ray.streaming import StreamingContext
from ray.streaming.config import Config
import time
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

if __name__ == "__main__":
    # Get program parameters
    ray.shutdown()
    ray.init(_load_code_from_local=True)

    ctx = StreamingContext.Builder() \
        .option(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL) \
        .build()
    # A Ray streaming environment with the default configuration
    ctx.set_parallelism(1)  # Each operator will be executed by one actor
    ctx.from_values("a", "b", "c") \
        .set_parallelism(1) \
        .flat_map(lambda x: [x, x]) \
        .map(lambda x: (x, 1)) \
        .key_by(lambda x: x[0]) \
        .reduce(lambda old_value, new_value:
                (old_value[0], old_value[1] + new_value[1])) \
        .sink(lambda x: print("result", x))
    ctx.submit("word_count")

Running it gives the error below (my home path is replaced with ***):

2020-12-14 20:19:34,087 INFO services.py:1208 -- View the Ray dashboard at http://127.0.0.1:8265
(pid=raylet) [2020-12-14 20:19:35,480 C 24239 24239] worker_pool.cc:917:  Check failed: state != states_by_lang_.end() Required Language isn't supported.
(pid=raylet) [2020-12-14 20:19:35,480 E 24239 24239] logging.cc:414: *** Aborted at 1607948375 (unix time) try "date -d @1607948375" if you are using GNU date ***
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414: PC: @                0x0 (unknown)
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414: *** SIGABRT (@0x3e800005eaf) received by PID 24239 (TID 0x7f42d449a800) from PID 24239; stack trace: ***
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414:     @     0x562c88db7537 google::(anonymous namespace)::FailureSignalHandler()
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414:     @     0x7f42d46cc3c0 (unknown)
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414:     @     0x7f42d44e518b gsignal
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414:     @     0x7f42d44c4859 abort
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414:     @     0x562c8887ca53 _ZN3ray6RayLogD2Ev.cold
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414:     @     0x562c88902e9f ray::raylet::WorkerPool::GetStateForLanguage()
(pid=raylet) [2020-12-14 20:19:35,482 E 24239 24239] logging.cc:414:     @     0x562c88909b77 ray::raylet::WorkerPool::PopWorker()
(pid=raylet) [2020-12-14 20:19:35,482 E 24239 24239] logging.cc:414:     @     0x562c8896179d ray::raylet::NodeManager::DispatchTasks()
(pid=raylet) [2020-12-14 20:19:35,483 E 24239 24239] logging.cc:414:     @     0x562c88963053 ray::raylet::NodeManager::EnqueuePlaceableTask()
(pid=raylet) [2020-12-14 20:19:35,483 E 24239 24239] logging.cc:414:     @     0x562c8896c1c7 ray::raylet::NodeManager::ScheduleTasks()
(pid=raylet) [2020-12-14 20:19:35,484 E 24239 24239] logging.cc:414:     @     0x562c88977fce ray::raylet::NodeManager::SubmitTask()
(pid=raylet) [2020-12-14 20:19:35,484 E 24239 24239] logging.cc:414:     @     0x562c889793b7 ray::raylet::NodeManager::HandleRequestWorkerLease()
(pid=raylet) [2020-12-14 20:19:35,485 E 24239 24239] logging.cc:414:     @     0x562c888d2a76 _ZN5boost4asio6detail18completion_handlerIZN3ray3rpc14ServerCallImplINS4_25NodeManagerServiceHandlerENS4_25RequestWorkerLeaseRequestENS4_23RequestWorkerLeaseReplyEE13HandleRequestEvEUlvE_E11do_completeEPvPNS1_19scheduler_operationERKNS_6system10error_codeEm
(pid=raylet) [2020-12-14 20:19:35,485 E 24239 24239] logging.cc:414:     @     0x562c890efb41 boost::asio::detail::scheduler::do_run_one()
(pid=raylet) [2020-12-14 20:19:35,485 E 24239 24239] logging.cc:414:     @     0x562c890f1281 boost::asio::detail::scheduler::run()
(pid=raylet) [2020-12-14 20:19:35,485 E 24239 24239] logging.cc:414:     @     0x562c890f38fb boost::asio::io_context::run()
(pid=raylet) [2020-12-14 20:19:35,486 E 24239 24239] logging.cc:414:     @     0x562c88898482 main
(pid=raylet) [2020-12-14 20:19:35,486 E 24239 24239] logging.cc:414:     @     0x7f42d44c60b3 __libc_start_main
(pid=raylet) [2020-12-14 20:19:35,486 E 24239 24239] logging.cc:414:     @     0x562c888adf1e _start
Error in sys.excepthook:
Traceback (most recent call last):
  File "***/ray/python/ray/worker.py", line 875, in custom_excepthook
    ray.state.state.add_worker(worker_id, worker_type, worker_info)
  File "***/ray/python/ray/state.py", line 733, in add_worker
    return self.global_state_accessor.add_worker_info(
AttributeError: 'NoneType' object has no attribute 'add_worker_info'

Original exception was:
Traceback (most recent call last):
  File "./test2.py", line 16, in <module>
    ctx = StreamingContext.Builder() \
  File "***/ray/python/ray/streaming/context.py", line 48, in build
    ctx = StreamingContext()
  File "***/ray/python/ray/streaming/context.py", line 54, in __init__
    self._j_ctx = self._gateway_client.create_streaming_context()
  File "***/ray/python/ray/streaming/runtime/gateway_client.py", line 21, in create_streaming_context
    return deserialize(ray.get(call))
  File "***/ray/python/ray/worker.py", line 1401, in get
    raise value
ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task.
2020-12-14 20:20:05,461 WARNING worker.py:1050 -- The node with node id 447728161d99bd4896a1a2c719adcc3930806be4 has been marked dead because the detector has missed too many heartbeats from it. This can happen when a raylet crashes unexpectedly or has lagging heartbeats.

At first I used ray-1.0.0rc2, built from source, and got this error. While searching for a fix I found issue #11602, "StreamingContext Builder error from Ray Streaming Example", so I tried ray-1.0.1.post1 and the master branch. Both give the same error.

I also have another problem: I cannot run the streaming tests successfully by following readme.rst. The Java test HybridStreamTest.java does not pass; it fails at lines 48-49, Preconditions.checkArgument(EnvUtil.executeCommand(ImmutableList.of("ray", "stop"), 5));. The streaming pytest suite on the master branch also fails:

===================================================================================== test session starts =====================================================================================
platform linux -- Python 3.8.5, pytest-6.2.0, py-1.10.0, pluggy-0.13.1
rootdir: ***/ray/streaming/python/tests
collected 11 items / 2 errors / 9 selected                                                                                                                                                    

=========================================================================================== ERRORS ============================================================================================
__________________________________________________________________________ ERROR collecting test_direct_transfer.py ___________________________________________________________________________
ImportError while importing test module '***/ray/streaming/python/tests/test_direct_transfer.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/home/tiger/anaconda3/envs/py38-ray-master/lib/python3.8/importlib/__init__.py:127: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
test_direct_transfer.py:6: in <module>
    import ray.streaming._streaming as _streaming
E   ImportError: ***/ray/python/ray/streaming/_streaming.so: undefined symbol: aligned_free
______________________________________________________________________________ ERROR collecting test_operator.py ______________________________________________________________________________
ImportError while importing test module '***/ray/streaming/python/tests/test_operator.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/home/tiger/anaconda3/envs/py38-ray-master/lib/python3.8/importlib/__init__.py:127: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
test_operator.py:2: in <module>
    from ray.streaming import operator
../../../python/ray/streaming/operator.py:8: in <module>
    from ray.streaming.collector import Collector
../../../python/ray/streaming/collector.py:11: in <module>
    from ray.streaming.runtime.transfer import ChannelID, DataWriter
../../../python/ray/streaming/runtime/transfer.py:9: in <module>
    import ray.streaming._streaming as _streaming
E   ImportError: ***/ray/python/ray/streaming/_streaming.so: undefined symbol: aligned_free
=================================================================================== short test summary info ===================================================================================
ERROR test_direct_transfer.py
ERROR test_operator.py
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: 2 errors during collection !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
====================================================================================== 2 errors in 0.27s ======================================================================================

With both ray-1.0.1.post1 and ray-1.0.0rc2, the pytest run fails the same way:

=================================================================================== short test summary info ===================================================================================
FAILED test_failover.py::test_word_count - ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task.
FAILED test_hybrid_stream.py::test_hybrid_stream - ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task.
FAILED test_stream.py::test_data_stream - RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ra...
FAILED test_stream.py::test_key_data_stream - RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling...
FAILED test_stream.py::test_stream_config - RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling '...
FAILED test_union_stream.py::test_union_stream - RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by call...
FAILED test_word_count.py::test_word_count - RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling ...
FAILED test_word_count.py::test_simple_word_count - RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by c...

lixin-wei commented 3 years ago

I'll take a look.

chaokunyang commented 3 years ago

@lixin-wei The aligned_free error seems to have been fixed in a previous PR.

ashione commented 3 years ago

@Devil-Z this issue has been fixed in master. Could you try the latest version? @edoakes I'm not sure whether we can cherry-pick this fix into the release version.

edoakes commented 3 years ago

@ashione could you point me to the PR that fixes it please? It's unlikely that we can cherry-pick it at this point.

DoraTiger commented 3 years ago

> @Devil-Z this issue has been fixed in master. Could you try the latest version? @edoakes I'm not sure whether we can cherry-pick this fix into the release version.

I rebuilt Ray from master branch 5d987f5. It seems that master has changed some init config: ray.init() no longer accepts _load_code_from_local=True. When I run the code with ray.init(_load_code_from_local=True), I get:

Traceback (most recent call last):
  File "test2.py", line 14, in <module>
    ray.init(_load_code_from_local=True)
TypeError: init() got an unexpected keyword argument '_load_code_from_local'

When I use ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)), I get:

Traceback (most recent call last):
  File "test2.py", line 20, in <module>
    ctx = StreamingContext.Builder() \
  File "***/ray/python/ray/streaming/context.py", line 48, in build
    ctx = StreamingContext()
  File "***/ray/python/ray/streaming/context.py", line 53, in __init__
    self.__gateway_client = GatewayClient()
  File "***/ray/python/ray/streaming/runtime/gateway_client.py", line 16, in __init__
    self._python_gateway_actor = ray.java_actor_class(
  File "***/ray/python/ray/actor.py", line 408, in remote
    return self._remote(args=args, kwargs=kwargs)
  File "***/ray/python/ray/actor.py", line 655, in _remote
    creation_args = cross_language.format_args(worker, args, kwargs)
  File "***/ray/python/ray/cross_language.py", line 26, in format_args
    raise ValueError("Cross language feature needs "
ValueError: Cross language feature needs --load-code-from-local to be set.

I checked the code in default_worker.py, and it seems that load_code_from_local should be set to true whenever code_search_path is non-empty. I'm not sure what mistake I have made; I only changed the arguments to ray.init(). The official example at ray/streaming/python/examples/wordcount.py fails with the same error.

pdames commented 3 years ago

I'm also seeing the same issues when trying to use cross-language calls from Python to Java as documented at https://docs.ray.io/en/master/cross-language.html. If I make a simple local code change to force the "ValueError: Cross language feature needs --load-code-from-local to be set" check to pass after setting the code search path via the job config in ray.init(...), I then run into the "worker_pool.cc:917: Check failed: state != states_by_lang_.end() Required Language isn't supported." error reported initially.

chaokunyang commented 3 years ago

@pdames "Check failed: state != states_by_lang_.end() Required Language isn't supported." may be caused by Java not being installed on the machine. Could you check the Java installation on your machine? We should add a more detailed error message.
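
The failing check looks up a per-language worker state and aborts the raylet when none exists. A minimal, purely hypothetical Python model of that lookup (the names `WorkerPoolModel`, `Language`, `UnsupportedLanguageError` and the worker command are illustrative, not Ray's API) shows the kind of more detailed error message suggested above: instead of aborting, it reports which languages the node was configured with and, for Java, points at the installation.

```python
from enum import Enum
from typing import Dict, List


class Language(Enum):
    PYTHON = "PYTHON"
    JAVA = "JAVA"
    CPP = "CPP"


class UnsupportedLanguageError(RuntimeError):
    """Raised instead of aborting when no worker state exists for a language."""


class WorkerPoolModel:
    """Hypothetical model of a worker pool keeping one state entry per language.

    Here a "state" is just the command used to start workers of that language;
    a language whose workers were never configured is absent from the map.
    """

    def __init__(self, worker_commands: Dict[Language, List[str]]):
        self._states_by_lang = dict(worker_commands)

    def get_state_for_language(self, language: Language) -> List[str]:
        state = self._states_by_lang.get(language)
        if state is None:
            configured = ", ".join(sorted(lang.value for lang in self._states_by_lang))
            hint = ""
            if language is Language.JAVA:
                hint = " Check that Java is installed and visible to the raylet."
            raise UnsupportedLanguageError(
                f"Required language {language.value} isn't supported on this node "
                f"(configured: {configured or 'none'}).{hint}"
            )
        return state


# Hypothetical usage: a node configured only for Python workers.
pool = WorkerPoolModel({Language.PYTHON: ["python", "default_worker.py"]})
try:
    pool.get_state_for_language(Language.JAVA)
except UnsupportedLanguageError as err:
    print(err)
```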

ashione commented 3 years ago

> @ashione could you point me to the PR that fixes it please? It's unlikely that we can cherry-pick it at this point.

Sure. I meant that we might cherry-pick it into the next 1.0.x minor release. The fix is commit 462c7fb5754e75c96bb970955cd2ef9bb66786d6, PR id 12345.

chaokunyang commented 3 years ago

> ValueError: Cross language feature needs --load-code-from-local to be set.

This will be fixed in #12830

DoraTiger commented 3 years ago

After rebuilding Ray from master branch 9643e, the code_search_path issue has been fixed, but I still cannot run the code from my first comment; the error is the same.

2021-01-05 11:57:38,601 INFO services.py:1169 -- View the Ray dashboard at http://127.0.0.1:8265
(pid=raylet) [2021-01-05 11:57:40,119 C 1660 1660] worker_pool.cc:894:  Check failed: state != states_by_lang_.end() Required Language isn't supported.
(pid=raylet) [2021-01-05 11:57:40,119 E 1660 1660] logging.cc:414: *** Aborted at 1609819060 (unix time) try "date -d @1609819060" if you are using GNU date ***
(pid=raylet) [2021-01-05 11:57:40,119 E 1660 1660] logging.cc:414: PC: @                0x0 (unknown)
(pid=raylet) [2021-01-05 11:57:40,120 E 1660 1660] logging.cc:414: *** SIGABRT (@0x3e80000067c) received by PID 1660 (TID 0x7f8a4fbe9800) from PID 1660; stack trace: ***
(pid=raylet) [2021-01-05 11:57:40,120 E 1660 1660] logging.cc:414:     @     0x5618259e3c57 google::(anonymous namespace)::FailureSignalHandler()
(pid=raylet) [2021-01-05 11:57:40,120 E 1660 1660] logging.cc:414:     @     0x7f8a4fe1b3c0 (unknown)
(pid=raylet) [2021-01-05 11:57:40,120 E 1660 1660] logging.cc:414:     @     0x7f8a4fc3418b gsignal
(pid=raylet) [2021-01-05 11:57:40,120 E 1660 1660] logging.cc:414:     @     0x7f8a4fc13859 abort
(pid=raylet) [2021-01-05 11:57:40,120 E 1660 1660] logging.cc:414:     @     0x5618255686f9 _ZN3ray6RayLogD2Ev.cold
(pid=raylet) [2021-01-05 11:57:40,120 E 1660 1660] logging.cc:414:     @     0x5618255eaecc ray::raylet::WorkerPool::GetStateForLanguage()
(pid=raylet) [2021-01-05 11:57:40,121 E 1660 1660] logging.cc:414:     @     0x5618255f05c2 ray::raylet::WorkerPool::PrestartWorkers()
(pid=raylet) [2021-01-05 11:57:40,121 E 1660 1660] logging.cc:414:     @     0x56182564c3bc ray::raylet::NodeManager::HandleRequestWorkerLease()
(pid=raylet) [2021-01-05 11:57:40,122 E 1660 1660] logging.cc:414:     @     0x5618255bc116 _ZN5boost4asio6detail18completion_handlerIZN3ray3rpc14ServerCallImplINS4_25NodeManagerServiceHandlerENS4_25RequestWorkerLeaseRequestENS4_23RequestWorkerLeaseReplyEE13HandleRequestEvEUlvE_E11do_completeEPvPNS1_19scheduler_operationERKNS_6system10error_codeEm
(pid=raylet) [2021-01-05 11:57:40,122 E 1660 1660] logging.cc:414:     @     0x561825d5bd11 boost::asio::detail::scheduler::do_run_one()
(pid=raylet) [2021-01-05 11:57:40,122 E 1660 1660] logging.cc:414:     @     0x561825d5d451 boost::asio::detail::scheduler::run()
(pid=raylet) [2021-01-05 11:57:40,122 E 1660 1660] logging.cc:414:     @     0x561825d5facb boost::asio::io_context::run()
(pid=raylet) [2021-01-05 11:57:40,123 E 1660 1660] logging.cc:414:     @     0x561825582918 main
(pid=raylet) [2021-01-05 11:57:40,123 E 1660 1660] logging.cc:414:     @     0x7f8a4fc150b3 __libc_start_main
(pid=raylet) [2021-01-05 11:57:40,123 E 1660 1660] logging.cc:414:     @     0x56182559682e _start
Traceback (most recent call last):
  File "test2.py", line 21, in <module>
    ctx = StreamingContext.Builder() \
  File "***/ray/python/ray/streaming/context.py", line 48, in build
    ctx = StreamingContext()
  File "***/ray/python/ray/streaming/context.py", line 54, in __init__
    self._j_ctx = self._gateway_client.create_streaming_context()
  File "***/ray/python/ray/streaming/runtime/gateway_client.py", line 21, in create_streaming_context
    return deserialize(ray.get(call))
  File "***/ray/python/ray/_private/client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)
  File "***/ray/python/ray/worker.py", line 1395, in get
    raise value
ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task. Check python-core-worker-*.log files for more information.
2021-01-05 11:58:10,073 WARNING worker.py:1044 -- The node with node id b1991ec40afb2a9d984ddc2268153b9e327efbedade137019c81422a has been marked dead because the detector has missed too many heartbeats from it. This can happen when a raylet crashes unexpectedly or has lagging heartbeats.

The python-core-worker-*.log files only repeat one error: metric_exporter.cc:206: Export metrics to agent failed: IOError: 14: failed to connect to all addresses. This won't affect Ray, but you can lose metrics from the cluster. I tried adding all the streaming jars to sys.path, but that didn't change anything.

pdames commented 3 years ago

> @pdames Check failed: state != states_by_lang_.end() Required Language isn't supported." may be caused by no java installation in the machine. Could you check java installation in your machine? We should add more detailed error message.

Java 1.8 is installed on my EC2 node. Output from running java -version:

openjdk version "1.8.0_275"
OpenJDK Runtime Environment (build 1.8.0_275-8u275-b01-0ubuntu1~18.04-b01)
OpenJDK 64-Bit Server VM (build 25.275-b01, mixed mode)

I'm also able to successfully run and interact with a local Java py4j gateway server from Ray python scripts in place of making the cross-language calls given at https://docs.ray.io/en/master/cross-language.html.

However, the main value-add I'm looking to get out of Ray's Python/Java cross-language support is the ability to remove this py4j dependency, and ideally get lower-latency cross-language method invocations in return.
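
To rule out the "no Java installed" cause quickly, here is a small standard-library sketch (not part of Ray) that looks for `java` on PATH and extracts the version from `java -version`, which prints to stderr. It only inspects the PATH of the process running it; the raylet sees whatever environment the process that started it had, so the two may differ. As pdames's output shows, a present JDK rules out only this one cause.

```python
import re
import shutil
import subprocess
from typing import Optional


def parse_java_version(output: str) -> Optional[str]:
    """Extract the quoted version string from `java -version` output."""
    match = re.search(r'version "([^"]+)"', output)
    return match.group(1) if match else None


def detect_java() -> Optional[str]:
    """Return the version of the `java` found on PATH, or None if there is none."""
    java = shutil.which("java")
    if java is None:
        return None
    # `java -version` writes its banner to stderr, not stdout.
    proc = subprocess.run([java, "-version"], capture_output=True, text=True, timeout=30)
    return parse_java_version(proc.stderr)


if __name__ == "__main__":
    print(detect_java())
```

Fed the output shown above, `parse_java_version` returns `1.8.0_275`.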

ashione commented 3 years ago

> worker_pool.cc:894:  Check failed: state != states_by_lang_.end() Required Language isn't supported.

@chaokunyang told me offline today that Ray streaming uses the dynamic resource feature for operator vertex scheduling, but dynamic resources are now deprecated in the master branch.

ashione commented 3 years ago

BTW, setting RAY_ENABLE_NEW_SCHEDULER=0 in the environment may work. We'll remove the dynamic resource setting from the streaming scheduler later.
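
The variable has to be in place before `ray.init()` starts the local raylet, because child processes copy the parent's environment when they are spawned. A sketch of setting it from the driver script itself, assuming `ray.init()` launches a fresh local cluster (for a cluster started earlier with `ray start`, it would presumably have to be set in that command's environment instead). The helper `value_seen_by_child` is hypothetical and only demonstrates that a freshly spawned child process sees the value.

```python
import os
import subprocess
import sys

# Must happen before ray.init(): a child process copies the parent's
# environment when it is spawned, so setting the variable afterwards has no
# effect on processes that are already running.
os.environ["RAY_ENABLE_NEW_SCHEDULER"] = "0"


def value_seen_by_child(name: str) -> str:
    """Spawn a child Python process and return the value it sees for `name`."""
    code = f"import os; print(os.environ.get({name!r}, '<unset>'))"
    result = subprocess.run(
        [sys.executable, "-c", code], capture_output=True, text=True, check=True
    )
    return result.stdout.strip()


print(value_seen_by_child("RAY_ENABLE_NEW_SCHEDULER"))  # prints: 0

# import ray
# ray.init(job_config=...)  # call only after the variable is set
```

Setting it on the command line, as in `RAY_ENABLE_NEW_SCHEDULER=0 python wordcount.py`, has the same effect.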

DoraTiger commented 3 years ago

@ashione Thanks. After setting RAY_ENABLE_NEW_SCHEDULER=0 in the environment, streaming works correctly from Java, but streaming from Python still gives the same error. Here is the code; maybe I have some wrong config? I put all the Ray jars in the lib dir.

import os
import sys
import ray
from ray.streaming import StreamingContext
from ray.streaming.config import Config
import time
import logging

if __name__ == "__main__":
    # Get program parameters
    sys.path.append("/***/lib")
    ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))

    ctx = StreamingContext.Builder() \
        .build()
    # .option(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL) \
    # A Ray streaming environment with the default configuration
    ctx.set_parallelism(1)  # Each operator will be executed by one actor
    ctx.from_values("a", "b", "c", "c") \
        .set_parallelism(1) \
        .flat_map(lambda x: [x, x]) \
        .map(lambda x: (x, 1)) \
        .key_by(lambda x: x[0]) \
        .reduce(lambda old_value, new_value:
                (old_value[0], old_value[1] + new_value[1])) \
        .sink(lambda x: print("result", x))
    ctx.submit("word_count")
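
For reference when the job does run, a plain-Python sketch (no Ray involved) of what this pipeline should print for `"a", "b", "c", "c"`. It assumes, without confirmation from this thread, that the keyed `reduce` passes the first record for each key through unchanged and then emits the running aggregate for every later record; `simulate_word_count` is a hypothetical name.

```python
from typing import Dict, Iterable, List, Tuple


def simulate_word_count(values: Iterable[str]) -> List[Tuple[str, int]]:
    """Model from_values -> flat_map -> map -> key_by -> reduce -> sink."""
    emitted: List[Tuple[str, int]] = []
    state: Dict[str, Tuple[str, int]] = {}  # assumed per-key reduce state
    for value in values:
        for word in [value, value]:  # flat_map(lambda x: [x, x])
            pair = (word, 1)  # map(lambda x: (x, 1))
            key = pair[0]  # key_by(lambda x: x[0])
            if key in state:  # reduce(old, new) -> (old[0], old[1] + new[1])
                old = state[key]
                pair = (old[0], old[1] + pair[1])
            state[key] = pair
            emitted.append(pair)  # sink(lambda x: print("result", x))
    return emitted


for result in simulate_word_count(["a", "b", "c", "c"]):
    print("result", result)
```

Under that assumption the last line printed for each key carries its final count: 2 for `a` and `b`, 4 for `c`.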

blublinsky commented 3 years ago

Was this ever fixed? It looks like the sample https://github.com/ray-project/ray/blob/master/streaming/python/examples/wordcount.py is still broken. When running it I get:

2021-08-28 20:01:27,586 INFO services.py:1265 -- View the Ray dashboard at http://127.0.0.1:8266
(raylet) WARNING: An illegal reflective access operation has occurred
(raylet) WARNING: Illegal reflective access by org.nustaq.serialization.FSTClazzInfo (file:/usr/local/lib/python3.7/site-packages/ray/jars/ray_dist.jar) to field java.lang.String.value
(raylet) WARNING: Please consider reporting this to the maintainers of org.nustaq.serialization.FSTClazzInfo
(raylet) WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
(raylet) WARNING: All illegal access operations will be denied in a future release
Traceback (most recent call last):
  File "/Users/boris/Projects/scalingpythonml/streaming/ray_streaming/wordcount.py ", line 73, in <module>
    .option(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL) \
  File "/usr/local/lib/python3.7/site-packages/ray/streaming/context.py", line 48, in build
    ctx = StreamingContext()
  File "/usr/local/lib/python3.7/site-packages/ray/streaming/context.py", line 54, in __init__
    self._j_ctx = self._gateway_client.create_streaming_context()
  File "/usr/local/lib/python3.7/site-packages/ray/streaming/runtime/gateway_client.py", line 21, in create_streaming_context
    return deserialize(ray.get(call))
  File "/usr/local/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 89, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/ray/worker.py", line 1631, in get
    raise value
ray.exceptions.CrossLanguageError: An exception raised from JAVA:
io.ray.runtime.exception.RayActorException: (pid=5651, ip=192.168.0.2) The actor died because of it's creation task failed
    at io.ray.runtime.task.TaskExecutor.execute(TaskExecutor.java:216)
    at io.ray.runtime.RayNativeRuntime.nativeRunTaskExecutor(Native Method)
    at io.ray.runtime.RayNativeRuntime.run(RayNativeRuntime.java:225)
    at io.ray.runtime.runner.worker.DefaultWorker.main(DefaultWorker.java:15)
Caused by: java.lang.RuntimeException: Failed to load functions from class io.ray.streaming.runtime.python.PythonGateway
    at io.ray.runtime.functionmanager.FunctionManager$JobFunctionTable.loadFunctionsForClass(FunctionManager.java:240)
    at io.ray.runtime.functionmanager.FunctionManager$JobFunctionTable.getFunction(FunctionManager.java:173)
    at io.ray.runtime.functionmanager.FunctionManager.getFunction(FunctionManager.java:108)
    at io.ray.runtime.task.TaskExecutor.getRayFunction(TaskExecutor.java:69)
    at io.ray.runtime.task.TaskExecutor.execute(TaskExecutor.java:121)
    ... 3 more
Caused by: java.lang.ClassNotFoundException: io.ray.streaming.runtime.python.PythonGateway
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:435)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:468)
    at io.ray.runtime.functionmanager.FunctionManager$JobFunctionTable.loadFunctionsForClass(FunctionManager.java:199)
    ... 7 more
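
The root cause here is `java.lang.ClassNotFoundException: io.ray.streaming.runtime.python.PythonGateway`, which suggests the jar containing that class was not visible to the Java worker. As a diagnostic that is not part of Ray, a standard-library sketch can scan the entries you pass as `code_search_path` for a jar containing the class. `find_class_in_jars` is a hypothetical helper; it scans directories non-recursively, which may differ from how Ray itself resolves `code_search_path`.

```python
import os
import zipfile
from typing import Iterable, List


def find_class_in_jars(search_path: Iterable[str], class_name: str) -> List[str]:
    """Return the jar files in `search_path` that contain `class_name`.

    Each entry may be a directory (scanned non-recursively for *.jar files)
    or a path to a single jar; anything else is skipped.
    """
    entry = class_name.replace(".", "/") + ".class"
    hits: List[str] = []
    for path in search_path:
        if os.path.isdir(path):
            jars = [os.path.join(path, f) for f in sorted(os.listdir(path)) if f.endswith(".jar")]
        elif path.endswith(".jar") and os.path.isfile(path):
            jars = [path]
        else:
            continue
        for jar in jars:
            try:
                with zipfile.ZipFile(jar) as zf:
                    if entry in zf.namelist():
                        hits.append(jar)
            except zipfile.BadZipFile:
                continue  # not a valid jar/zip archive
    return hits


if __name__ == "__main__":
    import sys

    print(find_class_in_jars(sys.path, "io.ray.streaming.runtime.python.PythonGateway"))
```

An empty result for the directories passed to `JobConfig(code_search_path=...)` would point at a missing or misplaced streaming jar.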

ashione commented 3 years ago

These traceback lines show that the root cause is that the streaming jar cannot be found on your classpath. Could you check it again?
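A jar is a zip archive that stores each class as `package/path/ClassName.class`. So one way to act on this advice is to look inside a candidate jar for the class named in the `ClassNotFoundException` (`io.ray.streaming.runtime.python.PythonGateway`). The sketch below assumes you already know which jar to inspect, for example the `ray_dist.jar` shipped with a pip-installed Ray. `jar_contains_class` and `class_entry_name` are hypothetical helper names, not part of Ray.

```python
import zipfile

# Class the Java worker failed to load, taken from the ClassNotFoundException above.
GATEWAY_CLASS = "io.ray.streaming.runtime.python.PythonGateway"


def class_entry_name(fq_class_name: str) -> str:
    """Map a fully qualified Java class name to its entry path inside a jar."""
    return fq_class_name.replace(".", "/") + ".class"


def jar_contains_class(jar_path: str, fq_class_name: str = GATEWAY_CLASS) -> bool:
    """Return True if the jar (a zip archive) has a .class entry for the class."""
    entry = class_entry_name(fq_class_name)
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

Usage: `jar_contains_class("/path/to/ray_dist.jar")` returns `False` when the jar lacks the gateway class, which would be consistent with the error above. A `True` result only shows the class is in that jar; the jar must still be on the worker's classpath.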

blublinsky commented 3 years ago

What exactly am I checking? I have the latest version of Ray installed via pip. Do I need to install anything else to make it work? Also, could this be related to the fact that I am running Java 15, not Java 8, on my box?

java --version
openjdk 15.0.2 2021-01-19
OpenJDK Runtime Environment Corretto-15.0.2.7.1 (build 15.0.2+7)
OpenJDK 64-Bit Server VM Corretto-15.0.2.7.1 (build 15.0.2+7, mixed mode, sharing)
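To check which major version a JDK reports, a small parser for the version banner can help. `java --version` (used above) exists only on Java 9 and later and prints to stdout. `java -version` also works on older JDKs and prints to stderr. Java 8 and earlier report versions in the legacy `1.8.x` form. This is a hypothetical helper sketch, and the thread does not establish whether Java 15 versus Java 8 is actually the cause of the failure.

```python
import re
import subprocess
from typing import Optional

# Matches the first line of `java -version` / `java --version` output, e.g.
#   openjdk 15.0.2 2021-01-19        (java --version, Java 9+)
#   openjdk version "11.0.8" ...     (java -version)
#   java version "1.8.0_292"         (legacy 1.x scheme, Java 8 and older)
_VERSION_RE = re.compile(
    r'^(?:openjdk|java)(?:\s+version)?\s+"?(\d+)(?:\.(\d+))?', re.MULTILINE
)


def java_major_version(version_output: str) -> Optional[int]:
    """Return the Java major version from a version banner, or None."""
    match = _VERSION_RE.search(version_output)
    if match is None:
        return None
    major = int(match.group(1))
    if major == 1 and match.group(2) is not None:
        # Legacy scheme: "1.8" means Java 8.
        return int(match.group(2))
    return major


def installed_java_major_version() -> Optional[int]:
    """Run `java -version` (prints to stderr) and parse it; None if java is absent."""
    try:
        result = subprocess.run(
            ["java", "-version"], capture_output=True, text=True, check=False
        )
    except FileNotFoundError:
        return None
    return java_major_version(result.stderr + result.stdout)
```

For the Corretto output above, `java_major_version` returns `15`.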

Also, it looks like replacing ray.init() with ray.init(ignore_reinit_error=True, job_config=JobConfig(code_search_path=sys.path)) breaks any application that uses actors.

ashione commented 3 years ago

It works for me on my Linux machine. In Ray 1.6 (the latest version), you can find the compiled PythonGateway class in $YOUR_PY_ENV/lib/python3.6/site-packages/ray/jars/ray_dist.jar.
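The exact location of `ray_dist.jar` depends on the Python environment. A minimal sketch for finding it without importing Ray uses `importlib.util.find_spec` to locate the installed package. It assumes the jar sits at `ray/jars/ray_dist.jar` inside the package, as described for Ray 1.6; `find_ray_dist_jar` is a hypothetical helper name.

```python
import importlib.util
from pathlib import Path
from typing import Optional


def find_ray_dist_jar(package: str = "ray") -> Optional[Path]:
    """Return <package dir>/jars/ray_dist.jar if it exists, else None.

    find_spec locates the package without importing it, so this works even
    when importing Ray itself would fail.
    """
    spec = importlib.util.find_spec(package)
    if spec is None or spec.origin is None:
        return None
    # spec.origin points at the package's __init__.py; its parent is the package dir.
    jar = Path(spec.origin).parent / "jars" / "ray_dist.jar"
    return jar if jar.is_file() else None


if __name__ == "__main__":
    # Prints the jar path for the active environment, or None if it is missing.
    print(find_ray_dist_jar())
```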

If you want to run it on the master branch, the extra jar has to be built manually.

ashione commented 3 years ago

@jovany-wang please check the protocol of the Ray 1.6 jar.

ashione commented 3 years ago

This PR should help: https://github.com/ray-project/ray/pull/18490

ashione commented 3 years ago

@blublinsky @Devil-Z Could you run the examples on the latest master?

jovany-wang commented 1 year ago

Ray streaming has been moved to https://github.com/ray-project/mobius.