vaexio / vaex

Out-of-Core hybrid Apache Arrow/NumPy DataFrame for Python, ML, visualization and exploration of big tabular data at a billion rows per second 🚀
https://vaex.io
MIT License
8.27k stars 590 forks source link

[BUG-REPORT] vaex save error #2396

Open shawn0wang opened 11 months ago

shawn0wang commented 11 months ago

When I use python multi-process and vaex, I want to save the text as embedding. Everything is normal in the early stage of the program running, but after a while, the saved hdf5 becomes like this:

image

everything is lost, here is my code:

`import gzip import hashlib import json import logging import os import warnings from multiprocessing import Pool

import numpy as np import vaex from sentence_transformers import SentenceTransformer

warnings.filterwarnings("ignore")

matching_files = ["x1.json.gz", "x2.json.gz", "x3.json.gz", ...]

print("TOTAL # JOBS:", len(matching_files)) print(matching_files)

def save_embedding(file_path): cuda_num = int(file_path.split(".")[0][-4:]) % 8 save_name = file_path.split("/")[-1].split(".")[0] save_path = "xxx"

# log
logging.basicConfig(filename=f"logs/{save_name}.log", level=logging.INFO,
                    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(f"{save_name}_logger")

print("file_path is: ", file_path, "device is: ", cuda_num, "save name is: ", save_name)
# process
model = SentenceTransformer("infgrad/stella-large-zh", device=f"cuda:{cuda_num}")
all_md5 = []
all_text = []
content_embedding = []
rdy_num = 0
total_num = 0
with gzip.open(file_path, "rt") as f:
    for line in f:
        total_num += 1
        json_data = json.loads(line.strip())
        content = json_data["content"]
        embedding = model.encode(
            sentences=content,
            batch_size=1,
            show_progress_bar=False,
            device=f"cuda:{cuda_num}",
            normalize_embeddings=True)
        text_md5 = hashlib.md5(content.encode(encoding="utf-8")).hexdigest()

        all_text.append(content)
        all_md5.append(text_md5)
        content_embedding.append(embedding)
        rdy_num += 1
        # save memory
        if total_num % 10000 == 0:
            print(f"{save_name}*** rdy_num: {rdy_num}, total_num: {total_num}")
            logger.info(
                f"rdy_num: {rdy_num}, total_num: {total_num}, finish: {rdy_num / total_num * 100}%")
            all_text = np.array(all_text)
            all_md5 = np.array(all_md5)
            content_embedding = np.array(content_embedding)
            df = vaex.from_arrays(text=all_text, md5=all_md5, content_embedding=content_embedding)
            if os.path.exists(save_path):
                old_df = vaex.open(save_path)
                new_df = vaex.concat([old_df, df], resolver="strict")
                # if not del old file, it only save new part, not all data, lost old part
                os.remove(save_path)
            else:
                new_df = df
            new_df.export_hdf5(save_path)
            all_md5 = []
            all_text = []
            content_embedding = []
# last part
if os.path.exists(save_path) and len(all_text) != 0 and len(all_md5) != 0:
    logger.info(
        f"rdy_num: {rdy_num}, total_num: {total_num}, finish: {rdy_num / total_num * 100}%")
    all_text = np.array(all_text)
    all_md5 = np.array(all_md5)
    content_embedding = np.array(content_embedding)
    df = vaex.from_arrays(text=all_text, md5=all_md5, content_embedding=content_embedding)
    old_df = vaex.open(save_path)
    new_df = vaex.concat([old_df, df], resolver="strict")
    os.remove(save_path)
    new_df.export_hdf5(save_path)

with Pool(8) as p: p.map(save_embedding, matching_files) `