mosaicml / streaming

A Data Streaming Library for Efficient Neural Network Training
https://streaming.docs.mosaicml.com
Apache License 2.0
1.12k stars 141 forks

MemoryError: Unable to allocate #771

Open AugustDev opened 2 months ago

AugustDev commented 2 months ago

Loading large dataset gives an error: "MemoryError: Unable to allocate 202. GiB for an array with shape (27084389376,) and data type int64"

self.num_shards = 237460

self.num_samples = 27084388471

Environment

To reproduce

I have a very large dataset that I converted following the "Spark to MDS" tutorial on the MosaicML website. The dataset is on a disk mounted to my machine. I can load the eval split (much smaller), but loading the train split fails.

When loading the dataset I get the following error:

from streaming import StreamingDataset
dataset = StreamingDataset(
    local="/mnt/disks/dataset/train",
    remote=None,
    batch_size=64
)
Cell In[7], line 1
----> 1 dataset = StreamingDataset(
      2     local="/mnt/disks/dataset/train",
      3     remote=None,
      4     batch_size=64
      5 )

File /opt/conda/lib/python3.10/site-packages/streaming/base/dataset.py:514, in StreamingDataset.__init__(self, streams, remote, local, split, download_retry, download_timeout, validate_hash, keep_zip, epoch_size, predownload, cache_limit, sampling_method, sampling_granularity, partition_algo, num_canonical_nodes, batch_size, shuffle, shuffle_algo, shuffle_seed, shuffle_block_size, batching_method, allow_unsafe_types, replication)
    512 self.samples_per_shard = np.array([shard.samples for shard in self.shards], np.int64)
    513 self.sample_offset_per_shard = self.samples_per_shard.cumsum() - self.samples_per_shard
--> 514 self.spanner = Spanner(self.samples_per_shard)
    516 # Now that we know the number of underlying samples of each stream, derive each stream's
    517 # true proportion/repeat/choose, as well as the total epoch size.
    518 self.epoch_size = Stream.apply_weights(self.streams, self.samples_per_stream,
    519                                        epoch_size_value, self.shuffle_seed)

File /opt/conda/lib/python3.10/site-packages/streaming/base/spanner.py:28, in Spanner.__init__(self, shard_sizes, span_size)
     25 underflow = span_size - overflow if overflow else 0
     26 self.shard_sizes[-1] += underflow
---> 28 sample_shards = np.repeat(np.arange(len(shard_sizes)), self.shard_sizes)
     29 sample_shards = sample_shards.reshape(-1, span_size)
     30 span_lowest_shards = sample_shards.min(1)

File /opt/conda/lib/python3.10/site-packages/numpy/core/fromnumeric.py:466, in repeat(a, repeats, axis)
    423 @array_function_dispatch(_repeat_dispatcher)
    424 def repeat(a, repeats, axis=None):
    425     """
    426     Repeat each element of an array after themselves
    427 
   (...)
    464 
    465     """
--> 466     return _wrapfunc(a, 'repeat', repeats, axis=axis)

File /opt/conda/lib/python3.10/site-packages/numpy/core/fromnumeric.py:59, in _wrapfunc(obj, method, *args, **kwds)
     56     return _wrapit(obj, method, *args, **kwds)
     58 try:
---> 59     return bound(*args, **kwds)
     60 except TypeError:
     61     # A TypeError occurs if the object does have such a method in its
     62     # class, but its signature is not identical to that of NumPy's. This
   (...)
     66     # Call _wrapit from within the except clause to ensure a potential
     67     # exception has a traceback chain.
     68     return _wrapit(obj, method, *args, **kwds)

MemoryError: Unable to allocate 202. GiB for an array with shape (27084389376,) and data type int64
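A back-of-envelope check shows the numbers in the error line up with `num_samples` above (the span size of `1 << 10` is an assumption based on Spanner's default):

```python
# Reproduce the allocation size from the error message
num_samples = 27_084_388_471
span_size = 1 << 10  # assumed Spanner default span size

# Spanner pads the sample count up to a whole number of spans
padded = -(-num_samples // span_size) * span_size
gib = padded * 8 / 2**30  # one int64 per sample

print(padded)      # 27084389376 -- matches the array shape in the error
print(round(gib))  # 202 -- matches the "202. GiB"
```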

Expected behavior

Dataset loads.

Additional context

My index.json for the train dataset is 243 MB.
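For context, `num_shards`/`num_samples` are derived from index.json, which carries one entry per shard with its sample count. A tiny stand-in illustrating the layout (the real 243 MB file has ~237k shard entries):

```python
import json

# Minimal MDS-style index.json stand-in (not the real file)
index = json.loads(
    '{"version": 2, "shards": [{"samples": 4}, {"samples": 3}, {"samples": 5}]}'
)

num_shards = len(index["shards"])
num_samples = sum(s["samples"] for s in index["shards"])
print(num_shards, num_samples)  # prints: 3 12
```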

I have used the following Spark settings to convert to MDS:

    spark = (
        SparkSession.builder.appName("OptimizedMosaicCreation")
        .config("spark.driver.memory", "60g")
        .config("spark.executor.memory", "60g")
        .config("spark.executor.cores", "4")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.shuffle.service.enabled", "true")
        .config("spark.default.parallelism", "200")
        .config("spark.sql.shuffle.partitions", "200")
        .config("spark.memory.fraction", "0.8")
        .config("spark.memory.storageFraction", "0.3")
        .config("spark.executor.memoryOverhead", "20g")
        .config("spark.local.dir", spark_local_dir)
        .config("spark.worker.dir", spark_local_dir)
        .config("spark.shuffle.spill.externalSorter.dir", spark_local_dir)
        .config("spark.network.timeout", "800s")
        .config("spark.executor.heartbeatInterval", "60s")
        .config("spark.storage.blockManagerSlaveTimeoutMs", "300000")
        .config("spark.shuffle.io.connectionTimeout", "180s")
        .config("spark.shuffle.io.retryWait", "60s")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.shuffle.file.buffer", "1m")
        .config("spark.file.transferTo", "false")
        .config("spark.shuffle.unsafe.file.output.buffer", "5m")
        .config("spark.cleaner.referenceTracking.cleaning.enabled", "false")
        .config("spark.storage.cleanupFilesAfterExecutorExit", "false")
        .config("spark.worker.cleanup.enabled", "false")
        .getOrCreate()
    )
XiaohanZhangCMU commented 2 months ago

@AugustDev what's your compute machine configuration? The error msg says the reason: streaming at one point created a spanner of 27084389376 integers, which is 8 bytes for each, so ~200GB. If you have less CPU memory than that, OOM is pretty much expected.