ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Core] Default concurrency using concurrency groups #46666

Open Famok opened 1 month ago

Famok commented 1 month ago

What happened + What you expected to happen

I want to use concurrency_groups to limit the concurrency of certain actor methods that need to be executed serially. According to the docs, we can do that using concurrency_groups, and that works fine, even when defining different groups for different methods. After reading the docs, I also thought you could set the default concurrency of the remaining methods using the max_concurrency kwarg, and that works fine too, as long as we don't use groups at the same time.

The problem: using max_concurrency and concurrency_groups at the same time doesn't work as I expected, since Ray seems to apply whichever limit is bigger to the decorated methods.

Current workaround: define an explicit concurrency group to serve as the default (e.g. default) and assign all other methods to that group; a sketch follows below.
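A minimal sketch of that workaround (the group and method names here are illustrative, not from the original report): every method is pinned to an explicit concurrency group, so max_concurrency never comes into play.

import ray

# Workaround sketch: assign every method to an explicit group instead of
# relying on max_concurrency for the undecorated ones.
@ray.remote(concurrency_groups={"parallel": 3, "serial": 1})
class Worker:
    @ray.method(concurrency_group="serial")
    def must_run_serially(self, i):
        return i

    @ray.method(concurrency_group="parallel")
    def can_run_concurrently(self, i):
        return i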

Versions / Dependencies

Ray 2.31.0, Python 3.9.19, OS: Windows 10

Reproduction script

import ray
import time
import numpy as np

@ray.remote(concurrency_groups={"group1": 3, "group2": 1}, max_concurrency=3)
class Test:
    @ray.method(concurrency_group='group1')
    def f1(self, ctr):
        time.sleep(np.random.randint(0,3))
        print(time.time(), ctr)

    @ray.method(concurrency_group='group2')
    def f2(self, ctr):
        time.sleep(np.random.randint(0,3))
        print(time.time(), ctr)

t = Test.remote()

print('1st Test (chaos)')
refs = []
for i in range(10):
    refs.append(t.f1.remote(i))

ray.get(refs) # wait for prints to finish
time.sleep(2) # wait some more

print('2nd Test (still chaos)')
refs = []
for i in range(10):
    refs.append(t.f2.remote(i))

ray.get(refs) # wait for prints to finish

Issue Severity

Medium: It is a significant difficulty but I can work around it.

Superskyyy commented 1 month ago

Hi @Famok, you are saying that group2 still uses a concurrency of 3 (when it should be 1), right?

Famok commented 1 month ago

@Superskyyy yes, that is what my script demonstrates. f1 should be "chaos", since its concurrency_group sets the limit to 3. f2 should not be "chaos", since its concurrency_group sets the limit to 1. But due to max_concurrency=3, f2 is chaos as well, which shouldn't be the case.

Maybe my example is too confusing. I have one method, func, which I want to run serially (concurrency = 1), and many other methods which should run concurrently (concurrency = 3). I thought I could achieve that like so:

import ray
import time
import numpy as np

@ray.remote(concurrency_groups={"group": 1}, max_concurrency=3)
class Test:
    @ray.method(concurrency_group='group')
    def func(self, ctr):
        time.sleep(np.random.randint(0,3))
        print(time.time(), ctr)

    def othermethod(self, ctr):
        time.sleep(np.random.randint(0, 3))
        print(time.time(), ctr)

t = Test.remote()

print("These should print in order (0...9), but don't")
refs = []
for i in range(10):
    refs.append(t.func.remote(i))

ray.get(refs) # wait for prints to finish

print('othermethod should run concurrently, and it does')
refs = []
for i in range(10):
    refs.append(t.othermethod.remote(i))

ray.get(refs) # wait for prints to finish
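A small probe actor like the one below (the class and helper names are illustrative) makes the expected behaviour checkable without relying on print ordering: it records how many calls are in flight inside func and reports the peak, which per the docs should never exceed 1.

import ray
import threading
import time

@ray.remote(concurrency_groups={"group": 1}, max_concurrency=3)
class Probe:
    def __init__(self):
        self._lock = threading.Lock()
        self._in_flight = 0
        self._peak = 0

    @ray.method(concurrency_group="group")
    def func(self, _):
        # track how many calls run in "group" at the same time
        with self._lock:
            self._in_flight += 1
            self._peak = max(self._peak, self._in_flight)
        time.sleep(0.1)
        with self._lock:
            self._in_flight -= 1

    def peak(self):
        return self._peak

p = Probe.remote()
ray.get([p.func.remote(i) for i in range(10)])
print("peak concurrency in 'group':", ray.get(p.peak.remote()))  # expected: 1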

Superskyyy commented 1 month ago

I see, I understand your case now, thanks @Famok! I will take a look at the code and fix it accordingly. (Will update here.)

jjyao commented 1 month ago

Based on https://docs.ray.io/en/latest/ray-core/actors/concurrency_group_api.html, max_concurrency sets the concurrency for the default concurrency group; it shouldn't affect other non-default concurrency groups.
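For reference, a minimal sketch of the semantics that page describes (the class and method names here are just illustrative): methods without a @ray.method decorator run in the default group, whose size max_concurrency controls, while decorated methods are bounded only by their own group's limit.

import ray

@ray.remote(concurrency_groups={"io": 2}, max_concurrency=8)
class Service:
    @ray.method(concurrency_group="io")
    def fetch(self, url):
        # bounded by the "io" group: at most 2 concurrent calls
        return url

    def compute(self, x):
        # no decorator: runs in the default group, at most 8 concurrent calls
        return x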

jjyao commented 1 month ago

@Superskyyy let me know if you need any help with the fix.

Famok commented 1 month ago

Based on https://docs.ray.io/en/latest/ray-core/actors/concurrency_group_api.html, max_concurrency sets the concurrency for the default concurrency group; it shouldn't affect other non-default concurrency groups.

@jjyao that was my understanding too, but as my example shows, that is not what happens.

Superskyyy commented 1 month ago

@Superskyyy let me know if you need any help with the fix.

Thanks, I will submit a PR to fix this soon. I was busy the last few weeks.