google-deepmind / envlogger

A tool for recording RL trajectories.
Apache License 2.0
91 stars 13 forks

Multiprocessed reader #2

Closed mikygit closed 2 years ago

mikygit commented 2 years ago

Hello, I'm trying to speed up reading with multiprocessing, but I get an error: AttributeError: Can't pickle local object 'BackendReader._get_nth_episode.<locals>.get_nth_step_from_episode'. Apparently the object is not picklable for some reason.

Any chance you managed to do it on your side? Or do you have other hints for speeding up reading?

Thanks.

kenjitoyama commented 2 years ago

Hello @mikygit,

We never tried to use multiprocessing with EnvLogger due to constraints in Google's infrastructure and with how we run hermetic Python binaries. I believe the pickling error you're getting is likely due to having the core of the functionality implemented as C++ extensions with pybind11, but I'm speculating here.
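As a side note on the error message itself: the words "local object" and the `<locals>` component refer to a function defined inside another function or method. The standard `pickle` module stores functions by qualified name and cannot look such a function up again, whether or not C++ extensions are involved. The sketch below reproduces this class of error with a hypothetical stand-in class (`LazyEpisodes` is not part of EnvLogger); the exact exception type and wording vary between Python versions.

```python
import pickle


class LazyEpisodes:
    """Hypothetical stand-in for a reader that hands out lazy episode views."""

    def get_nth_episode(self, n):
        # A function defined inside a method is a "local object": pickle
        # serializes functions by qualified name and cannot resolve this one.
        def get_nth_step(step_index):
            return (n, step_index)

        return get_nth_step


def try_pickle(obj):
    """Returns None if obj pickles cleanly, else the error message."""
    try:
        pickle.dumps(obj)
        return None
    except (AttributeError, pickle.PicklingError) as e:
        return str(e)


step_getter = LazyEpisodes().get_nth_episode(0)
local_error = try_pickle(step_getter)        # fails: closure is a local object
plain_error = try_pickle([(0, 0), (0, 1)])   # succeeds: plain data

print(local_error)
print(plain_error)
```

Any framework that pickles its arguments will hit the same failure if it is handed an object that holds such a closure.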

Internally, we get good performance by using multiple readers via multi-threading by sharing RiegeliShardReaders created with the Clone() method. This allows them to share the relatively expensive in-memory index while being thread-safe because each thread has its own file descriptor, so in general this is more resource-efficient than using multiprocessing (even if it worked out-of-the-box). The only issue I see is that we did that only for C++ and haven't added the Python bindings, but this is something we should do. I'll get back to this bug after I get more info.
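The idea of sharing one in-memory index while giving every thread its own file descriptor can be illustrated without Riegeli. The following is only a toy sketch in Python: `SharedIndexReader` is a hypothetical class over a newline-delimited file, not the `RiegeliShardReader` API, and it makes no claim about how `Clone()` is implemented.

```python
import os
import tempfile
import threading
from concurrent.futures import ThreadPoolExecutor


class SharedIndexReader:
    """Toy record reader: one shared offset index, one file handle per thread."""

    def __init__(self, path):
        self._path = path
        self._local = threading.local()  # holds each thread's private handle
        self._handles = []               # every handle opened, for close()
        self._lock = threading.Lock()
        # Build the read-only index once: (byte offset, length) of each line.
        self._index = []
        offset = 0
        with open(path, "rb") as f:
            for line in f:
                self._index.append((offset, len(line)))
                offset += len(line)

    def __len__(self):
        return len(self._index)

    def read(self, i):
        f = getattr(self._local, "file", None)
        if f is None:
            # First read on this thread: open a private handle so that
            # seek/read calls never interleave with other threads.
            f = self._local.file = open(self._path, "rb")
            with self._lock:
                self._handles.append(f)
        offset, length = self._index[i]
        f.seek(offset)
        return f.read(length).rstrip(b"\n")

    def close(self):
        with self._lock:
            for f in self._handles:
                f.close()


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "records.txt")
    with open(path, "wb") as f:
        for i in range(1000):
            f.write(b"record-%d\n" % i)

    shared = SharedIndexReader(path)
    with ThreadPoolExecutor(max_workers=4) as pool:
        records = list(pool.map(shared.read, range(len(shared))))
    shared.close()
```

The index is built once and only read afterwards, so threads can share it without locking; only the file position is per-thread state.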

Another alternative if you're doing some sort of pipeline is to use Beam which will spawn separate processes without pickling. We do this internally quite often, and it can unlock more performance if you've got multiple machines and/or processor cores, especially if the trajectories are gigantic.

mikygit commented 2 years ago

Great. Sorry, yes, multithreading is also fine as long as it speeds up reading. At the moment, it takes 150 minutes to load 100,000 trajectories... I'll have a look at Beam as well, thanks!

mikygit commented 2 years ago

Hello, any chance you have some Beam snippets to help with using SequenceAdapters in Beam? I'm a bit lost here...

mikygit commented 2 years ago

OK, I managed a quick test and I still get the same error using Beam (Python): AttributeError: Can't pickle local object 'BackendReader._get_nth_episode.<locals>.get_nth_step_from_episode'

Are you sure you managed to parallelize the reading in Python?

kenjitoyama commented 2 years ago

Hi @mikygit,

Yes, we process trajectories in parallel using Beam with no issues.

How is your data structured? Do you have episodes? Do you care about the order of episodes when processing them?

Internally we process EnvLogger data in different ways. The most common is processing individual episodes, which we do by sending batches of episodes to each Beam worker. Each worker can create its own Reader instance and read only the episodes it needs without sharing anything. In some other use cases we care about applying a function to each step with no regard to episodes, and in those cases we just read the raw data using Riegeli and call the function on each step individually.
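The per-worker pattern described above can be sketched as follows. This is a minimal illustration, not the internal Beam pipeline: the helper names (`chunk_indices`, `process_batch`, `process_all`) and the `FakeReader` stand-in are hypothetical. The point is that only episode indices and a directory path travel to the workers, and each worker opens its own reader.

```python
from concurrent.futures import ThreadPoolExecutor


def chunk_indices(num_episodes, batch_size):
    """Splits range(num_episodes) into lists of at most batch_size indices."""
    return [list(range(start, min(start + batch_size, num_episodes)))
            for start in range(0, num_episodes, batch_size)]


def process_batch(open_reader, data_directory, indices, episode_fn):
    """Opens a private reader inside the worker and reads only `indices`."""
    with open_reader(data_directory) as r:
        return [episode_fn(r.episodes[i]) for i in indices]


def process_all(executor, open_reader, data_directory,
                num_episodes, batch_size, episode_fn):
    """Submits one task per batch and returns results in episode order."""
    futures = [
        executor.submit(process_batch, open_reader, data_directory,
                        batch, episode_fn)
        for batch in chunk_indices(num_episodes, batch_size)
    ]
    results = []
    for future in futures:
        results.extend(future.result())
    return results


class FakeReader:
    """Hypothetical stand-in with an `episodes` sequence and `with` support."""

    def __init__(self, data_directory):
        self.episodes = [[(e, s) for s in range(3)] for e in range(10)]

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False


with ThreadPoolExecutor(max_workers=2) as pool:
    lengths = process_all(pool, FakeReader, "/unused", 10, 4, len)
```

With EnvLogger, `open_reader` would presumably be a module-level function wrapping `reader.Reader(data_directory=...)`. For a process pool, `open_reader` and `episode_fn` must themselves be picklable (module-level functions, not lambdas or closures), and `episode_fn` should return plain data rather than the lazy episode object.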

Do you have a small test to reproduce the error you're getting? Something we can trigger in a minute or so.

Thanks

mikygit commented 2 years ago

Hello @kenjitoyama, yes, I have episodes and I don't care about the order.

I just tried using Ray and got the same error. See the code below, but there's nothing fancy, I'm afraid...

@ray.remote
def ray_add(traj):
    return [[s.action, s.timestep.step_type == StepType.LAST, s.timestep.observation, s.timestep.reward] for s in traj]

def add(traj):
    return [[s.action, s.timestep.step_type == StepType.LAST, s.timestep.observation, s.timestep.reward] for s in traj]

with reader.Reader(data_directory=args.checkpoint_dir) as r:
    episodes = r.episodes

    # Baseline: process the first 100 episodes sequentially.
    trajs = []
    start = time.time()
    for i, episode in enumerate(tqdm(episodes[:100], desc="loading previous trajectories")):
        trajs.append(add(episode))
    end = time.time()
    logger.info(f"[{os.getpid()}]: {end-start} : {len(trajs)}")

    # Same work through Ray: each episode object is passed to a remote task.
    trajs = []
    start = time.time()
    for i, episode in enumerate(tqdm(episodes[:100], desc="ray loading previous trajectories")):
        trajs.append(ray_add.remote(episode))
    results = ray.get(trajs)
    end = time.time()
    logger.info(f"[{os.getpid()}]: {end-start} : {len(results)}")

kenjitoyama commented 2 years ago

Hi @mikygit,

How does Ray send the computation over to another node? If it tries to pickle it, then it won't work. With Beam, the code is shipped along with a Python interpreter, so no serialization is needed and I don't see how Beam would generate such pickling errors.

@sabelaraga, any thoughts?

mikygit commented 2 years ago

Correct, it uses pickle. As for Beam, I probably oversimplified my test and did it all wrong:

import apache_beam as beam
from typing import Iterable
from envlogger import reader
from dm_env import StepType

def get_steps(traj):
    # Return the extracted fields; without `return` the function yields None.
    return [[s.action, s.timestep.step_type == StepType.LAST, s.timestep.observation, s.timestep.reward] for s in traj]

with beam.Pipeline() as pipeline:
    with reader.Reader(data_directory="/tmp-network/fast/project/vdt/tests") as r:
        episodes = (
            pipeline 
            | beam.Create([s for s in r.episodes[:100]])
            | beam.Map(get_steps)
        )

    print(len(episodes))

ERROR: lambda x: dumps(x, protocol), pickle.loads) AttributeError: Can't pickle local object 'BackendReader._get_nth_episode.<locals>.get_nth_step_from_episode'

Any ideas?

mikygit commented 2 years ago

OK, I managed to fix the problem by subclassing and modifying RiegeliBackendReader and SequenceAdapter. The reader is now 5 times faster!