opendatalab / MinerU

A one-stop, open-source, high-quality data extraction tool that supports extraction from PDFs, web pages, and e-books.
https://opendatalab.com/OpenSourceTools
GNU Affero General Public License v3.0

Inference acceleration #524

Open albin3 opened 2 months ago

albin3 commented 2 months ago

Is your feature request related to a problem? Please describe.

During inference, each PDF is currently preprocessed first and only then moved into GPU memory.

Could multiple processes be started, with some of them dedicated to handling the PDF preprocessing?

Describe the solution you'd like

Is it possible to decouple the PDF preprocessing from the GPU work, so that the CPU and GPU run at the same time and parsing throughput improves? A rough sketch of what I mean is below.
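A minimal producer/consumer version of that split, using only the Python standard library; preprocess_pdf and run_gpu_inference are placeholder stubs, not MinerU APIs.

import multiprocessing as mp

def preprocess_pdf(path):
    # placeholder for the CPU-side work (downloading, page rasterization, layout pre-analysis, ...)
    return path.encode()

def run_gpu_inference(prepared):
    # placeholder for the GPU-side model call
    pass

def cpu_producer(pdf_paths, queue):
    # CPU worker: prepare PDFs ahead of time and hand them to the GPU process
    for path in pdf_paths:
        queue.put((path, preprocess_pdf(path)))
    queue.put(None)  # sentinel: no more work

def gpu_consumer(queue):
    # single GPU owner: pull prepared items and run inference back to back
    while True:
        item = queue.get()
        if item is None:
            break
        _path, prepared = item
        run_gpu_inference(prepared)

if __name__ == "__main__":
    q = mp.Queue(maxsize=4)  # small buffer keeps the GPU fed without exhausting RAM
    producer = mp.Process(target=cpu_producer, args=(["a.pdf", "b.pdf"], q))
    producer.start()
    gpu_consumer(q)  # the GPU stays busy while the CPU prepares the next PDF
    producer.join()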

drunkpig commented 2 months ago

Make a PDF index

Each index entry looks like this:

{
  "track_id": "afeda417-5a33-4ec8-bd79-56222763f832",
  "path": "s3://mybook/pdf/book-name.pdf",
  "file_type": "pdf",
  "title": "My book Name"
}
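A small sketch, not MinerU code, of how such an index file could be produced from a list of S3 PDF paths, one JSON object per line; uuid4 is used here just to generate a track_id.

import json
import uuid
from pathlib import Path

def build_pdf_index(pdf_paths, out_path):
    # one index record per PDF, matching the fields shown above
    with open(out_path, "w", encoding="utf-8") as f:
        for path in pdf_paths:
            record = {
                "track_id": str(uuid.uuid4()),
                "path": path,
                "file_type": "pdf",
                "title": Path(path).stem,
            }
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

build_pdf_index(["s3://mybook/pdf/book-name.pdf"], "part-000000.jsonl")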

Batch inference

# Standard-library and third-party imports used below.
import argparse
import json
import os
from datetime import datetime
from pathlib import Path

import fitz  # PyMuPDF
from loguru import logger

from magic_pdf.pipe.UNIPipe import UNIPipe
# Note: read_s3_jsonl, get_pdf_bytes, get_s3_cli_from_pool, get_s3_cfg_by_bucket,
# get_s3_client, write_s3_object_content_v2, __set_extra_info, retry_rmtree,
# magic_pdf_version and the FzErrorFormat import are this script's own helpers and
# are not shown in this snippet.

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', type=str, default='s3://pdf/books/processing/test/index/part-6662d7694c33-000034.jsonl')
    parser.add_argument('--output', type=str, default='s3://pdf/books/processing/test/output/part-6662d7694c33-000034.jsonl')
    parser.add_argument("--tempdir", default="/mnt/project/pdf")
    parser.add_argument("--node_name", default="default-node-name")
    parser.add_argument("--job_name", default="pdf")
    parser.add_argument("--gpu_id", default="0")

    args = parser.parse_args()
    gpu_id = args.gpu_id
    node_name = args.node_name
    job_name = args.job_name
    pdf_processing_temp_dir = args.tempdir  # each PDF's inference result is written here first; once a whole jsonl is done it is uploaded to S3 in one go, with files named by track_id
    log_format = "{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}"
    logger.add(f'task-logs/logs/{node_name}#{gpu_id}#{job_name}.log', format=log_format, rotation='10 MB', retention=5)

    os.environ['YOLO_VERBOSE'] = 'False'

    json_file_name = Path(args.input).name
    temp_json_save_path = os.path.join(pdf_processing_temp_dir, json_file_name)
    os.makedirs(temp_json_save_path, exist_ok=True)
    src_jsonl = args.input
    output_jsonl = args.output

    all_input_jsonl_lines = read_s3_jsonl(src_jsonl)
    total_pdfs = len(all_input_jsonl_lines)
    start_time = datetime.now()
    total_pdf_pages = 0
    for ii, pdf_info in enumerate(all_input_jsonl_lines):  # run inference one PDF at a time
        track_id = pdf_info['track_id']
        temp_json_save_file = os.path.join(temp_json_save_path, f"{track_id}.json")  # this book's result, saved locally as a temporary JSON file
        # if it already exists locally, skip it; this works like checkpoint/resume
        if os.path.exists(temp_json_save_file):
            logger.info(f"{temp_json_save_file} already exists, skip.")
            continue 

        s3_pdf_path = pdf_info['path']   
        s3_pdf_client = get_s3_cli_from_pool(s3_pdf_path)

        # read the PDF file into memory
        pdf_bytes = get_pdf_bytes(s3_pdf_path, s3_pdf_client)
        if pdf_bytes is None:
            logger.error(f"Failed to download {s3_pdf_path}")
            err_info = f"Failed to download {s3_pdf_path}"
            __set_extra_info(pdf_info, "__error", err_info)
        else:
            magicpdf = UNIPipe(pdf_bytes, {"_pdf_type":"", "model_list":[]}, image_writer=None)
            # get the page count with fitz (PyMuPDF)
            try:
                doc = fitz.open(stream=pdf_bytes, filetype="pdf")
                page_count = doc.page_count
                total_pdf_pages += page_count
                doc.close()
                magicpdf.pipe_classify()
                magicpdf.pipe_analyze()
                doc_layout_result = magicpdf.model_list
                pdf_info["doc_layout_result"] = doc_layout_result
            except FzErrorFormat as e:
                logger.error(f"FzErrorFormat >> Failed to open {s3_pdf_path}")
                logger.exception(e)
                err_info = f"FzErrorFormat: Failed to open {s3_pdf_path}, {e}"
                __set_extra_info(pdf_info, "__error", err_info)
            except Exception as e:
                logger.exception(e)
                err_info = str(e)
                __set_extra_info(pdf_info, "__error", err_info)

        __set_extra_info(pdf_info, "__inference_datetime", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        __set_extra_info(pdf_info, "__mineru_inference_version", magic_pdf_version.__version__)

        #outputs.append(pdf_info)
        logger.info(f"processed {ii}/{total_pdfs} pdfs")
        end_time = datetime.now()
        time_wasted_seconds = (end_time-start_time).seconds or 0.001
        avg_speed = total_pdf_pages/time_wasted_seconds
        logger.info(f"process speed is {avg_speed} pages/second, totalpage={total_pdf_pages}, timewasted={time_wasted_seconds}")

        ###################################################
        ## save this PDF's result to a local file; once the whole jsonl has been fully processed on each GPU, upload it to Ceph in one go
        ###################################################
        with open(temp_json_save_file,'w') as ff:
            ff.write(json.dumps(pdf_info, ensure_ascii=False))
    ######################################
    # compute the page-processing speed
    ######################################
    end_time = datetime.now()
    time_wasted_seconds = (end_time-start_time).seconds or 0.001
    avg_speed = total_pdf_pages/time_wasted_seconds
    logger.error(f"process speed is {avg_speed} pages/second, totalpage={total_pdf_pages}, timewasted={time_wasted_seconds}")

    ######################################
    #
    #  the for loop is finished; check whether every PDF in this jsonl has been processed
    ######################################
    # scan the temp directory for the track_id.json files
    all_json_files = Path(temp_json_save_path).glob("*.json")
    all_json_files_dict = {f.name:f.absolute() for f in all_json_files}
    if total_pdfs==len(all_json_files_dict):
        logger.info(f"All books in {src_jsonl} have been processed, start merging")
        # every PDF is done; concatenate the per-book JSONs into one jsonl in the order their track_ids appear in the input, then upload it to Ceph
        track_ids = [d['track_id'] for d in all_input_jsonl_lines]
        temp_merged_jsonl = os.path.join(temp_json_save_path, Path(output_jsonl).name)
        # print(temp_merged_jsonl)
        with open(temp_merged_jsonl, 'w', encoding='utf-8') as wf:
            for track_id in track_ids:
                if f"{track_id}.json" not in all_json_files_dict:
                    logger.info(f"track_id: {track_id} not in all_json_files_dict")
                    continue
                with open(all_json_files_dict[f"{track_id}.json"], 'r', encoding='utf-8') as f:
                    wf.write(f.read().strip())
                    wf.write("\n")
        # upload to Ceph
        s3_save_cfg = get_s3_cfg_by_bucket(args.output)
        s3_save_client = get_s3_client('', s3_save_cfg)
        logger.info(f"start uploading {temp_merged_jsonl} ==> {args.output}")
        is_upload_ok = False
        with open(temp_merged_jsonl, 'rb') as ff:
            try:
                write_s3_object_content_v2(s3_save_client, args.output, ff.read())
                logger.info(f"upload finished {temp_merged_jsonl} => {args.output}")
                is_upload_ok = True
            except Exception as e:
                logger.exception(e)
                logger.error(f"upload {temp_merged_jsonl} failed: {e}")
        if is_upload_ok:
            retry_rmtree(temp_json_save_path)

    else:
        logger.error(f"some books have not been processed yet: expected {len(all_input_jsonl_lines)}, actually processed {len(all_json_files_dict)}")
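To run this over multiple GPUs, one option (a sketch, not part of the original setup) is to start one worker process per GPU, each with its own jsonl shard and its own CUDA_VISIBLE_DEVICES; the file name batch_inference.py and the shard paths below are hypothetical.

import os
import subprocess

# hypothetical shards: (gpu id, input index jsonl, output jsonl)
shards = [
    ("0", "s3://pdf/books/processing/test/index/part-000000.jsonl", "s3://pdf/books/processing/test/output/part-000000.jsonl"),
    ("1", "s3://pdf/books/processing/test/index/part-000001.jsonl", "s3://pdf/books/processing/test/output/part-000001.jsonl"),
]

procs = []
for gpu_id, src, dst in shards:
    env = dict(os.environ, CUDA_VISIBLE_DEVICES=gpu_id)  # pin this worker to a single GPU
    procs.append(subprocess.Popen(
        ["python", "batch_inference.py",                 # the script above, saved under an assumed name
         "--input", src, "--output", dst, "--gpu_id", gpu_id],
        env=env,
    ))

for p in procs:
    p.wait()  # wait for all shards to finish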

Postprocess with MinerU


from magic_pdf.pipe.UNIPipe import UNIPipe
import json

from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter

img_save_dir = "/mnt/data/test/"

with open("/root/project/doc-pipeline/test/test.pdf", 'rb') as f:
    pdf_bytes = f.read()

with open("/root/project/doc-pipeline/test/a.json", 'r') as f:
    pdf_info = json.load(f)
    model_list = pdf_info['doc_layout_result']     # layout results produced by the batch-inference step above
    image_writer = DiskReaderWriter(img_save_dir)  # extracted images are written under img_save_dir
    pip = UNIPipe(pdf_bytes, {"_pdf_type":"", "model_list":model_list}, image_writer=image_writer)
    pip.pipe_classify()
    pip.pipe_parse()                               # parses using the precomputed model_list, so no GPU inference is needed here
    md = pip.pipe_mk_markdown(img_save_dir)
    print(md)
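If you want to keep the result instead of just printing it, the returned markdown can simply be written next to the extracted images, for example (the output path here is arbitrary):

with open("/mnt/data/test/test.md", "w", encoding="utf-8") as out:
    out.write(md)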