[Open] germanjke opened this issue 9 months ago
I'm OK with:

```
git branch
* (HEAD detached at 6c41241)
```

```
composer==0.15.0
mosaicml-streaming==0.6.1
```

and not OK with:

```
* (HEAD detached at 4638092)
```

```
mosaicml-streaming==0.7.1
```

I don't know where the problem is. I can't launch with `num_workers: 0`, because of `prefetch_factor`. With `num_workers: 1` I get the same error as in the title.
Hey @germanjke, sorry for the late response. Can you please share the below values for us to debug this issue?

- `num_samples` (int): Dataset size.
- `num_canonical_nodes` (int): Number of canonical nodes.
- `num_physical_nodes` (int): Number of physical nodes.
- `ranks_per_node` (int): Number of ranks per node.
- `workers_per_rank` (int): Number of worker partitions per rank.
- `batch_size` (int, optional): The batch size of its DataLoader, which affects how the dataset is partitioned over the workers.
- `drop_first` (int): Number of samples seen already, which are dropped. This will be zero if you are not resuming from a checkpoint.
- `initial_physical_nodes` (int, optional): Number of physical nodes at the start of training. Defaults to ``None``.
Hey @karan6181, do you know how I can get these values from `index.json`, or maybe from the llm-foundry wandb log?
```
train_loader:
  dataset:
    max_seq_len: 2048
    shuffle: true
    shuffle_seed: 17
    streams: (45 collapsed)
  drop_last: false
  name: "text"
  num_workers: 8

composer_commit_hash: "None"
composer_version: "0.17.1"
console_log_interval: "100ba"
device_eval_batch_size: 16
device_train_batch_size: 32
device_train_grad_accum: 2
device_train_microbatch_size: 16
```
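As a quick consistency check on the config above (plain arithmetic, not llm-foundry code): the device train batch size should equal the microbatch size times the gradient accumulation steps.

```python
# Values taken from the wandb config above.
device_train_microbatch_size = 16
device_train_grad_accum = 2

# Each optimizer step accumulates grad_accum microbatches per device.
device_train_batch_size = device_train_microbatch_size * device_train_grad_accum
print(device_train_batch_size)  # 32, matching device_train_batch_size above
```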
Hey @germanjke, let me help you get that information. Also, what streaming dataset version are you using?

- `num_samples` (int): Open `index.json` and add up the values from all entries of `samples`.
- `num_canonical_nodes` (int): From the above issue description, you are not passing any value, so it would be the default.
- `num_physical_nodes` (int): How many nodes are you running this on?
- `ranks_per_node` (int): Number of GPUs per node?
- `workers_per_rank` (int): This is 8, based on the value above (`num_workers: 8`).
- `batch_size` (int, optional): What `batch_size` value are you passing to StreamingDataset? Is it 16 or 32?
- `drop_first` (int): Number of samples seen already, which are dropped. This should be zero if you are not resuming from a checkpoint.
- `initial_physical_nodes` (int, optional): Number of physical nodes at the start of training. This should be `None` if you are not resuming from a checkpoint.
An easier method to find those values is to clone the streaming repository, create a test branch, add a print statement at this line to print all the params, push a commit, install that custom branch, and run your job.
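As a rough sketch of that suggestion: in practice you would paste a print directly into `get_partitions_orig` in `streaming/base/partition/orig.py`, but a standalone helper formatting the same parameters (this helper and its name are hypothetical, not part of streaming) would look like:

```python
# Hypothetical helper mirroring the print statement suggested above.
# The parameter names come from the get_partitions_orig docstring.
def format_partition_args(num_samples, num_canonical_nodes, num_physical_nodes,
                          ranks_per_node, workers_per_rank, batch_size=None,
                          drop_first=0, initial_physical_nodes=None):
    return (f'num_samples: {num_samples},\n'
            f'num_canonical_nodes: {num_canonical_nodes},\n'
            f'num_physical_nodes: {num_physical_nodes},\n'
            f'ranks_per_node: {ranks_per_node},\n'
            f'workers_per_rank: {workers_per_rank},\n'
            f'batch_size: {batch_size},\n'
            f'drop_first: {drop_first},\n'
            f'initial_physical_nodes: {initial_physical_nodes}')

# Example with made-up values in the shape of the prints later in this thread.
print(format_partition_args(402, 8, 8, 8, 8, batch_size=16))
```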
Hi @karan6181, thank you for the advice. To reproduce this, I just copied your file and pasted a print statement there. And everything works fine now!
```
[batch=1/10000]:
  Train time/epoch: 0
  Train time/batch: 0
  Train time/sample: 0
  Train time/batch_in_epoch: 0
  Train time/sample_in_epoch: 0
  Train time/token: 0
  Train time/token_in_epoch: 0
  Train memory/current_allocated_mem: 11.1450
  Train memory/current_active_mem: 11.1450
  Train memory/current_inactive_mem: 3.7532
  Train memory/current_reserved_mem: 63.3420
  Train memory/peak_allocated_mem: 45.1670
  Train memory/peak_active_mem: 45.6680
  Train memory/peak_inactive_mem: 14.9810
  Train memory/peak_reserved_mem: 63.3420
  Train memory/alloc_retries: 0
  Train trainer/device_train_microbatch_size: 16
  Train loss/train/total: 7.5101
  Train metrics/train/LanguageCrossEntropy: 7.5101
  Train metrics/train/LanguagePerplexity: 1826.4814
  Train lr-DecoupledAdamW/group0: 0.0000
  Train time/train: 0.0211
  Train time/val: 0.0000
  Train time/total: 0.0211
```
So the only change is that I replaced my `orig.py` file with yours. My llm-foundry version is:

```
* (HEAD detached at 4638092)
```

So I guess some changes were made recently in just this file? The streaming version is the same, `mosaicml-streaming==0.7.1`.
Also, here are my prints. I have 3 validation dataloaders and 1 train dataloader:
```
num_samples: 21,
num_physical_nodes: 8,
ranks_per_node: 8,
workers_per_rank: 8,
batch_size: 16,
drop_first: 0,
initial_physical_nodes: None
/usr/lib/python3/dist-packages/streaming/base/partition/orig.py:163: UserWarning: Attempting to partition 3 samples per physical node over 8 gpus. This will result in many samples being repeated, and depending on your batching method, batches being completely dropped. Check if your dataset has the expected number of samples.
  warnings.warn(f'Attempting to partition {ids.shape[1]} samples per physical node ' +
```
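That warning follows from plain arithmetic on the values printed above: 21 samples spread over 8 physical nodes leaves about 3 samples per node, which then have to cover 8 ranks per node. A quick sanity check (assuming the default canonical-node split matches the 8 physical nodes):

```python
import math

num_samples = 21        # from the print above
num_physical_nodes = 8
ranks_per_node = 8

# Roughly ceil(21 / 8) = 3 samples land on each physical node.
samples_per_node = math.ceil(num_samples / num_physical_nodes)
print(samples_per_node)                   # 3, matching the warning text

# But each node has 8 ranks, so samples must be repeated to fill them.
print(samples_per_node < ranks_per_node)  # True -> heavy sample repetition
```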
```
num_samples: 402,
num_physical_nodes: 8,
ranks_per_node: 8,
workers_per_rank: 8,
batch_size: 16,
drop_first: 0,
initial_physical_nodes: None
```

```
num_samples: 1088115,
num_physical_nodes: 8,
ranks_per_node: 8,
workers_per_rank: 8,
batch_size: 16,
drop_first: 0,
initial_physical_nodes: None
```

```
num_samples: 121149865,
num_physical_nodes: 8,
ranks_per_node: 8,
workers_per_rank: 8,
batch_size: 32,
drop_first: 0,
initial_physical_nodes: None
```
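For the train loader (the last print above), the resulting partition shape can be sketched with the same ceil-rounding the partitioner applies. This mirrors, rather than reproduces, `get_partitions_orig`, and assumes `num_canonical_nodes` defaulted to the 8 physical nodes:

```python
import math

# Values from the train-loader print above.
num_samples = 121_149_865
num_physical_nodes = 8
ranks_per_node = 8
workers_per_rank = 8
batch_size = 32

# Ceil-divide samples across nodes, then across ranks within each node.
samples_per_node = math.ceil(num_samples / num_physical_nodes)
samples_per_rank = math.ceil(samples_per_node / ranks_per_node)

# Pad each rank's samples up to a whole number of (workers * batch_size)
# chunks, as the -1 padding step in orig.py does.
chunk = workers_per_rank * batch_size
batches_per_worker = math.ceil(samples_per_rank / chunk)

print((num_physical_nodes, ranks_per_node, workers_per_rank,
       batches_per_worker, batch_size))   # (8, 8, 8, 7395, 32)
```

With 8 nodes × 8 ranks × a device batch of 32, each global step consumes 2048 samples, so there is plenty of data per worker here, unlike the 21-sample loader above.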
Can we deliver this version of the file to llm-foundry, so it gets built from `setup.py` in the llm-foundry repo? I'm using the llm-foundry submodule, which has the old version of the file where I hit the bugs:
```python
# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Apportion shards/samples to nodes/ranks/workers for elastically deterministic sample order."""

import logging
import math
from typing import Optional

import numpy as np
from numpy.typing import NDArray

logger = logging.getLogger(__name__)


def get_partitions_orig(num_samples: int,
                        num_canonical_nodes: int,
                        num_physical_nodes: int,
                        ranks_per_node: int,
                        workers_per_rank: int,
                        batch_size: Optional[int] = None,
                        drop_first: int = 0,
                        initial_physical_nodes: Optional[int] = None) -> NDArray[np.int64]:
    """Partition the given number of samples to nodes, ranks, and workers.

    Either canonical or physical nodes must be evenly divisible by the other.

    It is suggested to set num_canonical_nodes higher than your expected number of physical
    nodes, because scaling your number of nodes below that level may result in more shards
    being used across node boundaries due to preserving the same global sample order.

    Args:
        num_samples (int): Dataset size.
        num_canonical_nodes (int): Number of canonical nodes.
        num_physical_nodes (int): Number of physical nodes.
        ranks_per_node (int): Number of ranks per node.
        workers_per_rank (int): Number of worker partitions per rank.
        batch_size (int, optional): Batch size of its DataLoader, which affects how the
            dataset is partitioned over the workers. Defaults to ``None``.
        drop_first (int): Number of samples seen already, which are dropped. Defaults to ``0``.
        initial_physical_nodes (int, optional): Number of physical nodes at the start of
            training. Defaults to ``None``.

    Returns:
        NDArray[np.int64]: Partitions of shape (physical nodes, ranks per node, workers per
            rank, batches per worker, batch size).
    """
    if num_samples <= drop_first:
        raise ValueError(f'Resuming further into the dataset ({drop_first}) than it has ' +
                         f'samples ({num_samples})')

    if num_canonical_nodes < num_physical_nodes:
        if num_physical_nodes % num_canonical_nodes:
            raise ValueError('Either canonical or physical nodes must be evenly divisible by ' +
                             'the other, otherwise striping slices of shards over nodes may ' +
                             'lead to each node downloading all shards')
    elif num_physical_nodes < num_canonical_nodes:
        if num_canonical_nodes % num_physical_nodes:
            raise ValueError('Either canonical or physical nodes must be evenly divisible by ' +
                             'the other, otherwise striping slices of shards over nodes may ' +
                             'lead to each node downloading all shards')

    batch_size = batch_size or 1

    # If drop_first isn't a multiple of num_physical_nodes, round down to make it divisible.
    if drop_first % num_physical_nodes:
        logger.warning('`drop_first` was not divisible by `num_physical_nodes`. Rounding it ' +
                       'down to make it divisible.')
        drop_first -= drop_first % num_physical_nodes

    # Divide the full dataset sample range into a sample range per canonical node.
    samples_per_canonical_node = (num_samples + num_canonical_nodes - 1) // num_canonical_nodes
    node_ratio = 0
    padding = 0
    if num_canonical_nodes < num_physical_nodes:
        node_ratio = num_physical_nodes // num_canonical_nodes
        overflow = samples_per_canonical_node % node_ratio
        if overflow:
            padding = node_ratio - overflow
    padded_samples_per_canonical_node = samples_per_canonical_node + padding

    # Create the initial sample ID matrix.
    #
    # ids: (canonical nodes, padded samples per canonical node).
    ids = np.arange(num_canonical_nodes * padded_samples_per_canonical_node, dtype=np.int64)
    ids = ids.reshape(num_canonical_nodes, padded_samples_per_canonical_node)

    # Adjust row offsets to ignore the padding.
    #
    # row_offsets: (canonical nodes, 1).
    row_offsets = np.arange(num_canonical_nodes) * padding
    row_offsets = np.expand_dims(row_offsets, 1)
    ids -= row_offsets

    # Reconfigure where each row starts iterating for irregular-sized rows.
    #
    # row_starts: (canonical nodes, 1).
    row_starts = np.arange(num_canonical_nodes) * num_samples // num_canonical_nodes
    row_starts = np.expand_dims(row_starts, 1)
    ids += row_starts - ids[:, :1]

    # For short rows (length not evenly divisible), repeat the last ID to get even length.
    #
    # row_stops: (canonical nodes, 1).
    row_stops = np.arange(1, 1 + num_canonical_nodes) * num_samples // num_canonical_nodes
    row_stops = np.expand_dims(row_stops, 1)
    are_rows_short = row_stops - row_starts < samples_per_canonical_node
    ids[:, samples_per_canonical_node - 1:samples_per_canonical_node] -= are_rows_short

    # If padding was needed, repeat samples to populate it.
    if padding:
        ids[:, -padding:] = ids[:, -padding - node_ratio + 1 - padding:-padding - node_ratio + 1]

    # Flatten, drop samples that have already been seen, reshape back.
    #
    # ids: (physical nodes, samples per node).
    ids = ids.transpose()
    ids = ids.flatten()
    ids = ids[drop_first:]
    ids = ids.reshape(-1, num_physical_nodes)
    ids = ids.transpose()

    # Interleave the node sample ranges over each node's ranks, padding by repeating the last
    # sample.
    #
    # ids: (physical nodes, samples per rank, ranks per node).
    overflow = ids.shape[1] % ranks_per_node
    if overflow:
        underflow = ranks_per_node - overflow
        last = ids[:, -ranks_per_node - underflow + 1:-ranks_per_node + 1]
        ids = np.concatenate([ids, last], 1)
    ids = ids.reshape(num_physical_nodes, -1, ranks_per_node)

    # Pad with -1 adequately for reshaping across workers.
    #
    # ids: (physical nodes, samples per rank, ranks per node).
    rounded_num_samples = math.ceil(
        ids.shape[1] / (workers_per_rank * batch_size)) * (workers_per_rank * batch_size)
    overflow = rounded_num_samples - ids.shape[1]
    if overflow:
        last = np.full((num_physical_nodes, overflow, ranks_per_node), -1, np.int64)
        ids = np.concatenate([ids, last], 1)

    # Interleave each rank's padded samples across its workers.
    #
    # ids: (physical nodes, ranks per node, workers per rank, batches per worker, batch size).
    ids = ids.transpose(0, 2, 1)
    ids = ids.reshape(num_physical_nodes, ranks_per_node, -1, workers_per_rank, batch_size)
    return ids.transpose(0, 1, 3, 2, 4)
```
(Comment edited as no longer relevant.) Resuming training is fine with the new file.
@karan6181 What do you think, are there some conflicts in llm-foundry 0.4.0? Maybe it needs a quick fix or something?
Hey @germanjke, sorry for my late response. Can you try the latest streaming dataset version? We have fixed some issues recently.
Hi @karan6181, thank you. Everything works fine on the latest streaming and llm-foundry versions.
Hi, it looks like some new version of llm-foundry (updated from master) has had lags in the last week or two. I have an error like this. Changing `drop_last` to `False` doesn't help. If I change `num_workers` to some other value, will I lose some efficiency? I have always launched with 8.