Open mgierada opened 5 months ago
@mgierada I just tried it again on Ray master and it has no issue. The only difference in the dependencies is that I'm on protobuf==4.21.12,
so I'm not sure if that's the cause.
Otherwise, you can try the files generated from my version (below) to see if they solve the issue:
#### user_defined_protos_pb2.py ####
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: user_defined_protos.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()

# Register the serialized FileDescriptorProto for user_defined_protos.proto in
# the default descriptor pool. These bytes are emitted by protoc from the
# .proto file; regenerate them rather than editing by hand.
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x19user_defined_protos.proto\x12\x11userdefinedprotos\"?\n\x12UserDefinedMessage\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06origin\x18\x02 \x01(\t\x12\x0b\n\x03num\x18\x03 \x01(\x03\"4\n\x13UserDefinedResponse\x12\x10\n\x08greeting\x18\x01 \x01(\t\x12\x0b\n\x03num\x18\x02 \x01(\x03\"\x15\n\x13UserDefinedMessage2\"(\n\x14UserDefinedResponse2\x12\x10\n\x08greeting\x18\x01 \x01(\t\"*\n\tImageData\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\x10\n\x08\x66ilename\x18\x02 \x01(\t\"4\n\nImageClass\x12\x0f\n\x07\x63lasses\x18\x01 \x03(\t\x12\x15\n\rprobabilities\x18\x02 \x03(\x02\x32\xae\x02\n\x12UserDefinedService\x12Y\n\x08__call__\x12%.userdefinedprotos.UserDefinedMessage\x1a&.userdefinedprotos.UserDefinedResponse\x12_\n\x0cMultiplexing\x12&.userdefinedprotos.UserDefinedMessage2\x1a\'.userdefinedprotos.UserDefinedResponse2\x12\\\n\tStreaming\x12%.userdefinedprotos.UserDefinedMessage\x1a&.userdefinedprotos.UserDefinedResponse0\x01\x32\x64\n\x1aImageClassificationService\x12\x46\n\x07Predict\x12\x1c.userdefinedprotos.ImageData\x1a\x1d.userdefinedprotos.ImageClassB:\n#io.ray.examples.user_defined_protosB\x11UserDefinedProtosP\x01\x62\x06proto3')

# Build the descriptors and message classes into this module's namespace.
# NOTE(review): the module name 'user_defined_protos_pb2' hard-coded below
# assumes this file is imported as a top-level module.
# If it lives inside a package (e.g. imported as
# `protos.user_defined_protos_pb2`), regenerate it with protoc's -I pointing
# at the package root so the names match. A mismatch is a likely cause of
# "cannot pickle 'google._upb._message.Descriptor'" when Ray serializes code
# that references these classes.
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'user_defined_protos_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
  # Pure-Python descriptor backend only: attach the file options and the
  # start/end byte offsets of each message/service inside the serialized
  # descriptor above (e.g. UserDefinedMessage occupies bytes 48..111).
  DESCRIPTOR._options = None
  DESCRIPTOR._serialized_options = b'\n#io.ray.examples.user_defined_protosB\021UserDefinedProtosP\001'
  _USERDEFINEDMESSAGE._serialized_start=48
  _USERDEFINEDMESSAGE._serialized_end=111
  _USERDEFINEDRESPONSE._serialized_start=113
  _USERDEFINEDRESPONSE._serialized_end=165
  _USERDEFINEDMESSAGE2._serialized_start=167
  _USERDEFINEDMESSAGE2._serialized_end=188
  _USERDEFINEDRESPONSE2._serialized_start=190
  _USERDEFINEDRESPONSE2._serialized_end=230
  _IMAGEDATA._serialized_start=232
  _IMAGEDATA._serialized_end=274
  _IMAGECLASS._serialized_start=276
  _IMAGECLASS._serialized_end=328
  _USERDEFINEDSERVICE._serialized_start=331
  _USERDEFINEDSERVICE._serialized_end=633
  _IMAGECLASSIFICATIONSERVICE._serialized_start=635
  _IMAGECLASSIFICATIONSERVICE._serialized_end=735
# @@protoc_insertion_point(module_scope)
#### user_defined_protos_pb2_grpc.py ####
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import user_defined_protos_pb2 as user__defined__protos__pb2
class UserDefinedServiceStub(object):
    """Client-side stub for the userdefinedprotos.UserDefinedService service.

    Each attribute set in ``__init__`` is a callable bound to one RPC method
    path on the given channel.
    """

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        # Unary request -> unary response. Implicit calls such as stub(req)
        # look up __call__ on the type, not the instance, so invoke this RPC
        # explicitly as stub.__call__(request).
        self.__call__ = channel.unary_unary(
                '/userdefinedprotos.UserDefinedService/__call__',
                request_serializer=user__defined__protos__pb2.UserDefinedMessage.SerializeToString,
                response_deserializer=user__defined__protos__pb2.UserDefinedResponse.FromString,
                )
        # Unary request -> unary response.
        self.Multiplexing = channel.unary_unary(
                '/userdefinedprotos.UserDefinedService/Multiplexing',
                request_serializer=user__defined__protos__pb2.UserDefinedMessage2.SerializeToString,
                response_deserializer=user__defined__protos__pb2.UserDefinedResponse2.FromString,
                )
        # Unary request -> stream of responses.
        self.Streaming = channel.unary_stream(
                '/userdefinedprotos.UserDefinedService/Streaming',
                request_serializer=user__defined__protos__pb2.UserDefinedMessage.SerializeToString,
                response_deserializer=user__defined__protos__pb2.UserDefinedResponse.FromString,
                )
class UserDefinedServiceServicer(object):
    """Server-side base class for the userdefinedprotos.UserDefinedService service.

    Override the methods in a subclass. Every default implementation marks the
    call as UNIMPLEMENTED on ``context`` and raises NotImplementedError.
    """

    def __call__(self, request, context):
        """Handle the unary ``__call__`` RPC (UserDefinedMessage -> UserDefinedResponse)."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def Multiplexing(self, request, context):
        """Handle the unary ``Multiplexing`` RPC (UserDefinedMessage2 -> UserDefinedResponse2)."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def Streaming(self, request, context):
        """Handle the server-streaming ``Streaming`` RPC (UserDefinedMessage -> stream of UserDefinedResponse)."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')
def add_UserDefinedServiceServicer_to_server(servicer, server):
    """Register ``servicer`` on ``server`` as userdefinedprotos.UserDefinedService.

    Args:
        servicer: Object implementing ``__call__``, ``Multiplexing`` and
            ``Streaming`` with a ``(request, context)`` signature.
        server: Server to register on; only its ``add_generic_rpc_handlers``
            method is used.
    """
    # Map each RPC name to a handler that decodes requests / encodes responses
    # with the generated message classes and dispatches to the servicer.
    rpc_method_handlers = {
            '__call__': grpc.unary_unary_rpc_method_handler(
                    servicer.__call__,
                    request_deserializer=user__defined__protos__pb2.UserDefinedMessage.FromString,
                    response_serializer=user__defined__protos__pb2.UserDefinedResponse.SerializeToString,
            ),
            'Multiplexing': grpc.unary_unary_rpc_method_handler(
                    servicer.Multiplexing,
                    request_deserializer=user__defined__protos__pb2.UserDefinedMessage2.FromString,
                    response_serializer=user__defined__protos__pb2.UserDefinedResponse2.SerializeToString,
            ),
            'Streaming': grpc.unary_stream_rpc_method_handler(
                    servicer.Streaming,
                    request_deserializer=user__defined__protos__pb2.UserDefinedMessage.FromString,
                    response_serializer=user__defined__protos__pb2.UserDefinedResponse.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'userdefinedprotos.UserDefinedService', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class UserDefinedService(object):
    """Target-address client helpers for userdefinedprotos.UserDefinedService.

    Each static method performs one RPC against ``target`` through
    ``grpc.experimental``; all other arguments are forwarded unchanged.
    """

    @staticmethod
    def __call__(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        """Invoke the unary ``__call__`` RPC (UserDefinedMessage -> UserDefinedResponse)."""
        return grpc.experimental.unary_unary(request, target, '/userdefinedprotos.UserDefinedService/__call__',
            user__defined__protos__pb2.UserDefinedMessage.SerializeToString,
            user__defined__protos__pb2.UserDefinedResponse.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

    @staticmethod
    def Multiplexing(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        """Invoke the unary ``Multiplexing`` RPC (UserDefinedMessage2 -> UserDefinedResponse2)."""
        return grpc.experimental.unary_unary(request, target, '/userdefinedprotos.UserDefinedService/Multiplexing',
            user__defined__protos__pb2.UserDefinedMessage2.SerializeToString,
            user__defined__protos__pb2.UserDefinedResponse2.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

    @staticmethod
    def Streaming(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        """Invoke the server-streaming ``Streaming`` RPC (UserDefinedMessage -> stream of UserDefinedResponse)."""
        return grpc.experimental.unary_stream(request, target, '/userdefinedprotos.UserDefinedService/Streaming',
            user__defined__protos__pb2.UserDefinedMessage.SerializeToString,
            user__defined__protos__pb2.UserDefinedResponse.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
class ImageClassificationServiceStub(object):
    """Client-side stub for the userdefinedprotos.ImageClassificationService service."""

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        # Unary request (ImageData) -> unary response (ImageClass).
        self.Predict = channel.unary_unary(
                '/userdefinedprotos.ImageClassificationService/Predict',
                request_serializer=user__defined__protos__pb2.ImageData.SerializeToString,
                response_deserializer=user__defined__protos__pb2.ImageClass.FromString,
                )
class ImageClassificationServiceServicer(object):
    """Server-side base class for userdefinedprotos.ImageClassificationService.

    Override ``Predict`` in a subclass; the default marks the call as
    UNIMPLEMENTED on ``context`` and raises NotImplementedError.
    """

    def Predict(self, request, context):
        """Handle the unary ``Predict`` RPC (ImageData -> ImageClass)."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')
def add_ImageClassificationServiceServicer_to_server(servicer, server):
    """Register ``servicer`` on ``server`` as userdefinedprotos.ImageClassificationService.

    Args:
        servicer: Object implementing ``Predict(request, context)``.
        server: Server to register on; only its ``add_generic_rpc_handlers``
            method is used.
    """
    rpc_method_handlers = {
            'Predict': grpc.unary_unary_rpc_method_handler(
                    servicer.Predict,
                    request_deserializer=user__defined__protos__pb2.ImageData.FromString,
                    response_serializer=user__defined__protos__pb2.ImageClass.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'userdefinedprotos.ImageClassificationService', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class ImageClassificationService(object):
    """Target-address client helpers for userdefinedprotos.ImageClassificationService."""

    @staticmethod
    def Predict(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        """Invoke the unary ``Predict`` RPC (ImageData -> ImageClass) against ``target``."""
        return grpc.experimental.unary_unary(request, target, '/userdefinedprotos.ImageClassificationService/Predict',
            user__defined__protos__pb2.ImageData.SerializeToString,
            user__defined__protos__pb2.ImageClass.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
Hey @GeneDer, thanks for taking a look. This is still not working for me. I made sure I am running the same version of protobuf
as you, and I copy-pasted the compiled code you shared, but I am still facing serialization issues. Is there anything else you do that I might be missing? Any additional config/setup? Apparently, Ray
is not able to serialize the protobuf objects.
Oh, actually, another difference: I'm running on Python 3.10.12. Maybe try that. Also, my pickle version is 4.0; not sure if that makes a difference.
@GeneDer, I tried with Python 3.10.12 and pickle 4.0, but I still got the same error.
Here's what I am running, maybe I missed something obvious.
./src/protos/user_defined_protos.proto
// user_defined_protos.proto
//
// Messages and services for the Ray Serve gRPC example.
// The generated user_defined_protos_pb2.py embeds a serialized copy of this
// file, including the order of its declarations. Regenerate the Python stubs
// after any change here.
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.ray.examples.user_defined_protos";
option java_outer_classname = "UserDefinedProtos";

package userdefinedprotos;

// Request for UserDefinedService.__call__ and UserDefinedService.Streaming.
message UserDefinedMessage {
  string name = 1;
  string origin = 2;
  int64 num = 3;
}

// Response for UserDefinedService.__call__; also the item type of the
// Streaming response stream.
message UserDefinedResponse {
  string greeting = 1;
  int64 num = 2;
}

// Empty request for UserDefinedService.Multiplexing.
message UserDefinedMessage2 {}

// Response for UserDefinedService.Multiplexing.
message UserDefinedResponse2 {
  string greeting = 1;
}

// Request for ImageClassificationService.Predict.
message ImageData {
  string url = 1;
  string filename = 2;
}

// Response for ImageClassificationService.Predict.
// Presumably classes[i] pairs with probabilities[i]; confirm with the
// implementation.
message ImageClass {
  repeated string classes = 1;
  repeated float probabilities = 2;
}

service UserDefinedService {
  // Unary call.
  rpc __call__(UserDefinedMessage) returns (UserDefinedResponse);
  // Unary call.
  rpc Multiplexing(UserDefinedMessage2) returns (UserDefinedResponse2);
  // Server-streaming call.
  rpc Streaming(UserDefinedMessage) returns (stream UserDefinedResponse);
}

service ImageClassificationService {
  rpc Predict(ImageData) returns (ImageClass);
}
# Generate the stubs with ./src as the include root. The generated module
# names then become protos.user_defined_protos_pb2 / protos.user_defined_protos_pb2_grpc,
# matching how they are imported below and in simple_deploy.py.
# With -I=. they would be named src.protos.*, which does not match those imports.
python -m grpc_tools.protoc -I=./src --python_out=./src --grpc_python_out=./src ./src/protos/user_defined_protos.proto
ray start --head
# NOTE(review): the `protos` package must be importable by the Serve processes,
# presumably by starting Ray with PYTHONPATH=./src. Confirm for your setup.
serve start \
  --grpc-port 9000 \
  --grpc-servicer-functions protos.user_defined_protos_pb2_grpc.add_UserDefinedServiceServicer_to_server
That correctly starts the ray cluster.
./src/simple_deploy.py
import time
from typing import Generator
from protos.user_defined_protos_pb2 import (
UserDefinedMessage,
UserDefinedMessage2,
UserDefinedResponse,
UserDefinedResponse2,
)
import ray
from ray import serve
@serve.deployment
class GrpcDeployment:
    """Serve deployment implementing the UserDefinedService RPC methods.

    Method names mirror the RPCs declared on UserDefinedService
    (``__call__``, ``Multiplexing``, ``Streaming``).
    """

    def __call__(self, user_message: UserDefinedMessage) -> UserDefinedResponse:
        """Greet the sender and return the request's ``num`` doubled."""
        greeting = f"Hello {user_message.name} from {user_message.origin}"
        num = user_message.num * 2
        user_response = UserDefinedResponse(
            greeting=greeting,
            num=num,
        )
        return user_response

    @serve.multiplexed(max_num_models_per_replica=1)
    async def get_model(self, model_id: str) -> str:
        """Return a placeholder "model" string for ``model_id``.

        At most one model is kept per replica (max_num_models_per_replica=1).
        """
        return f"loading model: {model_id}"

    async def Multiplexing(self, user_message: UserDefinedMessage2) -> UserDefinedResponse2:
        """Reply using the model selected by the request's multiplexed model id."""
        model_id = serve.get_multiplexed_model_id()
        model = await self.get_model(model_id)
        user_response = UserDefinedResponse2(
            greeting=f"Method2 called model, {model}",
        )
        return user_response

    def Streaming(
        self, user_message: UserDefinedMessage
    ) -> Generator[UserDefinedResponse, None, None]:
        """Yield ten responses about 0.1 s apart, with ``num`` = 2 * request num + index."""
        for i in range(10):
            greeting = f"{i}: Hello {user_message.name} from {user_message.origin}"
            num = user_message.num * 2 + i
            user_response = UserDefinedResponse(
                greeting=greeting,
                num=num,
            )
            yield user_response
            # Pace the stream between items.
            time.sleep(0.1)
# Bound application; importable as `<module>:g` (e.g. from a Serve config).
g = GrpcDeployment.bind()
app1 = "app1"

if __name__ == "__main__":
    # Deploy only when executed as a script, so importing this module does not
    # trigger a deployment as a side effect.
    serve.run(target=g, name=app1, route_prefix=f"/{app1}")
# The script file is simple_deploy.py; without the extension, python cannot open it.
python ./src/simple_deploy.py
and got the error from the issue description above.
What if you don't put those in a sub-directory and instead just generate them in the same root?
Nah, unfortunately that is not helping much.
Oh, I finally solved the issue. The docs are very misleading on this. The cause was that my proto imports were out of scope; moving them close to where they are used made a difference. So instead of importing at the top of the file like this
import time
from typing import Generator
from protos.user_defined_protos_pb2 import (
UserDefinedMessage,
UserDefinedMessage2,
UserDefinedResponse,
UserDefinedResponse2,
)
import ray
from ray import serve
# ... ray serve logic
do this
# file name ./src/deploy.py
import time
from typing import Generator
from ray import serve
@serve.deployment
class GrpcDeployment:
    """Serve deployment implementing the UserDefinedService RPC methods.

    The proto classes are imported inside each method instead of at module
    level. This was the workaround that made serialization succeed here.
    NOTE(review): presumably it only avoids capturing the classes as module
    globals when the class is pickled; a generated module name that matches
    its import path is the more robust fix.
    """

    def __call__(self, user_message):
        """Greet the sender and return the request's ``num`` doubled."""
        from user_defined_protos_pb2 import UserDefinedResponse

        greeting = f"Hello {user_message.name} from {user_message.origin}"
        num = user_message.num * 2
        user_response = UserDefinedResponse(
            greeting=greeting,
            num=num,
        )
        return user_response

    @serve.multiplexed(max_num_models_per_replica=1)
    async def get_model(self, model_id: str) -> str:
        """Return a placeholder "model" string for ``model_id``."""
        return f"loading model: {model_id}"

    async def Multiplexing(self, user_message):
        """Reply using the model selected by the request's multiplexed model id."""
        from user_defined_protos_pb2 import UserDefinedResponse2

        model_id = serve.get_multiplexed_model_id()
        model = await self.get_model(model_id)
        user_response = UserDefinedResponse2(
            greeting=f"Method2 called model, {model}",
        )
        return user_response

    def Streaming(self, user_message) -> Generator:
        """Yield ten responses about 0.1 s apart, with ``num`` = 2 * request num + index."""
        from user_defined_protos_pb2 import UserDefinedResponse

        for i in range(10):
            greeting = f"{i}: Hello {user_message.name} from {user_message.origin}"
            num = user_message.num * 2 + i
            user_response = UserDefinedResponse(
                greeting=greeting,
                num=num,
            )
            yield user_response
            time.sleep(0.1)
# Bound application; importable as `<module>:g` (e.g. from a Serve config).
g = GrpcDeployment.bind()
app1 = "app1"

if __name__ == "__main__":
    # Deploy only when executed as a script (`python src/deploy.py`), so
    # importing this module does not trigger a deployment as a side effect.
    serve.run(target=g, name=app1, route_prefix=f"/{app1}")
Then, assuming your ray is available on the localhost or whatever, deploy by running the file
# Deploy by executing the script directly (assumes Ray is reachable, e.g. on localhost).
python ./src/deploy.py
The following PR fixes the docs:
@GeneDer Okay, I agree that everything works fine if I keep it in the root of Ray's working directory, so the current PR #45862 does not make sense in that context. However, the issue appears when I organize the logic in subdirectories, e.g.
.
├── __init__.py
├── config.yaml
├── playground
│ ├── __init__.py
│ ├── __pycache__
│ │ ├── __init__.cpython-311.pyc
│ │ ├── deployment.cpython-311.pyc
│ │ ├── user_defined_protos_pb2.cpython-311.pyc
│ │ └── user_defined_protos_pb2_grpc.cpython-311.pyc
│ ├── deployment.py
│ ├── user_defined_protos.proto
│ ├── user_defined_protos_pb2.py
│ └── user_defined_protos_pb2_grpc.py
├──
With the config.yaml
looking like this
grpc_options:
  port: 9000
  grpc_servicer_functions:
    # NOTE(review): the generated _grpc module does a plain
    # `import user_defined_protos_pb2`, which fails once the file lives in the
    # `playground` package. Regenerate with the project root as protoc's -I so
    # it imports `playground.user_defined_protos_pb2` instead.
    - playground.user_defined_protos_pb2_grpc.add_UserDefinedServiceServicer_to_server

applications:
  - name: app1
    route_prefix: /app1
    import_path: playground.deployment:g
    runtime_env: {}
@GeneDer Ohh... This is even worse than I initially thought. It seems that ray[serve]
can only work with a flat project structure like the one you shared. Also, there seems to be a deal-breaker for my use case, as ray[serve]
is not able to work with protos defined in a library.
Let's assume I have a private repo which builds a proto library. I can access that library e.g
from private_proto_library.protos.user_defined_protos_pb2 import (
UserDefinedMessage,
UserDefinedMessage2,
UserDefinedResponse,
UserDefinedResponse2)
So assuming I have the following deployment.py
# file name ./src/deployment.py
import time
from typing import Generator
from ray import serve
from private_proto_library.protos.user_defined_protos_pb2 import (
UserDefinedMessage,
UserDefinedMessage2,
UserDefinedResponse,
UserDefinedResponse2)
@serve.deployment
class GrpcDeployment:
    """Serve deployment implementing the UserDefinedService RPC methods.

    Method names mirror the RPCs declared on UserDefinedService
    (``__call__``, ``Multiplexing``, ``Streaming``).

    NOTE(review): the proto classes used here are module-level imports from
    ``private_proto_library``. If that package's ``_pb2`` module was generated
    under a different module name than its import path, serializing this
    deployment may fail with "cannot pickle 'google._upb._message.Descriptor'".
    Verify that the generated module name matches the import path.
    """

    def __call__(self, user_message):
        """Greet the sender and return the request's ``num`` doubled."""
        greeting = f"Hello {user_message.name} from {user_message.origin}"
        num = user_message.num * 2
        user_response = UserDefinedResponse(
            greeting=greeting,
            num=num,
        )
        return user_response

    @serve.multiplexed(max_num_models_per_replica=1)
    async def get_model(self, model_id: str) -> str:
        """Return a placeholder "model" string for ``model_id``."""
        return f"loading model: {model_id}"

    async def Multiplexing(self, user_message):
        """Reply using the model selected by the request's multiplexed model id."""
        model_id = serve.get_multiplexed_model_id()
        model = await self.get_model(model_id)
        user_response = UserDefinedResponse2(
            greeting=f"Method2 called model, {model}",
        )
        return user_response

    def Streaming(self, user_message) -> Generator:
        """Yield ten responses about 0.1 s apart, with ``num`` = 2 * request num + index."""
        for i in range(10):
            greeting = f"{i}: Hello {user_message.name} from {user_message.origin}"
            num = user_message.num * 2 + i
            user_response = UserDefinedResponse(
                greeting=greeting,
                num=num,
            )
            yield user_response
            time.sleep(0.1)
# Bound application; referenced by the Serve config as `src.deployment:g`.
g = GrpcDeployment.bind()
app1 = "app1"

if __name__ == "__main__":
    # Deploy only when executed as a script. When Serve loads this module via
    # the config's import_path, only the bound application `g` is needed;
    # calling serve.run at import time would start a second deployment.
    serve.run(target=g, name=app1, route_prefix=f"/{app1}")
I can deploy the model using config.yml
grpc_options:
  port: 9000
  grpc_servicer_functions:
    # Must name a function that exists in the generated _grpc module. The
    # stubs generated from user_defined_protos.proto define
    # add_UserDefinedServiceServicer_to_server (not a StressTestingService one),
    # and GrpcDeployment implements exactly that service's methods.
    - private_proto_library.protos.user_defined_protos_pb2_grpc.add_UserDefinedServiceServicer_to_server

applications:
  - name: app1
    route_prefix: /app1
    import_path: src.deployment:g
    runtime_env: {}
I got the sort of similar error as the one I posted at the very top of this issue saying
(ServeController pid=74965) !!! FAIL serialization: cannot pickle 'google._upb._message.Descriptor' object
Are there any recommendations for using private proto libraries, or for a slightly more complex (and at the same time more meaningful) project directory structure?
Hi @mgierada, thanks for reporting back. I haven't had time to give it another try, but I don't think Serve is actually unable to use protos from a different library. For example, the ones generated in Serve (`from ray.serve.generated.serve_pb2 import HealthzResponse, ListApplicationsResponse`)
can be used. I would need more time to research why that is. However, I think it probably has something to do with the versions of grpc and the compile tool.
Also, if you are running this in a distributed environment and/or using KubeRay, I would suggest making sure all the nodes have access to those protos; otherwise serialization/deserialization of those proto objects might also cause similar issues.
Also, just for reference, we have test deployments that also import from a non-flat file structure. Maybe this can give you some inspiration: https://github.com/ray-project/ray/blob/70a8152766cb4e3e2273dd85cae9cf0cc6bd2537/python/ray/serve/tests/test_config_files/grpc_deployment.py#L5
Just to make it clear: I am trying to run this locally.
Also just a reference, we have test deployments that's also importing from non-plain file structure.
The short answer is that your tests don't exercise subdirectory imports, because those modules already live inside the ray package and are importable from it.
So you need to stage them in some output directory outside ray in order to properly test and fix this issue.
What happened + What you expected to happen
I went through the documentation using an exact copy of its proto file, but I keep getting the following errors when running the app.
Similar issue was posted here by someone else https://discuss.ray.io/t/keep-getting-error-typeerror-cannot-pickle-classmethod-descriptor-object/10153
Versions / Dependencies
I tried many versions and combinations, including the latest grpcio-tools, grpcio and protobuf as well as very old ones. The issue is still the same.
I am running Python 3.11.9 on Apple silicon.
Reproduction script
Please follow the official documentation https://docs.ray.io/en/latest/serve/advanced-guides/grpc-guide.html
Issue Severity
High: It blocks me from completing my task.