togethercomputer / together-python

The Official Python Client for Together's API
https://pypi.org/project/together/
Apache License 2.0
31 stars 7 forks source link

Check Parquet files in together-cli, supply filetype in header #73

Closed mryab closed 6 months ago

mryab commented 6 months ago

This PR allows together-cli to check the integrity of pretokenized Parquet files that users can supply if they do not want us to tokenize their data. Namely, it checks that

  1. The file can be read
  2. The file contains the necessary input_ids column
  3. The file does not contain any columns apart from PARQUET_EXPECTED_COLUMNS
  4. The number of samples is greater than or equal to together.min_samples

Also, it performs a rudimentary check for the filetype and passes that in a header to the Files API: once the API receives it, it can add the filetype as metadata to finetune jobs for a given dataset.

Companion PR in together-finetune (already merged): https://github.com/togethercomputer/together-finetune/pull/374

clam004 commented 6 months ago

@mryab @benathi could one of you provide a practical example of a user creating a Parquet file (preferably from text) and then using this new feature to check and upload it?

orangetin commented 6 months ago

Please open a new PR that is rebased on v1.0.0 (current main), because significant changes were made there.

These are the files you want to modify:

benathi commented 6 months ago

@clam004 This is a long script but can be cut down and adapted for a public dataset or instruction tuning dataset.


def tokenize_constant_length(data,
                  tokenizer,
                  max_length=32*1024,
                  add_special_tokens=True,
                  omit_attn_mask=False):
    """
    Tokenize ``data["text"]`` so that every sequence has exactly ``max_length`` tokens.

    The tokenizer is called with ``truncation=True`` and
    ``padding='max_length'``, i.e. longer texts are cut and shorter ones padded.

    Args:
    data: Mapping with a "text" entry holding the text(s) to tokenize.
    tokenizer: Callable accepting the texts plus the keyword arguments
               ``max_length``, ``truncation``, ``padding`` and
               ``add_special_tokens`` (presumably a Hugging Face tokenizer).
    max_length (int): Target length of every tokenized sequence.
    add_special_tokens (bool): Forwarded unchanged to the tokenizer.
    omit_attn_mask (bool): When true, the 'attention_mask' entry is removed
                           from the tokenizer output before returning.

    Returns:
    Whatever the tokenizer returned, minus 'attention_mask' if requested.
    """
    encoded = tokenizer(
        data["text"],
        max_length=max_length,
        truncation=True,
        padding='max_length',
        add_special_tokens=add_special_tokens,
    )
    if not omit_attn_mask:
        return encoded

    # Caller asked for input ids only.
    del encoded['attention_mask']
    return encoded

def data_to_parquet(tokenized_data, output_file='output.parquet'):
    """
    Write tokenized data to a Parquet file.

    The data is first turned into a pandas DataFrame, then into a PyArrow
    table, which is finally written with ``pq.write_table``.

    Args:
    tokenized_data: Anything ``pd.DataFrame(...)`` accepts; the callers in this
                    script pass tokenized data that is expected to contain an
                    'input_ids' column (and usually 'attention_mask').
    output_file (str): Path to the output Parquet file.
    """
    # Echo what is about to be persisted so the run log shows the schema.
    print("Saving the following data to parquet")
    print(tokenized_data)

    frame = pd.DataFrame(tokenized_data)
    arrow_table = pa.Table.from_pandas(frame)
    pq.write_table(arrow_table, output_file)

def data_to_arrow(tokenized_data, output_file='output.arrow'):
    """
    Write the 'input_ids' and 'attention_mask' columns to an Arrow file.

    Args:
    tokenized_data: Mapping-like object with the keys 'input_ids' and
                    'attention_mask', each convertible by ``pa.array``
                    (e.g. a list of lists of int).
    output_file (str): Path to the output Arrow file.
    """
    # NOTE: only these two columns are exported; a 'labels' column, if
    # present, is not written (label export was disabled here).
    column_names = ['input_ids', 'attention_mask']
    arrays = [pa.array(tokenized_data[name]) for name in column_names]
    table = pa.Table.from_arrays(arrays, names=column_names)

    # Serialize the table in the Arrow file (record batch) format.
    with pa.OSFile(output_file, 'wb') as sink, pa.RecordBatchFileWriter(sink, table.schema) as writer:
        writer.write_table(table)

def process_fast_packing(dataset, tokenizer, max_sequence_length,
                        shuffle=True,
                        add_special_tokens=True,
                        num_proc=160,
                        ):
    """
    Tokenize a text dataset and pack the tokens into fixed-length sequences.

    Steps:
    (1) parallel tokenization (no truncation, variable length)
    (2) packing in groups: each ``map`` batch is concatenated and cut into
        rows of ``max_sequence_length`` by ``pack_sequences``
    (3) adding an attention mask that is all ones

    Args:
    dataset: Object exposing ``map(...)`` and ``column_names`` with a "text"
             column (presumably a Hugging Face ``datasets.Dataset``).
    tokenizer: Tokenizer providing ``pad_token_id`` and ``eos_token_id``.
    max_sequence_length (int): Length of every packed sequence. It is also
             passed as ``cutoff_size``, so partially filled trailing sequences
             are not kept.
    shuffle (bool): Forwarded to ``pack_sequences``.
    add_special_tokens (bool): Forwarded to the tokenizer.
    num_proc (int | None): Worker processes for the tokenization and
             attention-mask passes. The packing pass uses at most 32 of them,
             and a single process for datasets of 10000 rows or fewer.
             Defaults to 160, the value that used to be hard-coded.

    Returns:
    The packed dataset with 'input_ids', 'labels' and 'attention_mask'.
    """
    start = time.time()
    tokenized_dataset = dataset.map(
        lambda examples: tokenize_function(examples, tokenizer, add_special_tokens=add_special_tokens),
        batched=True,
        num_proc=num_proc,
        load_from_cache_file=True,
        remove_columns=dataset.column_names
    )
    logger.info(f"********* Time taken for tokenization: {time.time() - start:.2f}s")
    logger.info(f"tokenized dataset: {tokenized_dataset}")
    # here we have input_ids and attention_mask

    # Packing is cheap for small datasets, so avoid spawning workers there;
    # otherwise cap the pool at 32 processes.
    if num_proc is not None and len(tokenized_dataset) > 10000:
        packing_num_proc = min(32, num_proc)
    else:
        packing_num_proc = 1

    start = time.time()
    packed_dataset = tokenized_dataset.map(
        lambda batch: pack_sequences(batch,
                                    max_sequence_length,
                                    tokenizer.pad_token_id,
                                    tokenizer.eos_token_id,
                                    shuffle=shuffle,
                                    cutoff_size=max_sequence_length,
                                    ),
        batched=True,
        num_proc=packing_num_proc,
        remove_columns=['attention_mask'] # input_ids will be replaced by packed sequences
    )

    logger.info(f"********* Time taken for packing: {time.time() - start:.2f}s")
    logger.info(f"packed dataset: {packed_dataset}")

    # adding attention mask -- for packing they are all 1
    logger.info("Adding attention mask")
    packed_dataset = packed_dataset.map(
        lambda batch: {'attention_mask': [[1] * len(ids) for ids in batch['input_ids']]},
        batched=True,
        num_proc=num_proc,
    )
    logger.info(f"final dataset w attention: {packed_dataset}")
    return packed_dataset

def visualize_packed_sequences(packed_dataset, tokenizer, max_sequences=8):
    """
    Log a short preview of the first ``max_sequences`` examples.

    For each example the first 20 decoded tokens and the first 20 attention
    mask values are logged; the first 20 labels are logged as well when the
    example has a 'labels' entry.

    Args:
    packed_dataset: Iterable of examples with 'input_ids' and
                    'attention_mask' (and optionally 'labels').
    tokenizer: Object providing ``convert_ids_to_tokens``.
    max_sequences (int): Number of examples to preview.
    """
    divider = '-' * 40
    for index, row in enumerate(packed_dataset):
        if index == max_sequences:
            break
        ordinal = index + 1

        tokens = tokenizer.convert_ids_to_tokens(row['input_ids'])
        logger.info(f"Sequence {ordinal}:\n{tokens[:20]}\n{divider}")

        # The attention mask is required here; a missing key raises KeyError.
        mask = row['attention_mask']
        logger.info(f"Attention Mask {ordinal}:\n{mask[:20]}\n{divider}")

        if 'labels' not in row:
            continue
        logger.info(f"Labels {ordinal}:\n{row['labels'][:20]}\n{divider}")

def pack_sequences(batch, max_seq_len, pad_token_id, eos_token_id, shuffle=True, cutoff_size=0):
    """
    Concatenate the sequences of a batch and cut them into rows of ``max_seq_len``.

    Every input sequence is followed by ``eos_token_id`` before it is appended
    to a running buffer; the buffer is emitted in chunks of exactly
    ``max_seq_len`` tokens. What remains at the end is
      * kept as is when it has exactly ``max_seq_len`` tokens,
      * padded with ``pad_token_id`` when it is longer than ``cutoff_size``,
      * dropped otherwise.

    cutoff_size = max_seq_len means that we will drop any non-full sequences
        (full packing without padding)
    Example:
        Sequence 1:
        ['<s>', '▁usually', '▁,', '▁he', '▁would', '▁be', '▁t', 'earing']
        Sequence 2:
        ['▁around', '▁the', '▁living', '▁room', '▁,', '▁playing', '▁with', '▁his']
        Sequence 3:
        ['▁toys', '▁.', '</s>', '<s>', '▁but', '▁just', '▁one', '▁look']

    Args:
    batch: Mapping with an 'input_ids' entry (list of list of int).
    max_seq_len (int): Length of each packed sequence; must be positive.
    pad_token_id (int): Token used to fill up the last, partial sequence.
    eos_token_id (int): Token appended after every input sequence.
    shuffle (bool): Shuffle the packed sequences in place with ``random``.
    cutoff_size (int): Minimum length (exclusive) the trailing partial
                       sequence needs in order to be padded and kept.

    Returns:
    dict with 'input_ids' and 'labels'; both refer to the same list.

    Raises:
    ValueError: If ``max_seq_len`` is not positive (chunking could never
                make progress and would loop forever).
    """
    if max_seq_len <= 0:
        raise ValueError(f"max_seq_len must be a positive integer, got {max_seq_len}")

    packed_sequences = []
    buffer = []

    for input_id in batch['input_ids']:
        # Add the current sequence to the buffer
        buffer.extend(input_id)
        buffer.append(eos_token_id)  # Add EOS at the end of each sequence

        if len(buffer) > max_seq_len:
            # Emit full chunks while keeping between 1 and max_seq_len tokens
            # in the buffer. Slicing once per sequence (instead of re-slicing
            # the whole buffer per chunk) keeps long documents linear.
            full_chunks = (len(buffer) - 1) // max_seq_len
            packed_sequences.extend(
                buffer[k * max_seq_len:(k + 1) * max_seq_len]
                for k in range(full_chunks)
            )
            buffer = buffer[full_chunks * max_seq_len:]

    # Add the last buffer if it's exactly max_seq_len
    if len(buffer) == max_seq_len:
        packed_sequences.append(buffer)
    elif len(buffer) > cutoff_size:
        # if the buffer is larger than the cutoff size, pad it to max_seq_len
        # if not, we do not include it in the packed_sequences
        buffer.extend([pad_token_id] * (max_seq_len - len(buffer)))
        packed_sequences.append(buffer)

    if shuffle:
        random.shuffle(packed_sequences)

    return {"input_ids": packed_sequences,
            "labels": packed_sequences}

def tokenize_function(examples, tokenizer, add_special_tokens=True):
    """
    Tokenize ``examples["text"]`` without truncation.

    The resulting sequences therefore have varying lengths.

    Args:
    examples: Mapping with a "text" entry.
    tokenizer: Callable accepting the texts plus the keyword arguments
               ``add_special_tokens`` and ``truncation``.
    add_special_tokens (bool): Forwarded unchanged to the tokenizer.

    Returns:
    Whatever the tokenizer returns.
    """
    texts = examples["text"]
    return tokenizer(texts, add_special_tokens=add_special_tokens, truncation=False)

def generate_labels(example, pad_token_id, mock_labels=None):
    """
    Build the 'labels' column for a single example.

    Args:
    example: Mapping with an 'input_ids' sequence of token ids.
    pad_token_id (int): Token id whose positions are replaced by -100.
    mock_labels: When truthy, every label is -100. When left as None, the
                 value of the module-level ``args.mock_labels`` is used.

    Returns:
    dict: ``{"labels": [...]}`` with one label per input id. The input
          sequence itself is not modified.
    """
    if mock_labels is None:
        mock_labels = args.mock_labels

    input_ids = example["input_ids"]
    if mock_labels:
        return {"labels": [-100] * len(input_ids)}

    # Compare element by element: on a plain Python list the expression
    # `labels[labels == pad_token_id] = -100` evaluates to `labels[False] = -100`,
    # which overwrites index 0 and leaves every padding position untouched.
    # NOTE: if the pad id equals the EOS id, EOS positions are masked as well.
    labels = [-100 if token_id == pad_token_id else token_id for token_id in input_ids]
    return {"labels": labels}

## save both tokenized data and text files
# Command-line interface: the int options act as 0/1 switches.
parser = argparse.ArgumentParser(description='Process some integers.')
parser.add_argument(
    '--packing', type=int, default=0,
    help='Use packing',
)
parser.add_argument(
    '--max_length', type=int, default=32*1024,
    help='Max length of sequence',
)
parser.add_argument(
    '--use_arrow', type=int, default=0,
    help='Use arrow',
)
parser.add_argument(
    '--add_labels', type=int, default=0,
    help='Add labels',
)
parser.add_argument(
    '--save_processed', type=int, default=0,
    help='Save processed text files',
)
parser.add_argument(
    '--bad_data', type=int, default=0,
    help='Dataset name',
)
parser.add_argument(
    '--tokenizer', type=str, default='togethercomputer/StripedHyena-Nous-7B',
    help='Model name',
)
# No default here: the option is None (falsy) unless given explicitly.
parser.add_argument(
    '--mock_labels', type=int,
    help='Mock data -- use -100 for all labels',
)

args = parser.parse_args()

# Module-level shortcuts for the options used most often below.
packing, max_length = args.packing, args.max_length
use_arrow, add_labels = args.use_arrow, args.add_labels

# ahxt/LiteLlama-460M-1T -- leads to different behavior
# Different tokenizers process things differently even though the vocab may
# be the same, so the tokenizer is selected via --tokenizer.

tokenizer = AutoTokenizer.from_pretrained(args.tokenizer)
# Reuse the EOS token as the padding token.
tokenizer.pad_token = tokenizer.eos_token

# Convert every "ft_examples*.jsonl" file found in each dataset directory:
# format the records, tokenize (optionally pack) them, add or remove labels,
# and save the result as an Arrow or Parquet file.
for dataset_file in datasets:
    files = os.listdir(os.path.join(base_path, dataset_file))
    for file in files:
        print(dataset_file + "  " + file)
        # Only JSONL files whose name starts with "ft_examples" are converted;
        # everything else is actually skipped (not just reported as skipped).
        if not file.endswith(".jsonl") or not file.startswith("ft_examples"):
            print("Skipping")
            continue
        converted_examples = []
        with open(os.path.join(base_path, dataset_file, file), 'r', encoding='utf-8') as f:
            for line in f:
                # Lines keep their trailing newline, so strip before testing;
                # otherwise blank lines would reach json.loads and fail.
                if line.strip():
                    data = json.loads(line)
                    converted_examples.append(format_stripedhyena_instruct(data))

        processed_text_file = os.path.join(save_dir, "ft_ready", "processed", dataset_file + "_" + file)
        if args.save_processed:
            with jsonlines.open(processed_text_file, mode='w') as writer:
                writer.write_all(converted_examples)

        # NOTE(review): the processed file is only (re)written when
        # --save_processed is set, yet it is always loaded here -- presumably it
        # must already exist from an earlier run; confirm.
        print("Loading processed text file")
        dataset = load_dataset('json', data_files=processed_text_file,
                                split='train')
        print("Loaded text dataset")
        print(dataset)
        print(f"Tokenizing with packing = {packing} and max_length = {max_length}")
        if not packing:
            # Pad/truncate every example to exactly max_length tokens.
            tokenized_data = dataset.map(
                lambda examples: tokenize_constant_length(examples, tokenizer, max_length, add_special_tokens=True),
                batched=True,
                num_proc=160,
                load_from_cache_file=True,
                remove_columns=dataset.column_names
            )
        else:
            tokenized_data = process_fast_packing(dataset, tokenizer, max_sequence_length=max_length)
            # tokenized_data['attention_mask'] = [[1] * len(ids) for ids in tokenized_data['input_ids']]
            # the above does not work for datasets

        assert 'input_ids' in tokenized_data.column_names

        ## this performs replacement of existing 'labels'
        if add_labels:
            # labels = tokenized_data["input_ids"].clone()
            # above does not work for datasets
            tokenized_data = tokenized_data.map(
                lambda batch: generate_labels(batch, tokenizer.pad_token_id),
                batched=False,
                num_proc=160,
            )
            assert 'labels' in tokenized_data.column_names
        else:
            if 'labels' in tokenized_data.column_names:
                print("Removing labels column")
                tokenized_data = tokenized_data.remove_columns('labels')

        ### verify length (spot check of the first 12 sequences)
        for i, ids in enumerate(tokenized_data['input_ids']):
            assert len(ids) == max_length, f"Length of sequence {i} is {len(ids)}"
            if i > 10:
                break

        print("Tokenized data")
        print(tokenized_data)
        visualize_packed_sequences(tokenized_data, tokenizer, max_sequences=2)

        ############
        if use_arrow:
            extension = ".arrow"
        else:
            extension = ".parquet"
        # Encode the processing options in the output file name.
        basename, _ = os.path.splitext(file)
        mock = "" if not args.mock_labels else "_mocklabels"
        basename += f"_max_length{max_length}_packed{packing}_labels{add_labels}{mock}"
        tokenized_filename = os.path.join(save_dir, "ft_ready", "tokenized", dataset_file + "_" + basename + extension)

        print("Saving Tokenized Data to", tokenized_filename)
        if use_arrow:
            data_to_arrow(
                tokenized_data,
                output_file=tokenized_filename
            )
        else:
            data_to_parquet(
                tokenized_data,
                output_file=tokenized_filename
            )