pocokhc / simple_distributed_rl

Simple Distributed Reinforcement Learning Framework(シンプルな分散強化学習フレームワーク)
MIT License
40 stars 6 forks source link

leaky queue? #21

Closed skatayama closed 7 months ago

skatayama commented 8 months ago

Describe the bug

The following python code, modified from example/sample_atari.py to use r2d2 with sequence_length=80, gradually consumes lots of memory until the whole process is killed. Although the queue_capacity is set to 1, memory profiling result from memray suggests the `get' function of multiprocessing.queues consumes lots of gigabytes. Using train instead of train_mp, or using less burnin and sequence_length does not result in the symptom. Using agent57 instead of r2d2 does not solve the problem.

下記のコード(examples/sample_atari.pyをr2d2用に変更したもの)を実行すると、最初は6GBくらいなのですが、少しずつメモリ使用量が増大して行き、最終的にKilledになります。queue_capacityは1にしてありますが、(memrayでプロファイルした結果)multiprocessing.queuesのget関数がメモリを消費しているようです。train_mpの代わりにtrainを使ったり、burnin, sequence_lengthをそれぞれ減らすとこのような問題は起こりません。r2d2でもagent57でも同様の挙動があります.

import numpy as np

import srl
#from srl.algorithms import agent57  # algorithm load
from srl.algorithms import r2d2  # algorithm load
from srl.base.define import EnvObservationTypes
from srl.rl.processors.image_processor import ImageProcessor
from srl.utils import common

common.set_logger()

_parameter_path = "_sample_atari_r2d2_ac16_b40i80_q1_parameter.dat"
_history_path = "_sample_atari_r2d2_ac16_b40i80q1_history"

TRAIN_COUNT = 500_000

def _create_runner():
    # --- Atari env
    # Run "pip install gymnasium pygame" and also see the URL below.
    # https://gymnasium.farama.org/environments/atari/
    env_config = srl.EnvConfig(
        "ALE/Pong-v5",
        kwargs=dict(frameskip=7, repeat_action_probability=0, full_action_space=False),
    )
#    rl_config = agent57.Config()
    rl_config = r2d2.Config()
#    rl_config.set_atari_config()
    rl_config.actor_num = 16
    rl_config.enable_intrinsic_reward = False
    rl_config.burnin = 40
    rl_config.sequence_length = 80

    # カスタムしたprocessorを追加
    rl_config.processors = [
        ImageProcessor(
            image_type=EnvObservationTypes.GRAY_2ch,
            trimming=(30, 0, 210, 160),
            resize=(84, 84),
            enable_norm=True,
        )
    ]
    rl_config.use_rl_processor = False  # アルゴリズムデフォルトのprocessorを無効にする

    runner = srl.Runner(env_config, rl_config)
    return runner

def train():
    runner = _create_runner()

    # (option) print tensorflow model
    runner.model_summary(expand_nested=True)

    # --- train
    runner.set_history_on_file(_history_path, interval=5)
    runner.train_mp(max_train_count=TRAIN_COUNT, queue_capacity = 1)
    runner.save_parameter(_parameter_path)

if __name__ == "__main__":
    train()
#    plot_history()
#    evaluate()

To Reproduce

  1. python sample_atari_r2d2_act16_b40i80_q1.py # the name of the above code.
  2. Watch the memory behavior until the program will be killed by OS.

or memory profiling using memray is more informative:

  1. memray run --live-remote sample_atari_r2d2_act16_b40i80_q1.py
  2. memray live the_port_ID_here
  3. Watch the memory behavior. Press `<' some times to find the thread reporting the get function.

Expected behavior

The gradual and perpetual increase in the memory consumption should not happen.

Versions

大変有用なフレームワークをMITライセンスで公開いただき,ありがとうございます.

pocokhc commented 8 months ago

(If English is required, please use automatic translation)

問題報告ありがとうございます。 確認中ですが、まだ時間がかかりそうなので取り急ぎの返信となります。

一応既知の問題として、Memory(RLMemory)はOS側のサイズを確認していないのでバッチを追加し続けるとメモリが枯渇する問題があり、これと切り分けている最中となります。 R2D2/Agent57は履歴も含めてバッチを作成するのでそこそこのサイズになります。

import pickle

import srl
from srl.algorithms import r2d2
from srl.base.define import EnvObservationTypes
from srl.rl.processors.image_processor import ImageProcessor

env_config = srl.EnvConfig(
    "ALE/Pong-v5",
    kwargs=dict(frameskip=7, repeat_action_probability=0, full_action_space=False),
)
rl_config = r2d2.Config()
rl_config.burnin = 40
rl_config.sequence_length = 80
rl_config.processors = [
    ImageProcessor(
        image_type=EnvObservationTypes.GRAY_2ch,
        trimming=(30, 0, 210, 160),
        resize=(84, 84),
        enable_norm=True,
    )
]
rl_config.use_rl_processor = False
runner = srl.Runner(env_config, rl_config)

# --- 1episode
runner.rollout(max_episodes=1)

# --- look batch size(ave. 3M)
for _ in range(100):
    batch = runner.memory.sample(1, 0)
    print(f"size: {len(pickle.dumps(batch))}")

pickle化後でも3Mbyte程のサイズになるので、16GBのメモリで5000個あたりが限界になる計算です。 もしよければ以下で事象が再現するかご確認いただけますでしょうか。

import srl
from srl.algorithms import r2d2
from srl.base.define import EnvObservationTypes
from srl.rl.processors.image_processor import ImageProcessor
from srl.utils import common

common.set_logger()

def _create_runner():
    env_config = srl.EnvConfig(
        "ALE/Pong-v5",
        kwargs=dict(frameskip=7, repeat_action_probability=0, full_action_space=False),
    )
    rl_config = r2d2.Config()
    rl_config.burnin = 40
    rl_config.sequence_length = 80
    rl_config.memory.warmup_size = 100

    rl_config.processors = [
        ImageProcessor(
            image_type=EnvObservationTypes.GRAY_2ch,
            trimming=(30, 0, 210, 160),
            resize=(84, 84),
            enable_norm=True,
        )
    ]
    rl_config.use_rl_processor = False

    runner = srl.Runner(env_config, rl_config)
    return runner

def train():
    runner = _create_runner()

    runner.set_progress_options(interval_limit=5)
    runner.enable_stats()  # pip install psutil

    if True:  # change
        runner.rl_config.memory.capacity = 100
    runner.train_mp(timeout=60 * 10, queue_capacity=100)

if __name__ == "__main__":
    train()

memory.capacityでRLMemoryの上限を設定できます。 この値を設定する場合としない場合でメモリが枯渇するかどうかを確認したいと考えています。 また、"runner.enable_stats()"でprintの中にCPU率とメモリ率が表示されるようになります。(pip install psutil が必要です)

>or memory profiling using memray is more informative: memrayは初めて知りました。割と最近できたメモリプロファイルなんですね、勉強になります。

>大変有用なフレームワークをMITライセンスで公開いただき,ありがとうございます. ありがとうございます!そう言っていただけるとモチベーションに繋がります。

skatayama commented 8 months ago

なるほど.memory.capacityを設定する必要があったのですね. 確かに,ご提案のソースではメモリ使用量は増大しないようです. 私の添付した元のソースでも,memory.capacityを試してみます.

skatayama commented 7 months ago

しばらく反応できなくてすみません. 正確には,ご提案のソースでも,元のソースでmemory.capacity=10000とした場合でも,メモリ使用量は収まるのですが,終了時に 'RuntimeError: An exception has occurred in actor 0 process.(exitcode: -15)' となるようです.SIGTERMということで,core_mp.pyの最後のw.terminate()の結果だと思いますが,もう少し調べてみようと思います.

pocokhc commented 7 months ago

すいません 2週間ほどこちらに対応出来そうにない状況で…、しばらくお待ちください。

ただmp系のバグは開発ブランチで一つ改修しています。

https://github.com/pocokhc/simple_distributed_rl/commit/9fe50ce028c2f1a85432a5d236387a1cad3a2b3a

こちらのコードが入ったブランチ「developer/v0.14.2」のバージョンv0.14.1.1(現在最新)でご確認いただければと思います。

skatayama commented 7 months ago

ありがとうございます.試してみます.

skatayama commented 7 months ago

開発ブランチの改修でうまくいきました.結局memory.capacityを使用しなかったのがよくなかったみたいですね… memory.capacityを使用しない場合の挙動がメモリリークっぽいのは気になりますが,とりあえず私の方としては解決したようですので,閉じても構いません.いろいろお教えいただきありがとうございました.

pocokhc commented 7 months ago

検証した内容を備忘録として載せておきます。

環境は以下です。 ・v0.14.1 ・"dockers/docker-compose-latest-tf-gpu.yml" を以下に変更してコンテナ起動

version: "3.9"

services:
  srl-latest-tf:
    build:
      dockerfile: ./dockerfile_latest_tf_gpu
    volumes:
      - ../:/code/
    command: /bin/bash
    tty: true
    #deploy:
    #  resources:
    #    reservations:
    #      devices:
    #        - driver: nvidia
    #          count: 1
    #          capabilities: [gpu]
    mem_limit: 8g

検証コード

import srl
from srl.algorithms import r2d2
from srl.base.define import EnvObservationTypes
from srl.rl.processors.image_processor import ImageProcessor
from srl.utils import common

common.set_logger()

def _create_runner():
    env_config = srl.EnvConfig(
        "ALE/Pong-v5",
        kwargs=dict(frameskip=7, repeat_action_probability=0, full_action_space=False),
    )
    rl_config = r2d2.Config()
    rl_config.burnin = 40
    rl_config.sequence_length = 80
    rl_config.processors = [
        ImageProcessor(
            image_type=EnvObservationTypes.GRAY_2ch,
            trimming=(30, 0, 210, 160),
            resize=(84, 84),
            enable_norm=True,
        )
    ]
    rl_config.use_rl_processor = False

    runner = srl.Runner(env_config, rl_config)
    return runner

def train():
    runner = _create_runner()
    runner.set_progress_options(interval_limit=5)
    runner.enable_stats()  # pip install psutil

    # --- case1 baseline
    # runner.train_mp(timeout=60 * 10)

    # --- case2
    # runner.train_mp(timeout=60 * 30, queue_capacity=1)

    # --- case3
    # runner.train(timeout=60 * 60 * 1)

    # --- case4
    runner.rl_config.memory.warmup_size = 100
    runner.rl_config.memory.capacity = 100
    runner.train_mp(timeout=60 * 30, queue_capacity=100)

if __name__ == "__main__":
    train()

・case1

08:14:06 trainer:   2.0m(  8.0m left),  0.48tr/s,     25tr,  1759mem, Q   13recv/s(    1731),  100 send Param[CPU 64%,M59%]|loss 0.020|sync  1
08:14:08 actor 0:   2.0m(  8.0m left),    14st/s,   2385st,   998mem, Q   15send/s(    2762),   86 recv Param,   5ep, 145st [-6]reward[CPU  3%,M60%]
root@89e897133861:/code# queue capacity over: 1000

→約2分後、trainer 1759mem、actor0  998mem でtrainerが落ちる

・case2

08:29:24 trainer:  13.9m( 16.1m left),  0.49tr/s,    121tr,  1311mem, Q    1recv/s(    1308),  735 send Param[CPU 70%,M61%]|loss 0.015|sync  1
08:29:28 actor 0:  13.9m( 16.1m left),  1.36st/s,   1161st,     1mem, Q    1send/s(    1316),  583 recv Param,   2ep, 236st [-10]reward[CPU  1%,M61%]
root@89e897133861:/code# queue capacity over: 1

→約14分後、trainer 1311mem、actor0  1mem でtrainerが落ちる

・case3

09:36:47  60.0m( 2.14s left),  0.57st/s,   3002st,  3475mem,   6ep,  2081tr, 211st [-8]reward[CPU 76%,M50%]|loss 0.004|sync  3
2024-03-09 09:36:50,963 srl.base.run.core_play _play 238 [INFO] [RunNameTypes.main] loop end(timeout.)
09:36:50  60.0m(-1.52s left),  0.55st/s,   3004st,  3477mem,   6ep,  2083tr, 213st [-8]reward[CPU 75%,M50%]|loss 0.003|sync  3

→正常終了。3477mem

・case4

10:07:21 actor 0:  29.9m( 6.33s left),    25st/s,  35423st,    75mem, Q   22send/s(   41216), 1030 recv Param,  74ep, 185st [-6]reward[CPU  5%,M88%]
10:07:24 trainer:  30.0m( 0.11s left),  0.52tr/s,    934tr,   100mem, Q   27recv/s(   41200), 1418 send Param[CPU 76%,M86%]|loss 0.489|sync  1
2024-03-09 10:07:25,984 srl.base.run.core_train_only _play_trainer_only 92 [INFO] loop end(timeout.)

→正常終了。trainer 100mem、actor75mem

結果から確かにmultiprocessingで余分なメモリ確保が行われているように見えます。 本issueは一旦クローズしますが、調査は続けていきたいと思います。

また、以下をTODO予定。 ・メモリ内データの圧縮 ・(残メモリが閾値になったら警告表示?、実装方法が見えていないのでもしかしたら諦めるかも)