pfnet / pfio

IO library to access various filesystems with unified API
https://pfio.readthedocs.io/
MIT License

Introduce reset_on_fork argument #266

kuenishi closed 2 years ago

kuenishi commented 2 years ago

To address https://github.com/pfnet/pfio/issues/260, this PR adds a new argument, reset_on_fork, to the constructor of each FS object. When it is turned on, the internal method _check_fork() calls a newly introduced abstract method, _reset(self). This abstraction enables transparent forking in any situation, whether under PyTorch DataLoader, under multiprocessing, or with the "forkserver" start method enabled.
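The mechanism described above can be sketched roughly as follows. This is a minimal illustration, not pfio's actual code: the class names ForkAwareFS and DummyFS, the _pid attribute, and the PID comparison are assumptions made for the example, and what happens when reset_on_fork is off is left out.

```python
import os
from abc import ABC, abstractmethod


class ForkAwareFS(ABC):
    """Hypothetical base class; not pfio's actual implementation."""

    def __init__(self, reset_on_fork=False):
        self.reset_on_fork = reset_on_fork
        # Remember the PID of the process that created this object.
        self._pid = os.getpid()

    def _check_fork(self):
        # Assumption: a PID mismatch means we now run in another process.
        if self.reset_on_fork and self._pid != os.getpid():
            self._reset()
            self._pid = os.getpid()

    @abstractmethod
    def _reset(self):
        """Re-create per-process resources such as connections."""


class DummyFS(ForkAwareFS):
    """Hypothetical subclass with a fake connection."""

    def __init__(self, reset_on_fork=False):
        super().__init__(reset_on_fork)
        self.reset_count = 0
        self._connect()

    def _connect(self):
        # Stand-in for opening a real connection (e.g. a boto client).
        self.conn = object()

    def _reset(self):
        self.reset_count += 1
        self._connect()

    def read(self):
        # Every public operation checks for a fork before touching conn.
        self._check_fork()
        return id(self.conn)
```

In this sketch each subclass only has to say how to rebuild its own resources in _reset(); the fork check itself lives in one place in the base class.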

Classes S3 and Hdfs no longer pickle their unpicklable internals, e.g. the boto connection or the HDFS connection. If they are forked, they are forced to reset the connection.
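One common way to keep an unpicklable member out of the pickle is to override __getstate__ and __setstate__. The sketch below shows that general pattern only; PicklableClient is a hypothetical class, the threading.Lock merely stands in for a real connection, and reconnecting inside __setstate__ is an assumption rather than what S3 or Hdfs necessarily do.

```python
import threading


class PicklableClient:
    """Hypothetical wrapper; not pfio's S3 or Hdfs class."""

    def __init__(self, bucket):
        self.bucket = bucket
        self._connect()

    def _connect(self):
        # A lock stands in for an unpicklable connection object.
        self.conn = threading.Lock()

    def __getstate__(self):
        # Copy the attributes and drop the unpicklable member.
        state = self.__dict__.copy()
        state['conn'] = None
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Rebuild the connection in the receiving process.
        self._connect()
```

With this pattern only plain configuration (here, bucket) travels to the worker process, and each process ends up with its own connection.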

This also worked well with PyTorch DataLoader, as in the following test script:

import multiprocessing

import torch
from pfio.v2 import from_url

class Dataset:
    def __init__(self):
        self.s3 = from_url('s3://bucket/', reset_on_fork=True)

    def __len__(self):
        return 1600

    def __getitem__(self, i):
        with self.s3.open('sl.c', 'rb') as fp:
            return fp.read()

# Use the "forkserver" start method for the DataLoader worker processes.
multiprocessing.set_start_method('forkserver', force=True)

def main():
    ds = Dataset()
    loader = torch.utils.data.DataLoader(ds, batch_size=8, num_workers=2)
    it = iter(loader)
    data = next(it)
    print(len(data))
    data = next(it)
    print(len(data))
    data = next(it)
    print(type(data))

if __name__ == '__main__':
    main()

$ python dataloader_test.py
8
8
<class 'list'>

Note: this change deprecates pfio.v2.lazify().