usc-isi-i2 / kgtk

Knowledge Graph Toolkit
https://kgtk.readthedocs.io/en/latest/
MIT License

Error in text-embedding multiprocessing #615

Open saggu opened 2 years ago

saggu commented 2 years ago

The command:

kgtk --debug text-embedding -i sample.tsv --model roberta-large-nli-mean-tokens \
--property-labels-file labels.en.tsv.gz \
--isa-properties P31 P279 P106 P39 P1382 P373 P452 \
--save-embedding-sentence -o wikidatadwd-text-embeddings-sample.tsv.gz \
--parallel 8

Error:

  File "/data/amandeep/github/kgtk/kgtk/cli/text_embedding.py", line 249, in main
    process.read_input(input_file_path=input_file_path,
  File "/data/amandeep/github/kgtk/kgtk/gt/embedding_utils.py", line 528, in read_input
    pp.task_done()
  File "/nas/home/amandeep/miniconda3/envs/kgtk-env-ckg08/lib/python3.9/site-packages/pyrallel/parallel_processor.py", line 367, in task_done
    self.mapper_queues[i].put((ParallelProcessor.CMD_STOP,))
  File "/nas/home/amandeep/miniconda3/envs/kgtk-env-ckg08/lib/python3.9/site-packages/multiprocess/queues.py", line 91, in put
    raise ValueError(f"Queue {self!r} is closed")
ValueError: Queue <multiprocess.queues.Queue object at 0x7fcb04f44e20> is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/data/amandeep/github/kgtk/kgtk/exceptions.py", line 70, in __call__
    return_code = func(*args, **kwargs) or 0
  File "/data/amandeep/github/kgtk/kgtk/cli/text_embedding.py", line 398, in run
    main(**kwargs)
  File "/data/amandeep/github/kgtk/kgtk/cli/text_embedding.py", line 268, in main
    raise KGTKException(str(e))
kgtk.exceptions.KGTKException: Queue <multiprocess.queues.Queue object at 0x7fcb04f44e20> is closed
Queue <multiprocess.queues.Queue object at 0x7fcb04f44e20> is closed
sys:1: ResourceWarning: unclosed file <_io.TextIOWrapper name='sample.tsv' mode='r' encoding='UTF-8'>
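
For reference, the final ValueError can be reproduced without KGTK or Pyrallel. This is a minimal sketch, assuming Python 3.8 or later. It uses the standard library's multiprocessing.Queue rather than the multiprocess fork that appears in the traceback. The helper name put_after_close and the payload tuple are placeholders, not Pyrallel's real names. It only shows that put() on a queue that has already been closed raises an error with the same wording.

```python
import multiprocessing


def put_after_close():
    """Close a queue, then try to put a stop marker on it.

    Returns the error text if put() raises ValueError, otherwise None.
    """
    q = multiprocessing.Queue()
    q.close()  # after this call the queue refuses new items
    try:
        # Placeholder payload; stands in for the stop command in the traceback.
        q.put(("CMD_STOP",))
    except ValueError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(put_after_close())
```

If this prints a message ending in "is closed", the failure in the traceback is consistent with a stop command being sent to a queue that was already shut down.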

GreatYYX commented 2 years ago

On Pyrallel's end, once task_done() and join() have been called, no more data should be added with add_task().

pp = ParallelProcessor(...)
pp.start()
for i in range(100):
    pp.add_task(i)

    # shutting down here, inside the loop, causes the exception
    if i == 20:
        pp.task_done()
        pp.join()

pp.task_done()
pp.join()

Running the above code throws:

Traceback (most recent call last):
  File "t1.py", line 53, in <module>
    pp.task_done()
  File ".../pyrallel/pyrallel/parallel_processor.py", line 413, in task_done
    self.mapper_queues[i].put((ParallelProcessor.CMD_STOP,))
  File ".../envs/py38/lib/python3.8/site-packages/multiprocess/queues.py", line 85, in put
    raise ValueError(f"Queue {self!r} is closed")
ValueError: Queue <multiprocess.queues.Queue object at 0x101fc45b0> is closed
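
One way the calling side could protect itself is to route shutdown through a small guard, so that task_done() and join() run at most once and add_task() is rejected afterwards. The sketch below is hypothetical. StandInProcessor is not Pyrallel; it mimics only the method names used in this thread, and it assumes that the queues are closed by join(). ShutdownOnce and run are illustrative names as well.

```python
class StandInProcessor:
    """Hypothetical stand-in for a parallel processor (not Pyrallel itself)."""

    def __init__(self):
        self.closed = False
        self.tasks = []

    def start(self):
        pass

    def add_task(self, item):
        self.tasks.append(item)

    def task_done(self):
        # Assumption: sending the stop command fails once queues are closed.
        if self.closed:
            raise ValueError("Queue is closed")

    def join(self):
        # Assumption: join() is where the queues get closed.
        self.closed = True


class ShutdownOnce:
    """Wrap a processor so task_done() and join() run at most once."""

    def __init__(self, processor):
        self._pp = processor
        self._finished = False

    def add_task(self, item):
        if self._finished:
            raise RuntimeError("add_task() called after shutdown")
        self._pp.add_task(item)

    def finish(self):
        if self._finished:
            return False  # already shut down; ignore the repeated call
        self._finished = True
        self._pp.task_done()
        self._pp.join()
        return True


def run(n=100):
    pp = StandInProcessor()
    pp.start()
    guard = ShutdownOnce(pp)
    for i in range(n):
        guard.add_task(i)
    first = guard.finish()
    second = guard.finish()  # no exception: the guard skips the second shutdown
    return len(pp.tasks), first, second
```

With the stand-in, run() returns the number of queued tasks followed by True and False: the first finish() performs the shutdown and the second does nothing. Whether such a guard belongs in KGTK's read_input or elsewhere is left open here.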