argilla-io / distilabel

⚗️ distilabel is a framework for synthetic data and AI feedback for AI engineers that require high-quality outputs, full data ownership, and overall efficiency.
https://distilabel.argilla.io
Apache License 2.0

Questions about threads #550

Closed · YueWu0301 closed this 1 day ago

YueWu0301 commented 1 month ago

After successfully running the pipeline once, I can no longer rerun my code: even after changing the pipeline name, the input data, and the related parameters, it raises the error below. What could be the cause?

EOFError
Exception in thread Thread-1 (_monitor):
Traceback (most recent call last):
  File "/root/miniconda3/envs/datagen/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/root/miniconda3/envs/datagen/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/root/miniconda3/envs/datagen/lib/python3.10/logging/handlers.py", line 1556, in _monitor
    record = self.dequeue(True)
  File "/root/miniconda3/envs/datagen/lib/python3.10/logging/handlers.py", line 1505, in dequeue
    return self.queue.get(block)
  File "/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/queues.py", line 103, in get
    res = self._recv_bytes()
  File "/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError
/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 3 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
gabrielmbmb commented 1 month ago

Hi @YueWu0301, could you share the code of your pipeline?

tridungduong-unsw commented 1 month ago

Hi @gabrielmbmb, I'm facing the same problem. The code is as follows:

import json
import os
import pdb

import openai
from distilabel.llms import AzureOpenAILLM, OpenAILLM, vLLM
from distilabel.llms.mistral import MistralLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import (
    CombineColumns,
    KeepColumns,
    LoadDataFromDicts,
    LoadHubDataset,
    PreferenceToArgilla,
    TextGenerationToArgilla,
)
from distilabel.steps.tasks import TextGeneration, UltraFeedback
from distilabel.steps.tasks.text_generation import TextGeneration
from dotenv import load_dotenv

load_dotenv()

def read_jsonl_file(file_path):
    """
    Reads a .jsonl file where each line is a separate JSON object, and returns a list of dictionaries.

    :param file_path: str - The path to the .jsonl file.
    :return: list - A list containing dictionaries, each representing a JSON object from the file.
    """
    data = []
    try:
        with open(file_path, "r") as file:
            for line in file:

                json_object = json.loads(line.strip())
                json_object["instruction"] = json_object.pop("question")
                json_object["generations"] = json_object.pop("answer")
                data.append(json_object)
    except FileNotFoundError:
        print(f"Error: The file '{file_path}' does not exist.")
    except json.JSONDecodeError:
        print(f"Error: The file '{file_path}' contains invalid JSON.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    return data

llm = AzureOpenAILLM(
    model=os.getenv("api_engine_gpt4"),
    base_url=os.getenv("api_base_gpt4"),
    api_key=os.getenv("api_key_gpt4"),
    api_version=os.getenv("api_version"),
)

with Pipeline(name="ultrafeedback-pipeline") as pipeline:

    data = read_jsonl_file(
        "data.json"
    )

    load_hub_dataset = LoadDataFromDicts(
        name="load_data",
        data=data,
        batch_size=1,
    )

    ultrafeedback = UltraFeedback(
        name="ultrafeedback_overall_rating",
        llm=llm,
        aspect="overall-rating",
        output_mappings={"model_name": "ultrafeedback_model"},
    )

    load_hub_dataset.connect(ultrafeedback)

dataset = pipeline.run(
    parameters={
        "ultrafeedback_overall_rating": {
            "generation_kwargs": {
                "max_new_tokens": 1024,
                "temperature": 0.7,
            },
        },
    }
)
gabrielmbmb commented 1 month ago

Hi, I think the issue might be caused by the run method not being called within an if __name__ == "__main__": block. Could you update your script and check whether you still get the error?
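In case it helps, here is a minimal sketch of the suggested layout. It reuses the steps that already appear in this thread; the pipeline name, the data, and the model are placeholders, and it assumes OPENAI_API_KEY is set in the environment:

from distilabel.llms import OpenAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromDicts
from distilabel.steps.tasks import TextGeneration

# The pipeline itself can be defined at module level...
with Pipeline(name="main-guard-example") as pipeline:
    load_data = LoadDataFromDicts(
        name="load_data",
        data=[{"instruction": "What is synthetic data?"}],  # placeholder data
        batch_size=1,
    )
    text_generation = TextGeneration(
        name="text_generation",
        llm=OpenAILLM(model="gpt-3.5-turbo"),  # assumes OPENAI_API_KEY is set in the env
    )
    load_data.connect(text_generation)

# ...but run() should only be called from the guarded block, so that the extra
# processes spawned by multiprocessing can re-import this module without
# triggering a second, nested run.
if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "text_generation": {
                "llm": {"generation_kwargs": {"temperature": 0.7}},
            },
        }
    )

When the worker processes re-import the script, everything outside the guard executes again in each of them, which is exactly what the guard prevents.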

tridungduong-unsw commented 1 month ago

Hi @gabrielmbmb, I modified it as follows:

import json
import os
import pdb

import openai
from distilabel.llms import AzureOpenAILLM, OpenAILLM, vLLM
from distilabel.llms.mistral import MistralLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import (
    CombineColumns,
    KeepColumns,
    LoadDataFromDicts,
    LoadHubDataset,
    PreferenceToArgilla,
    TextGenerationToArgilla,
)
from distilabel.steps.tasks import TextGeneration, UltraFeedback
from distilabel.steps.tasks.text_generation import TextGeneration
from dotenv import load_dotenv
import pandas as pd

load_dotenv()

def location_extraction(article):
    system_prompt = """
    You are an advanced Named Entity Recognition (NER) system specializing in disease-related information.
    Task: Identify geographical locations from a given list of entities.
    Instruction: 
    - Focus on identifying specific and recognized geographical locations in each paragraph.
    - LOCATION: Extract names of countries, cities, regions, and towns. Do not include vague or non-specific locations.
    - Present your findings for each entity in a clear, line-separated format. If an entity value includes a list or multiple components, separate these so that each item appears on its own line. 
    Example Output Format:
    - LOCATION: Mexico
    - LOCATION: Vietnam
    """
    prompt = f"""
    Article Content:
    ----------------
    {article}

    Analysis Task:
    --------------
    Please analyze the above article for the specified entities. If certain entities, like dates or locations, are not mentioned, indicate this by stating 'Not mentioned'. For example, 'DATE: Not mentioned'.
    """
    return system_prompt + prompt

llm = AzureOpenAILLM(
    model=os.getenv("api_engine_gpt4"),
    base_url=os.getenv("api_base_gpt4"),
    api_key=os.getenv("api_key_gpt4"),
    api_version=os.getenv("api_version"),
)

with Pipeline(name="ultrafeedback-pipeline") as pipeline:
    df=pd.read_csv('/g/data/ue03/duongd/ews-nlp-llm-inference/dataset/ner/collected/latest_ner.csv')
    df['instruction'] = [location_extraction(x) for x in df['summary']]
    df=df[['instruction', 'locations']]
    df=df.rename(columns={'locations':'generations'})
    df=df.loc[:3, :]
    data = df.to_dict(orient='records')
    load_hub_dataset = LoadDataFromDicts(
        name="load_data",
        data=data,
        batch_size=1,
    )
    ultrafeedback = UltraFeedback(
        name="ultrafeedback_overall_rating",
        llm=llm,
        aspect="overall-rating",
        output_mappings={"model_name": "ultrafeedback_model"},
    )
    load_hub_dataset.connect(ultrafeedback)

if __name__ == "__main__":
    dataset = pipeline.run(
        parameters={
            "ultrafeedback_overall_rating": {
                "generation_kwargs": {
                    "max_new_tokens": 1024,
                    "temperature": 0.7,
                },
            },
        }
    )

The error still occurs:

[Screenshot of the error traceback, 2024-04-21]
gabrielmbmb commented 1 month ago

Hi @tridungduong-unsw, thanks for the details! I'll try to reproduce the error and get back to you. You are using conda, right?

tridungduong-unsw commented 1 month ago

Hi @gabrielmbmb, yes, I'm using a conda env. By the way, I got it running now, but I needed to modify it a little. Others with the same problem can try:

import json
import os
import pdb

import openai
from distilabel.llms import AzureOpenAILLM, OpenAILLM, vLLM
from distilabel.llms.mistral import MistralLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import (
    CombineColumns,
    KeepColumns,
    LoadDataFromDicts,
    LoadHubDataset,
    PreferenceToArgilla,
    TextGenerationToArgilla,
)
from distilabel.steps.tasks import TextGeneration, UltraFeedback
from distilabel.steps.tasks.text_generation import TextGeneration
from dotenv import load_dotenv
import pandas as pd

load_dotenv()

def location_extraction(article):
    # same system_prompt / prompt construction as in the previous snippet (omitted here)
    return system_prompt + prompt

llm = AzureOpenAILLM(
    model=os.getenv("api_engine_gpt4"),
    base_url=os.getenv("api_base_gpt4"),
    api_key=os.getenv("api_key_gpt4"),
    api_version=os.getenv("api_version"),
)

with Pipeline(name="ultrafeedback-pipeline") as pipeline:
    df=pd.read_csv('data.csv')
    df['instruction'] = [location_extraction(x) for x in df['summary']]
    df=df[['instruction', 'locations']]
    df=df.rename(columns={'locations':'generations'})
    df=df.loc[:3, :]
    data = df.to_dict(orient='records')
    load_hub_dataset = LoadDataFromDicts(
        name="load_data",
        data=data,
        batch_size=1,
    )
    ultrafeedback = UltraFeedback(
        name="ultrafeedback_overall_rating",
        llm=llm,
        aspect="overall-rating",
        output_mappings={"model_name": "ultrafeedback_model"},
    )
    load_hub_dataset.connect(ultrafeedback)

if __name__ == "__main__":
    dataset = pipeline.run(
        parameters={
            "ultrafeedback_overall_rating": {
                "generation_kwargs": {
                    "max_new_tokens": 1024,
                    "temperature": 0.7,
                },
            },
        }
    )
YueWu0301 commented 1 month ago

> Hi @YueWu0301, could you share the code of your pipeline?

Sure, here is my code:

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        repo_id="xxxx",
        name="load_dataset2"
        # output_mappings={"input": "instruction"},
    )
    push_to_hub = PushToHub(
        name="push_to_hub1",
        repo_id="xxxx",
        token="xxxxxx",
    )
    llm1 = OpenAILLM(
        model="xxxx",
        api_key="xxxx",
        base_url="xxxx",
    )
    task = TextGeneration(name="text_generation1", llm=llm1)
    load_dataset.connect(task)
    task.connect(push_to_hub)

re = pipeline.run(
    parameters={
        "load_dataset2": {
            "repo_id": "xxxxxx",
        },
        "text_generation1": {
            "llm": {
                "generation_kwargs": {
                    "temperature": 0.9,
                }
            }
        },
        "push_to_hub1": {
            "repo_id": "xxxxxxx",
        },
    }
)

Thanks a lot

gabrielmbmb commented 1 month ago

Hi @YueWu0301, could you try running this and see if it works for you too? (mind the if __name__ == "__main__": block)


with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        repo_id="xxxx",
        name="load_dataset2"
        # output_mappings={"input": "instruction"},
    )
    push_to_hub = PushToHub(
        name="push_to_hub1",
        repo_id="xxxx",
        token="xxxxxx",
    )
    llm1 = OpenAILLM(
        model="xxxx",
        api_key="xxxx",
        base_url="xxxx",
    )
    task = TextGeneration(name="text_generation1", llm=llm1)
    load_dataset.connect(task)
    task.connect(push_to_hub)

if __name__ == "__main__":
    re = pipeline.run(
        parameters={
            "load_dataset2": {
                "repo_id": "xxxxxx",
            },
            "text_generation1": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.9,
                    }
                }
            },
            "push_to_hub1": {
                "repo_id": "xxxxxxx",
            },
        }
    )