Open · jamie0xgitc0decat opened this issue 2 years ago
cc @iycheng It seems like this requires a very big payload for InternalKVPut.
@jamie0xgitc0decat
172 def flush_values(self):
173 for (category, key), value in self._to_flush.items():
--> 174 _internal_kv_put(
175 _make_key(self._prefix, category, key), value, overwrite=True)
176 self._to_flush.clear()
Do you happen to know what value you are putting here? We have a default gRPC message size limit for the put request.
cc @krfricke @amogkam I think if the value is too big, the application layer should probably split the data into smaller chunks. Alternatively, we could support it from Ray core, but I don't know if that's something we want to support.
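For illustration, a minimal sketch of what application-level chunking could look like (the helper and key layout below are hypothetical, not an existing Ray API); each chunk stays under the gRPC limit and a small metadata entry records how many chunks were written:

from ray.experimental.internal_kv import _internal_kv_put

CHUNK_SIZE = 64 * 1024 * 1024  # 64MB per put, safely below the 100MB gRPC limit

def chunked_kv_put(key: bytes, value: bytes):
    # Hypothetical helper: split one large value into several internal KV entries.
    num_chunks = max(1, -(-len(value) // CHUNK_SIZE))  # ceiling division
    _internal_kv_put(key + b":num_chunks", str(num_chunks).encode(), overwrite=True)
    for i in range(num_chunks):
        chunk = value[i * CHUNK_SIZE:(i + 1) * CHUNK_SIZE]
        _internal_kv_put(key + b":chunk:%d" % i, chunk, overwrite=True)

A matching chunked get would read the count back and reassemble the pieces.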
@rkooo567 The type of the value is bytes. The key is my env.
The default max receive message length is 512MB, set in ray/_private/gcs_utils.py:
https://github.com/ray-project/ray/blob/7f1bacc7dc9caf6d0ec042e39499bbf1d9a7d065/python/ray/_private/gcs_utils.py line 74
_MAX_MESSAGE_LENGTH = 512 * 1024 * 1024
https://github.com/ray-project/ray/blob/7f1bacc7dc9caf6d0ec042e39499bbf1d9a7d065/python/ray/ray_constants.py line 312
GRPC_CPP_MAX_MESSAGE_SIZE = 100 * 1024 * 1024
https://github.com/ray-project/ray/blob/7f1bacc7dc9caf6d0ec042e39499bbf1d9a7d065/python/ray/internal/internal_api.py line 12
MAX_MESSAGE_LENGTH = ray._config.max_grpc_message_size()
I changed all of these variables to 1024 * 1024 * 1024, which is 1GB, but it still looks like it is stuck at the default value of 100MB.
---------------------------------------------------------------------------
_InactiveRpcError Traceback (most recent call last)
Input In [43], in <module>
10 #ray.init(dashboard_port=9881)
11 ray.shutdown()
---> 12 ray.init(dashboard_host="0.0.0.0",dashboard_port=9881,include_dashboard=True)
File /usr/local/lib/python3.8/dist-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
103 if func.__name__ != "init" or is_client_mode_enabled_by_default:
104 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)
File /usr/local/lib/python3.8/dist-packages/ray/worker.py:985, in init(address, num_cpus, num_gpus, resources, object_store_memory, local_mode, ignore_reinit_error, include_dashboard, dashboard_host, dashboard_port, job_config, configure_logging, logging_level, logging_format, log_to_driver, namespace, runtime_env, _enable_object_reconstruction, _redis_max_memory, _plasma_directory, _node_ip_address, _driver_object_store_memory, _memory, _redis_password, _temp_dir, _metrics_export_port, _system_config, _tracing_startup_hook, **kwargs)
982 global_worker.set_load_code_from_local(False)
984 for hook in _post_init_hooks:
--> 985 hook()
987 node_id = global_worker.core_worker.get_current_node_id()
988 return dict(_global_node.address_info, node_id=node_id.hex())
File /usr/local/lib/python3.8/dist-packages/ray/tune/registry.py:180, in _Registry.flush_values(self)
178 f.write(value)
179 #f.write("\n")
--> 180 _internal_kv_put(
181 _make_key(self._prefix, category, key), value, overwrite=True)
182 self._to_flush.clear()
File /usr/local/lib/python3.8/dist-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
103 if func.__name__ != "init" or is_client_mode_enabled_by_default:
104 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)
File /usr/local/lib/python3.8/dist-packages/ray/experimental/internal_kv.py:78, in _internal_kv_put(key, value, overwrite, namespace)
75 value = value.encode()
76 assert isinstance(key, bytes) and isinstance(value, bytes) and isinstance(
77 overwrite, bool)
---> 78 return global_gcs_client.internal_kv_put(key, value, overwrite,
79 namespace) == 0
File /usr/local/lib/python3.8/dist-packages/ray/_private/gcs_utils.py:136, in _auto_reconnect.<locals>.wrapper(self, *args, **kwargs)
134 while True:
135 try:
--> 136 return f(self, *args, **kwargs)
137 except grpc.RpcError as e:
138 if remaining_retry <= 0:
File /usr/local/lib/python3.8/dist-packages/ray/_private/gcs_utils.py:258, in GcsClient.internal_kv_put(self, key, value, overwrite, namespace)
255 key = _make_key(namespace, key)
256 req = gcs_service_pb2.InternalKVPutRequest(
257 key=key, value=value, overwrite=overwrite)
--> 258 reply = self._kv_stub.InternalKVPut(req)
259 if reply.status.code == GcsCode.OK:
260 return reply.added_num
File /usr/local/lib/python3.8/dist-packages/grpc/_channel.py:946, in _UnaryUnaryMultiCallable.__call__(self, request, timeout, metadata, credentials, wait_for_ready, compression)
937 def __call__(self,
938 request,
939 timeout=None,
(...)
942 wait_for_ready=None,
943 compression=None):
944 state, call, = self._blocking(request, timeout, metadata, credentials,
945 wait_for_ready, compression)
--> 946 return _end_unary_response_blocking(state, call, False, None)
File /usr/local/lib/python3.8/dist-packages/grpc/_channel.py:849, in _end_unary_response_blocking(state, call, with_call, deadline)
847 return state.response
848 else:
--> 849 raise _InactiveRpcError(state)
_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.RESOURCE_EXHAUSTED
details = "Received message larger than max (776275415 vs. 104857600)"
debug_error_string = "{"created":"@1645553262.811232077","description":"Error received from peer ipv4:172.17.0.2:45387","file":"src/core/lib/surface/call.cc","file_line":903,"grpc_message":"Received message larger than max (776275415 vs. 104857600)","grpc_status":8}"
>
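For reference, 104857600 in the error is exactly 100 * 1024 * 1024, i.e., the GRPC_CPP_MAX_MESSAGE_SIZE value rather than the 512MB _MAX_MESSAGE_LENGTH, so the limit being hit appears to be enforced by the receiving side (the GCS server process) rather than by the client-side constant that was changed. In plain gRPC, these limits are channel options set at channel creation time; a minimal standalone sketch (not Ray's actual wiring) looks like this:

import grpc

ONE_GB = 1024 * 1024 * 1024

# Each side of the connection enforces its own limits, so raising only the client's
# options does not help if the server still uses its 100MB default.
channel = grpc.insecure_channel(
    "127.0.0.1:6379",  # placeholder address, for illustration only
    options=[
        ("grpc.max_send_message_length", ONE_GB),
        ("grpc.max_receive_message_length", ONE_GB),
    ],
)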
In the file ray/_private/gcs_utils.py (_MAX_MESSAGE_LENGTH = 512 * 1024 * 1024), it is pretty weird: the error reports a max size of 100MB no matter what I set, except when I set it to 512MB, in which case it reports 512MB. When I set it to 2GB, it raises another error, gcs_client is None:
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
Input In [45], in <module>
10 #ray.init(dashboard_port=9881)
11 ray.shutdown()
---> 12 ray.init(dashboard_host="0.0.0.0",dashboard_port=9881,include_dashboard=True)
File /usr/local/lib/python3.8/dist-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
103 if func.__name__ != "init" or is_client_mode_enabled_by_default:
104 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)
File /usr/local/lib/python3.8/dist-packages/ray/worker.py:918, in init(address, num_cpus, num_gpus, resources, object_store_memory, local_mode, ignore_reinit_error, include_dashboard, dashboard_host, dashboard_port, job_config, configure_logging, logging_level, logging_format, log_to_driver, namespace, runtime_env, _enable_object_reconstruction, _redis_max_memory, _plasma_directory, _node_ip_address, _driver_object_store_memory, _memory, _redis_password, _temp_dir, _metrics_export_port, _system_config, _tracing_startup_hook, **kwargs)
878 ray_params = ray._private.parameter.RayParams(
879 redis_address=redis_address,
880 node_ip_address=node_ip_address,
(...)
912 metrics_export_port=_metrics_export_port,
913 tracing_startup_hook=_tracing_startup_hook)
914 # Start the Ray processes. We set shutdown_at_exit=False because we
915 # shutdown the node in the ray.shutdown call that happens in the atexit
916 # handler. We still spawn a reaper process in case the atexit handler
917 # isn't called.
--> 918 _global_node = ray.node.Node(
919 head=True,
920 shutdown_at_exit=False,
921 spawn_reaper=True,
922 ray_params=ray_params)
923 else:
924 # In this case, we are connecting to an existing cluster.
925 if num_cpus is not None or num_gpus is not None:
File /usr/local/lib/python3.8/dist-packages/ray/node.py:215, in Node.__init__(self, ray_params, head, shutdown_at_exit, spawn_reaper, connect_only)
213 # Start processes.
214 if head:
--> 215 self.start_head_processes()
216 # Make sure gcs is up
217 self.get_gcs_client().internal_kv_put(
218 b"session_name", self.session_name.encode(), True,
219 ray_constants.KV_NAMESPACE_SESSION)
File /usr/local/lib/python3.8/dist-packages/ray/node.py:913, in Node.start_head_processes(self)
910 # If this is the head node, start the relevant head node processes.
911 self.start_redis()
--> 913 self.start_gcs_server()
914 if not self._ray_params.no_monitor:
915 self.start_monitor()
File /usr/local/lib/python3.8/dist-packages/ray/node.py:799, in Node.start_gcs_server(self)
795 self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] = [
796 process_info,
797 ]
798 # Init gcs client
--> 799 self.get_gcs_client()
File /usr/local/lib/python3.8/dist-packages/ray/node.py:472, in Node.get_gcs_client(self)
470 time.sleep(1)
471 logger.debug(f"Waiting for gcs up {e}")
--> 472 ray.experimental.internal_kv._initialize_internal_kv(
473 self._gcs_client)
475 return self._gcs_client
File /usr/local/lib/python3.8/dist-packages/ray/experimental/internal_kv.py:24, in _initialize_internal_kv(gcs_client)
21 """Initialize the internal KV for use in other function calls.
22 """
23 global global_gcs_client, _initialized
---> 24 assert gcs_client is not None
25 global_gcs_client = gcs_client
26 _initialized = True
AssertionError:
Hmm, it does sound odd. My understanding is also that changing gcs_utils.py should fix the issue.
Is there an easy way to reproduce this? I can take a look at it.
Can you link the Tune script that you use? Sometimes the RESOURCE_EXHAUSTED message shows up when a big blob of data is serialized together with the train function. I just want to rule that out for your case first.
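One way to check that, assuming the trainable is accessible in the driver script, is to measure its pickled size before handing it to Tune (train_function below is a placeholder for whatever you pass to tune.run):

from ray import cloudpickle

# Rough diagnostic: how large is the function, plus everything it captures, once serialized?
size_bytes = len(cloudpickle.dumps(train_function))
print(f"serialized trainable: {size_bytes / 1024 / 1024:.1f} MB")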
This isn't technically a bug: creating big environments like this is a bit of an anti-pattern, and there are a couple of alternatives (for example, loading the large data lazily inside the env rather than capturing it up front; see the sketch below).
Either way, Tune/RLlib could probably raise a better message here, such as "your env class is too large".
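For illustration, a minimal sketch of the lazy-loading alternative, assuming the env wraps a large array that can be read from disk (MyEnv and the data path are placeholders, not from this thread):

import numpy as np
from ray.tune.registry import register_env

def env_creator(env_config):
    # Only this small creator function gets serialized into the Tune registry
    # (and hence into the GCS internal KV); the large array is loaded on each worker.
    data = np.load(env_config["data_path"])
    return MyEnv(data)  # MyEnv is a placeholder env class

register_env("my_large_data_env", env_creator)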
Hi, I'm having the same error. I'm using Ray Tune with Ax on the CIFAR-10 dataset, following along with the docs for both Ax and Ray Tune.
The error that I get is:
_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.RESOURCE_EXHAUSTED
details = "Received message larger than max (184620808 vs. 104857600)"
debug_error_string = "{"created":"@1649857255.358909155","description":"Error received from peer ipv4:172.28.0.2:63706","file":"src/core/lib/surface/call.cc","file_line":1074,"grpc_message":"Received message larger than max (184620808 vs. 104857600)","grpc_status":8}"
>
The code that I'm using is:
# Imports assumed by this snippet; model, train_loader, valid_loader and device
# are defined elsewhere in the notebook.
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import cohen_kappa_score
from ax.service.ax_client import AxClient
from ray import tune
from ray.tune import report
from ray.tune.schedulers import ASHAScheduler
from ray.tune.suggest.ax import AxSearch

ax = AxClient(enforce_sequential_optimization=False)

def init_net(model):
    net = model
    return net  # return untrained model

def training_net(model, parameters, loader):
    dev = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(dev)
    epochs = parameters.get("epoch", 30)
    # loss and optimizer
    criterion = nn.CrossEntropyLoss()
    # note: these conditions are truthy for any non-empty string, so both branches run as written
    if parameters.get("optimizer", "SGD"):
        optimizer = optim.SGD(model.parameters(), lr=parameters.get("lr", 0.001),
                              momentum=parameters.get("momentum", 0.9))
    if parameters.get("optimizer", "Adam"):
        optimizer = optim.Adam(model.parameters(), lr=parameters.get("lr", 0.001))
    for _ in range(epochs):
        model.train()  # loop over the dataset multiple times
        for inputs, labels in loader:
            # get the inputs
            # inputs, labels = data
            inputs = inputs.clone().detach().to(dev)
            labels = labels.clone().detach().to(dev)
            # zero the parameter gradients
            optimizer.zero_grad()
            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
    return model

def evaluate_net(model, loader, dev):
    # Function that evaluates the model and returns the desired metric to optimize. Copied over from run_dn_image_es()
    model.eval()
    first = True
    prob_cal = nn.Softmax(dim=1)
    test_preds = []
    test_labels = []
    with torch.no_grad():
        for images, labels in loader:
            images = images.clone().detach().to(dev)
            labels = labels.clone().detach().to(dev)
            test_labels = np.concatenate((test_labels, labels.tolist()))
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            test_preds = np.concatenate((test_preds, predicted.tolist()))
            test_prob = prob_cal(outputs)
            if first:
                test_probs = test_prob.tolist()
                first = False
            else:
                test_probs = np.concatenate((test_probs, test_prob.tolist()))
    return (
        cohen_kappa_score(test_preds, test_labels)
    )

def train_evaluate(parameterization):
    # Ax primitive that initializes the training sequence --> trains the model --> calculates the evaluation metric
    untrained_net = init_net(model)
    trained_net = training_net(untrained_net, parameterization, train_loader)
    report(
        cohen_kappa=evaluate_net(model, valid_loader, device)
    )

ax.create_experiment(
    name="cifar10_experiment",
    parameters=[
        {"name": "lr", "type": "range", "bounds": [1e-6, 0.4], "log_scale": True},
        {"name": "momentum", "type": "range", "bounds": [0.0, 1.0]},
        {"name": "epoch", "type": "range", "bounds": [15, 40]},
        {"name": "optimizer", "type": "choice", "values": ["SGD", "Adam"]}],
    objective_name="cohen_kappa",
    minimize=False)

asha_scheduler = ASHAScheduler(
    max_t=10,
    grace_period=5,
    reduction_factor=2)

algo = AxSearch(ax_client=ax)
# Wrap AxSearch in a concurrency limiter, to ensure that Bayesian optimization receives the
# data for completed trials before creating more trials
algo = tune.suggest.ConcurrencyLimiter(algo, max_concurrent=3)

tune.run(
    tune.with_parameters(train_evaluate),
    num_samples=5,
    metric="cohen_kappa",
    mode="max",
    search_alg=algo,
    verbose=0,  # Set this level to 1 to see status updates and to 2 to also see trial results.
    scheduler=asha_scheduler,
    # To use GPU, specify:
    # resources_per_trial={"gpu": 1, "cpu": 4}
)
Would really appreciate help on how to solve this.
Hi @adwaykanhere,
This usually happens when some data structure is implicitly captured in the train_func (train_evaluate in your case). For example, is model implicitly captured here, and would that cause a size issue when shipping over gRPC?
One way to go about this is to load the model from a file within the train_func.
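For illustration, a sketch of that suggestion applied to the snippet above: instead of capturing the model object, load it from a checkpoint file inside the function (the path is a placeholder; train_loader, valid_loader, training_net and evaluate_net are assumed from the earlier snippet):

import torch
from ray import tune

def train_evaluate(parameterization):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # Loaded per trial inside the function, so the model is not pickled together with it.
    model = torch.load("/path/to/untrained_model.pt", map_location=device)
    trained_net = training_net(model, parameterization, train_loader)
    tune.report(cohen_kappa=evaluate_net(trained_net, valid_loader, device))

Alternatively, if the goal is just to pass a large object to every trial, tune.with_parameters(train_evaluate, model=model) should place it in the Ray object store instead of serializing it together with the function.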
I have run into exactly the same problem these days. Does anyone know how to increase the limit?
Ray Component
Ray Core, gRPC
What happened + What you expected to happen
Versions / Dependencies
Ray: 1.10.0, Python: 3.8, OS: Ubuntu 20.04
Reproduction script
Anything else
This happens when the shape of the input feed is (928200, 91), i.e., 928200 rows and 91 columns. (If the feed is float64, that is roughly 676MB of raw data, on the same order as the ~776MB message in the error above.) However, everything is fine when I use a small data feed.
I had set the environment variable GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH=-1 (unlimited, per https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/grpc_types.h), but it does not work.
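One likely reason that has no effect: GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH in grpc_types.h is the name of a channel argument key ("grpc.max_receive_message_length"), not an environment variable, so it only takes effect when passed as an option where the channel or server is created, which for the GCS connection happens inside Ray itself. Roughly, in plain gRPC:

import grpc

# -1 means "unlimited" for these channel arguments; they cannot be set via the shell environment.
options = [("grpc.max_receive_message_length", -1),
           ("grpc.max_send_message_length", -1)]
channel = grpc.insecure_channel("127.0.0.1:6379", options=options)  # placeholder address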