Generative AI

Curating Custom Datasets for LLM Training with NVIDIA NeMo Curator

Data curation is the first, and arguably the most important, step in the pretraining and continuous training of large language models (LLMs) and small language models (SLMs). NVIDIA recently announced the open-source release of NVIDIA NeMo Curator, a data curation framework that prepares large-scale, high-quality datasets for pretraining generative AI models. 

NeMo Curator, which is part of NVIDIA NeMo, offers workflows to download and curate data from various public sources out of the box such as Common Crawl, Wikipedia, and arXiv. It also provides flexibility for developers to customize data curation pipelines to address their unique requirements and create custom datasets. 

This post walks you through creating a custom data curation pipeline using NeMo Curator. Doing so enables you to:

  • Tailor data curation and customize the pipeline to fit the specific needs of your generative AI project.
  • Ensure data quality by applying rigorous filters and deduplication to train your model with the best possible dataset.
  • Protect privacy by identifying and removing personally identifiable information (PII) and adhere to data protection regulations.
  • Streamline the development by automating the curation process, saving time and resources to allow you to focus on solving your business-specific problems.

Overview

This tutorial focuses on creating a simple data curation pipeline that can download, process, and filter the TinyStories dataset. TinyStories is a dataset of around 2.2 million short stories generated by GPT-3.5 and GPT-4, featuring English words that are understood by 3- to 4-year olds. It is publicly available on Hugging Face. To learn more about the dataset, see TinyStories: How Small Can Language Models Be and Still Speak Coherent English?

The small size of this dataset makes it ideal for creating and validating data curation pipelines on a local machine. The dataset is split into training and validation files. This tutorial primarily uses the validation file, which contains about 22,000 records.

Defining the data curation pipeline involves the following high-level steps:

  1. Defining custom document builders that can:
    • Download the dataset from the web and convert to the JSONL format.
    • Iterate through the dataset and extract each document.
  2. Define custom modifiers to clean and unify the text data.
  3. Filter the dataset using predefined, as well as user-defined heuristics.
  4. Deduplicate the dataset and remove identical records.
  5. Redact all personally identifiable information (PII) from the dataset.
  6. Output the results into the JSONL format.

The execution of this curation pipeline should take less than 5 minutes on consumer-grade hardware, and the curated dataset should have about 21,500 records after curation. To access the complete code for this tutorial, visit NVIDIA/NeMo-Curator on GitHub. 

Prerequisite

Before starting, the NeMo Curator framework must be installed. Follow the instructions in the project’s NeMo Curator GitHub README file to install the framework. After that, run the following commands from the terminal to verify the installation. Also install additional dependencies needed for following along.

$ python -c "import nemo_curator; print(nemo_curator);"
$ pip3 install requests

Defining custom document builders

To support working with arbitrary datasets, NeMo Curator provides a set of document builders that abstract away the representation of the underlying dataset, including:

  • DocumentDownloader: an abstract class for downloading remote data to disk.
  • DocumentIterator: an abstract class for reading dataset raw records from the disk.
  • DocumentExtractor: an abstract class for extracting text records, as well as any relevant metadata from the records on the disk.

Several implementations for these to work with datasets such as CommonCrawl, Wikipedia, and arXiv are available on the NVIDIA/NeMo-Curator GitHub repo. The following sections show how to implement each of these abstract classes to customize the work with the TinyStories dataset.

Downloading the TinyStories dataset

First, implement the DocumentDownloader class, which takes the URL of the dataset’s validation split and downloads it using the requests library. 

import requests
from nemo_curator.download.doc_builder import DocumentDownloader

class TinyStoriesDownloader(DocumentDownloader):
    def __init__(self, download_dir: str):
        super().__init__()

        if not os.path.isdir(download_dir):
            os.makedirs(download_dir)

        self._download_dir = download_dir
        print("Download directory: ", self._download_dir)

    def download(self, url: str) -> str:
        filename = os.path.basename(url)
        output_file = os.path.join(self._download_dir, filename)

        if os.path.exists(output_file):
            print(f"File '{output_file}' already exists, skipping download.")
            return output_file

        print(f"Downloading TinyStories dataset from '{url}'...")
        response = requests.get(url)

        with open(output_file, "wb") as file:
            file.write(response.content)

        return output_file

Next, download the actual dataset using the following code:

# Download the TinyStories dataset.
downloader = TinyStoriesDownloader("/path/to/download/")
tinystories_fp = downloader.download(TINY_STORIES_URL)
write_jsonl(tinystories_fp, jsonl_dir)

The dataset will download as a plain text file. To parse this dataset, implement the DocumentIterator and DocumentExtractor classes. This will enable you to convert it to the JSONL format (one of the formats that NeMo Curator supports). 

Iterating and extracting text from the dataset

In the downloaded file, each record (or story) spans several lines, and records are separated by the <|endoftext|> token. The DocumentIterator class defines an iterate function that takes the path to the file that is to be iterated and yields each record for that file, in the form of the raw text from the record and (optionally) any relevant metadata for that record. Although adding metadata to each record is not mandatory, some data processing algorithms (such as deduplication) rely on such data to uniquely identify each document and correctly perform their intended function.

Next, implement the iterator for the TinyStories dataset. Given that each story can span several lines, define the iterator function such that it would keep reading (and storing) each line in the file, until it reaches the separator token. 

Once a separator is reached, concatenate all the lines seen so far, tack on some metadata to the record, and yield the result. To ensure records are uniquely identifiable, use the dataset’s filename, as well as an internal counter to create the unique id and (optionally) filename metadata included with each record:

from nemo_curator.download.doc_builder import DocumentIterator

class TinyStoriesIterator(DocumentIterator):
    SEPARATOR_TOKEN = "<|endoftext|>"

    def __init__(self):
        super().__init__()
        self._counter = -1

    def iterate(self, file_path):
        self._counter = -1
        file_name = os.path.basename(file_path)

        with open(file_path, "r") as file:
            example = []

            def split_meta(example):
                if example:
                    self._counter += 1
                    content = " ".join(example)
                    meta = {
                        "filename": file_name,
                        "id": f"{file_name}-{self._counter}",
                    }

                    return meta, content

            for line in file:
                if line.strip() == TinyStoriesIterator.SEPARATOR_TOKEN:
                    if example:
                        yield split_meta(example)
                        example = []
                else:
                    example.append(line.strip())

            if example:
                yield split_meta(example)

The last remaining document builder to implement is the DocumentExtractor class, which simply returns the text for each record. Note that you may optionally associate some metadata for the extracted text, but the usage of this metadata is beyond the scope of this tutorial.

from nemo_curator.download.doc_builder import DocumentExtractor

class TinyStoriesExtractor(DocumentExtractor):
    def extract(self, content: str) -> Tuple[Set, str]:
        # No metadata for the text, just the content.
        return {}, content

Writing the dataset to the JSONL format

NeMo Curator provides helpers that can load datasets from the disk in JSONL, Parquet, or Pickle formats. Given the popularity of the JSONL format, this section demonstrates the conversion of the raw text dataset to this format using the iterator and extractor classes previously implemented.

To convert the dataset to JSONL, simply point the TinyStoriesIterator instance to the downloaded plain text file, iterate through each record, and extract entries using the TinyStoriesExtractor instance. Create a JSON object from each record (story) and write it to a single line in an output file. This procedure is straightforward:

import os
import json

def write_jsonl(input_filename: str, output_dir: str, dump_every_n: int = 10000):
    basename = os.path.basename(input_filename)
    iterator = TinyStoriesIterator()
    extractor = TinyStoriesExtractor()
    to_dump = []
    dump_ctr = 0

    def dump_to_file(to_dump, dump_ctr):
        """Helper function to facilitate dumping to file."""
        output_filename = f"{basename}-{dump_ctr}.jsonl"
        with open(os.path.join(output_dir, output_filename), "w") as output_file:
            output_file.writelines(to_dump)
        # Empty out the list and increment the counter.
        return [], dump_ctr + 1

    for item in iterator.iterate(input_filename):
        record_meta, content = item
        extracted = extractor.extract(content)

        if extracted is None:
            continue

        text_meta, text = extracted

        if text is None:
            continue

        line = {
            "text": text,
            **text_meta,
            **record_meta,
        }
        json_out = json.dumps(line, ensure_ascii=False)
        to_dump.append(json_out + "\n")

        # Should we dump what we have so far?
        if len(to_dump) == dump_every_n:
            to_dump, dump_ctr = dump_to_file(to_dump, dump_ctr)

    # Dump the remaining records.
    if to_dump:
        dump_to_file(to_dump, dump_ctr)

Note that by default, this function creates one JSONL file for every 10,000 records. While entirely optional, this is to ensure that each output file remains small enough for easy manual inspection using a text editor, without consuming too much memory.

Also note that the content of each story is written into the text field of each JSON object. Many data curation operations throughout NeMo Curator need to know which field inside each record contains the text data for that record. If not explicitly specified, these operations assume the existence of a text field in the dataset. As such, it is often good practice to always populate the text field for each record with the text data of interest.

Loading the dataset using the document builders

In NeMo Curator, datasets are represented as objects of type DocumentDataset. This provides helpers to load the datasets from disk in various formats. Having created the dataset in the JSONL format, you can use the following code to load it and start working with it:

from nemo_curator.datasets import DocumentDataset
# define `files` to be a list of all the JSONL files to load
dataset = DocumentDataset.read_json(files, add_filename=True)

You now have everything needed to define a custom dataset curation pipeline and prepare your data for training (or validation) use cases.

Text cleaning and unification

A fundamental operation in data curation pipelines involving text data is text unification and cleaning, as text scraped from online sources may contain inconsistencies or unicode issues. To modify documents, NeMo Curator provides a DocumentModifier interface, which defines how a given text from each document should be modified. The actual modification is done through the Modify helper, which takes a DocumentDataset object along with a DocumentModifier object and applies the modifier to the dataset.

The TinyStories dataset has inconsistent quotation marks, where some quotation marks are curly, while others are straight. Such inconsistencies (poor quality tokens, for example) may cause problems for models that are trained on this data. 

To resolve these, create a DocumentModifier that unifies all single- and double-quotation marks in the documents by replacing all the curly quotation marks with their straight variants:

from nemo_curator.modifiers import DocumentModifier

class QuotationUnifier(DocumentModifier):
    def modify_document(self, text: str) -> str:
        text = text.replace("‘", "'").replace("’", "'")
        text = text.replace("“", '"').replace("”", '"')
        return text

NeMo Curator provides various DocumentModifier implementations out of the box. One such modifier is UnicodeReformatter, which uses ftfy to resolve all unicode issues in the dataset. Next, chain these modifiers together and clean the downloaded dataset. The chaining operation is done through the Sequential class, which takes a list of operations that are to be sequentially performed and applies them to a given DocumentDataset instance:

from nemo_curator import Sequential
from nemo_curator.modules.modify import Modify
from nemo_curator.modifiers.unicode_reformatter import UnicodeReformatter

def clean_and_unify(dataset: DocumentDataset) -> DocumentDataset:
    cleaners = Sequential(
        [
            # Unify all the quotation marks
            Modify(QuotationUnifier()),
            # Unify all unicode
            Modify(UnicodeReformatter()),
        ]
    )
    return cleaners(dataset)

Dataset filtering

Another important step in the dataset curation process is data filtering, where some documents that do not fit certain criteria are discarded. For instance, you might want to discard documents that are too short, too long, or incomplete. At the time of writing, NeMo Curator provides 24 heuristics for natural languages, as well as eight heuristics for coding languages. 

NeMo Curator provides a DocumentFilter interface, which defines a way to score documents based on various criteria, along with a ScoreFilter helper to filter the documents. The ScoreFilter helper takes a DocumentDataset along with a DocumentFilter and determines whether each document in the dataset passes the filtering criteria.

Create a simple DocumentFilter that determines whether a story ends with an end of sentence character. The goal is to discard all stories that do not end with an end of sentence character:

from nemo_curator.filters import DocumentFilter

class IncompleteStoryFilter(DocumentFilter):
    def __init__(self):
        super().__init__()
        self._story_terminators = {".", "!", "?", '"', "”"}

    def score_document(self, text: str) -> bool:
        ret = text.strip()[-1] in self._story_terminators
        return ret

    def keep_document(self, score) -> bool:
        return score

The main functionality is implemented in score_document and keep_document functions, where False (that is, don’t keep this document) is returned if the document does not end with an end of sentence character.

To apply this filter to the dataset, pass an instance of IncompleteStoryFilter to a ScoreFilter object. NeMo Curator provides many DocumentFilter implementations out of the box. These filters can be chained together through the Sequential class. The following code shows how to apply various filters to the dataset:

def filter_dataset(dataset: DocumentDataset) -> DocumentDataset:
    filters = Sequential(
        [
            ScoreFilter(
                WordCountFilter(min_words=80),
                text_field="text",
                score_field="word_count",
            ),
            ScoreFilter(IncompleteStoryFilter(), text_field="text"),
            ScoreFilter(
                RepeatingTopNGramsFilter(n=2, max_repeating_ngram_ratio=0.2),
                text_field="text",
            ),
            ScoreFilter(
                RepeatingTopNGramsFilter(n=3, max_repeating_ngram_ratio=0.18),
                text_field="text",
            ),
            ScoreFilter(
                RepeatingTopNGramsFilter(n=4, max_repeating_ngram_ratio=0.16),
                text_field="text",
            ),
        ]
    )
    return filters(dataset)

This code filters all short (less than 80 words) or incomplete stories, along with any other stories that have certain ratios of repeating n-grams. Note the usage of text_field=”text”, which tells the ScoreFilter to pass the contents of the dataset text column to each filtering criteria.

Deduplication

When working with large amounts of text data, there may be records that are identical (or near-identical) to each other. Training on such data may incur additional compute and storage overhead. NeMo Curator provides functionality to find and discard such duplicates. For simplicity, focus on finding exact duplicate records in the dataset. This can be accomplished using the ExactDuplicates class, as shown below. 

This module will automatically leverage existing CUDA devices and the GPU-accelerated implementations from the RAPIDS cuDF library to identify duplicate documents, resulting in much faster processing times. This is because the deduplication stage involves calculating a hash for every document, which is compute-intensive. Each document can be hashed independently, which makes this workload ideal to run in parallel on the GPU.

from nemo_curator.modules import ExactDuplicates

def dedupe(dataset: DocumentDataset) -> DocumentDataset:
    deduplicator = ExactDuplicates(id_field="id", text_field="text", hash_method="md5")
    # Find the duplicates
    duplicates = deduplicator(dataset)
    docs_to_remove = duplicates.df.map_partitions(
        lambda x: x[x._hashes.duplicated(keep="first")]
    )
    # Remove the duplicates using their IDs.
    duplicate_ids = list(docs_to_remove.compute().id)
    dataset_df = dataset.df
    deduped = dataset_df[~dataset_df.id.isin(duplicate_ids)]
    return DocumentDataset(deduped)

This specifies that each record’s unique identifier and content are in the id and text columns, respectively. Recall that a unique identifier was assigned to each document during the download and extraction phase. This enables the deduplicator to uniquely identify documents from one another. The deduplicator object returns a set of IDs that it has determined to be duplicates. Simply remove these documents from the dataset.

PII redaction

The last processing step discussed in this tutorial is the redaction of personally identifiable information (PII). NeMo Curator facilitates the detection and removal of PII using the PiiModifier class, which is an implementation of the DocumentModifier class previously discussed. This modifier leverages the Presidio framework and enables you to specify which PII to detect, what action to take for each detection, and process the data in batches to accelerate the operation.

The stories in the TinyStories dataset contain many instances of first names. This example intends to detect all such names and replace them with an anonymized token. This can be accomplished using a few lines of code:

from nemo_curator.modifiers.pii_modifier import PiiModifier

def redact_pii(dataset: DocumentDataset) -> DocumentDataset:
    redactor = Modify(
        PiiModifier(
            supported_entities=["PERSON"],
            anonymize_action="replace",
            device="cpu",
        ),
    )
    return redactor(dataset)

The operation takes the entire dataset and returns the modified dataset.

Putting the curation pipeline together

Having implemented each step of the curation pipeline, it’s time to put everything together and sequentially apply each operation on the dataset. You can use the Sequential class to chain curation operations together:

curation_steps = Sequential(
    [
        clean_and_unify,
        filter_dataset,
        dedupe,
        redact_pii,
    ]
)
dataset = curation_steps(dataset)
print("Executing the pipeline...")
dataset = dataset.persist()
dataset.to_json("/output/path", write_to_filename=True)

Under the hood, NeMo Curator uses Dask to work with the dataset in a distributed manner. Since Dask operations are lazy-evaluated, it’s necessary to call the .persist function to instruct Dask to apply the operations. Once processing finishes, you can write the dataset to disk in the JSONL format by calling the .to_json function and providing an output path.

Next steps

NeMo Curator supports many advanced data processing and filtering techniques, such as fuzzy or task-based deduplication, task identification and decontamination, domain classification (and much more) that are not covered in this tutorial. Check out the collection of data curation examples on GitHub to learn more. 

You can also request access to the NVIDIA NeMo Curator microservice, which provides the easiest path for enterprises to get started with data curation from anywhere. It offers streamlined performance and scalability to shorten the time to market.

Discuss (0)

Tags