google-deepmind / reverb

Reverb is an efficient and easy-to-use data storage and transport system designed for machine learning research
Apache License 2.0

Client calls in separate process hanging on WSL2 #35

Status: Closed (devinsaini closed this issue 3 years ago)

devinsaini commented 3 years ago

I'm trying to run a Reverb client and server in separate processes using multiprocessing. I've tried both dm-reverb 0.2.0 and dm-reverb-nightly 0.3.0.dev20210402 on Python 3.8, and I get the same result with the code below. One key detail: I'm running this on a Windows machine under WSL2 with Ubuntu 20.04.

The client in the main process works fine, but the client in the side process hangs at the call to client.server_info(). I've also tried writing some data from the main process and then calling client.sample() in the side process, with the same result.

import reverb
import multiprocessing

def main():
    server = reverb.Server(tables=[
        reverb.Table(
            name='cartpole',
            sampler=reverb.selectors.Uniform(),
            remover=reverb.selectors.Fifo(),
            max_size=100,
            rate_limiter=reverb.rate_limiters.MinSize(1))
        ],
        port=8000
    )

    client = reverb.Client('localhost:8000')
    print(f"Main process : {client.server_info()}")

    side_proc = multiprocessing.Process(target=side_process)
    side_proc.start()

    server.wait()

def side_process():
    print("Side process started")
    client = reverb.Client('localhost:8000')
    print(f"Side process : {client.server_info()}")
    print("Side process terminated")

if __name__ == '__main__':
    main()

$ /bin/python3 /home/devin/test/reverbtest.py
2021-04-03 19:21:45.471910: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
[reverb/cc/platform/tfrecord_checkpointer.cc:144] Initializing TFRecordCheckpointer in /tmp/tmpon4xcimy
[reverb/cc/platform/tfrecord_checkpointer.cc:338] Loading latest checkpoint from /tmp/tmpon4xcimy
[reverb/cc/platform/default/server.cc:55] Started replay server on port 8000
Main process : {'cartpole': TableInfo(name='cartpole', sampler_options=uniform: true
, remover_options=fifo: true
is_deterministic: true
, max_size=100, max_times_sampled=0, rate_limiter_info=samples_per_insert: 1.0
min_diff: -1.7976931348623157e+308
max_diff: 1.7976931348623157e+308
min_size_to_sample: 1
insert_stats {
  completed_wait_time {
  }
  pending_wait_time {
  }
}
sample_stats {
  completed_wait_time {
  }
  pending_wait_time {
  }
}
, signature=None, current_size=0, num_episodes=0, num_deleted_episodes=0)}
Side process started
^Z

fastturtle commented 3 years ago

Hi @devinsaini, does this also hang if you run side_process() from a separate script (i.e. without using multiprocessing)?

devinsaini commented 3 years ago

@fastturtle Yes, it hangs even if I run it from a separate script. However, I was able to get the following script to work, in which I start the server and the client in separate processes.

import reverb
import multiprocessing

def main():
    server_proc = multiprocessing.Process(target=server_process)
    server_proc.start()
    client_proc = multiprocessing.Process(target=client_process)
    client_proc.start()
    client_proc.join()

def server_process():
    server = reverb.Server(tables=[
        reverb.Table(
            name='cartpole',
            sampler=reverb.selectors.Uniform(),
            remover=reverb.selectors.Fifo(),
            max_size=100,
            rate_limiter=reverb.rate_limiters.MinSize(1))
        ],
        port=8000
    )
    server.wait()

def client_process():
    client = reverb.Client('localhost:8000')
    print(f"Client process : {client.server_info()}")
    print("Client process terminated")

if __name__ == '__main__':
    main()

One more thing: since I haven't implemented a clean way to call server.stop(), I manually kill whatever process is using port 8000 with kill -9 $(sudo lsof -t -i:8000) before running the script again. Here's the output of lsof after running the program and pressing ^Z to get out of it:

~$ sudo lsof -i:8000
COMMAND   PID  USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
python3 17058 devin    5u  IPv6 467873      0t0  TCP *:8000 (LISTEN)
python3 17096 devin    5u  IPv6 467873      0t0  TCP *:8000 (LISTEN)

However, the first script I posted doesn't work even when nothing is using port 8000.
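The manual lsof check above can also be approximated from Python before starting the server. The following is a minimal sketch using only the standard library; the helper name port_is_listening and the one-second timeout are hypothetical choices for illustration and are not part of Reverb.

```python
import socket


def port_is_listening(port, host="localhost", timeout=1.0):
    """Return True if something accepts TCP connections on host:port."""
    try:
        # create_connection tries every address that `host` resolves to.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


if __name__ == "__main__":
    if port_is_listening(8000):
        print("Port 8000 is still in use; stop the old server first.")
    else:
        print("Port 8000 looks free.")
```

A process that was only suspended with ^Z still holds its listening socket, so this check would report the port as in use until that process is actually killed.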

zhangxiaochuan commented 3 years ago

@devinsaini Setting the multiprocessing start method to 'spawn' will fix this issue.

import reverb
import multiprocessing

def side_process():
    print("Side process started")
    client = reverb.Client('localhost:8000')
    print(f"Side process : {client.server_info()}")
    print("Side process terminated")

def main():
    server = reverb.Server(tables=[
        reverb.Table(
            name='cartpole',
            sampler=reverb.selectors.Uniform(),
            remover=reverb.selectors.Fifo(),
            max_size=100,
            rate_limiter=reverb.rate_limiters.MinSize(1))
        ],
        port=8000
    )

    client = reverb.Client('localhost:8000')
    print(f"Main process : {client.server_info()}")

    side_proc = multiprocessing.Process(target=side_process)
    side_proc.start()

    server.wait()

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    main()

fastturtle commented 3 years ago

@zhangxiaochuan Thanks for your answer! I'm closing this issue, since using the spawn start method (instead of fork, which is the default on Linux) fixes it.
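For readers who would rather not change the start method globally, here is a minimal sketch of the same fix using a local spawn context from multiprocessing.get_context. It uses only the standard library. The names child_task and run_in_spawned_child and the 30-second timeout are hypothetical, and child_task stands in for the Reverb client call. One plausible reason fork misbehaves here, not confirmed in this thread, is that a forked child inherits the parent's memory but only the calling thread, which can leave threaded libraries in an inconsistent state.

```python
import multiprocessing


def child_task(address):
    # Hypothetical stand-in for the real work, e.g. creating
    # reverb.Client(address) and calling client.server_info().
    print(f"Child would connect to {address}")


def run_in_spawned_child(target, args=(), timeout=30.0):
    """Run `target` in a child started with 'spawn' and return its exit code.

    Returns None if the child is still running after `timeout` seconds;
    in that case the child is terminated so the caller does not hang.
    """
    # The context is local, so the global start method is left untouched.
    ctx = multiprocessing.get_context("spawn")
    proc = ctx.Process(target=target, args=args)
    proc.start()
    proc.join(timeout)
    if proc.is_alive():
        proc.terminate()
        proc.join()
        return None
    return proc.exitcode


if __name__ == "__main__":
    # The guard matters with 'spawn': the child re-imports this module.
    print("exit code:", run_in_spawned_child(child_task, ("localhost:8000",)))
```

The join timeout also doubles as a simple diagnostic: a None result means the child hung instead of finishing, which is the symptom reported at the top of this issue.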