MontrealCorpusTools / Montreal-Forced-Aligner

Command line utility for forced alignment using Kaldi
https://montrealcorpustools.github.io/Montreal-Forced-Aligner/
MIT License

TextGrid export step is very slow #807

Closed: leandro-gracia-gil closed this issue 1 month ago

leandro-gracia-gil commented 1 month ago

Debugging checklist

[X] Have you read the troubleshooting page (https://montreal-forced-aligner.readthedocs.io/en/latest/user_guide/troubleshooting.html) and searched the documentation to ensure that your issue is not addressed there?
[X] Have you updated to the latest MFA version (check https://montreal-forced-aligner.readthedocs.io/en/latest/changelog/changelog_3.0.html)? What is the output of mfa version?
[X] Have you tried rerunning the command with the --clean flag?

Describe the issue: I am running mfa align on a series of datasets containing about 500k utterances, using 64 cores in parallel. While doing so, some steps such as the alignment passes become so slow that the progress bar cannot estimate it/s, and then it suddenly jumps by 5000 in one go every 20~30 minutes.

Apart from that, TextGrid export is also surprisingly slow, at 0~1 it/s. For example, on one of my datasets this step alone has been running for 2 days and 8 hours, and it is estimated to take another 9~10 days! On another dataset of 1.7M utterances, the estimated it/s rounds to zero.

Also, I'm not sure if this helps, but I have noticed similar issues with mfa g2p. Some runs became completely bottlenecked, to the point where it/s could not be estimated and multiple days of work were expected; then, sometimes, the it/s would suddenly spike to several thousand and the run would finish immediately.

This sounds like some kind of bottleneck, possibly related to multiprocessing or database access.

For reproducing your issue, please fill out the following:

  1. Corpus structure
    • What language is the corpus in? Multiple languages on each dataset. The examples cited above are for Spanish, French, and English in particular, but I also had similar behavior with Japanese.
    • How many files/speakers? Spanish and French, about 500k files and 250~300 speakers each. English, about 1.7M files and 900 speakers.
    • Are you using lab files or TextGrid files for input? Text files.
  2. Dictionary
    • Are you using a dictionary from MFA? If so, which one? spanish_mfa, french_mfa, english_mfa
    • If it's a custom dictionary, what is the phoneset? N/A
  3. Acoustic model
    • If you're using an acoustic model, is it one download through MFA? If so, which one? spanish_mfa, french_mfa, english_mfa
    • If it's a model you've trained, what data was it trained on? N/A

Desktop (please complete the following information):

Additional context: Running on a DGX A100 with 128 CPUs (256 cores) and 2 TB of RAM.

leandro-gracia-gil commented 1 month ago

For additional context, all utterances in the examples above should have a duration between 8 and 30 seconds.

dan-ya commented 1 month ago

I have run into the same problem of very slow TextGrid export with the latest MFA version. Here is my solution for it (@mmcauliffe).

I am not an SQL expert, but in my opinion the main problem is that two heavy SQL queries are made for each aligned file (see montreal_forced_aligner.textgrid.construct_output_tiers). It is much faster to query everything at once and then process the results in Python.

NOTE 1. The main bottleneck in this case is memory consumption. I have 32 GB of RAM, and keeping the alignments of ~160,000 files (100 hours of speech) in memory was fine for me. For huge datasets, the implementation could be improved by fetching the query results in batches with sqlalchemy.orm.Query.yield_per(count) instead of sqlalchemy.orm.Query.all(), or something similar.

NOTE 2. It only works for me in single-worker mode (mfa align -j 1 ...), since all files are processed at once. Multiple-worker mode would require preprocessing the file lists first.
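The core idea here, replacing per-file queries with one bulk query whose rows are grouped by file in Python, can be illustrated with the standard-library sqlite3 module. This is a minimal sketch on a hypothetical table (word_interval with file_id, start_s, end_s, word), not MFA's schema or its SQLAlchemy queries; both functions should return the same per-file intervals, but the second needs one round trip instead of one per file.

```python
import sqlite3
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Interval = Tuple[float, float, str]


def build_demo_db() -> sqlite3.Connection:
    # Hypothetical table standing in for word intervals; not MFA's real schema
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE word_interval (file_id INTEGER, start_s REAL, end_s REAL, word TEXT)")
    conn.executemany(
        "INSERT INTO word_interval VALUES (?, ?, ?, ?)",
        [(2, 0.5, 0.9, "world"), (1, 0.0, 0.4, "hello"), (2, 0.0, 0.5, "hi"), (1, 0.4, 0.8, "there")],
    )
    return conn


def per_file_queries(conn: sqlite3.Connection, file_ids: Iterable[int]) -> Dict[int, List[Interval]]:
    # One query per file: N round trips for N files
    return {
        file_id: conn.execute(
            "SELECT start_s, end_s, word FROM word_interval WHERE file_id = ? ORDER BY start_s",
            (file_id,),
        ).fetchall()
        for file_id in file_ids
    }


def single_query(conn: sqlite3.Connection) -> Dict[int, List[Interval]]:
    # One query for the whole corpus, then grouping by file in Python
    grouped: Dict[int, List[Interval]] = defaultdict(list)
    for file_id, start_s, end_s, word in conn.execute(
        "SELECT file_id, start_s, end_s, word FROM word_interval ORDER BY start_s"
    ):
        grouped[file_id].append((start_s, end_s, word))
    return dict(grouped)
```

With hundreds of thousands of files, the per-file variant issues hundreds of thousands of queries, which is consistent with the slowdown reported above; the bulk variant trades that for holding every row in memory at once, as NOTE 1 points out.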

It took about 1 minute to process my data instead of the estimated 50 hours (~1 file/s + several hours for multiprocessing initialization).

Changes made:

For convenience, I added a --fast_textgrid_export flag that enables this implementation. For that, I also changed:

I have put my implementation below in case anyone needs it. If you use it, don't forget to import construct_all_output_tiers in montreal_forced_aligner.alignment.base.

My MFA call is now: mfa align -j 1 --fast_textgrid_export INPUT_DIR english_us_mfa english_mfa OUTPUT_DIR

construct_all_output_tiers (modified construct_output_tiers)

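# Assumed to live in montreal_forced_aligner/textgrid.py next to construct_output_tiers,
# reusing that module's imports (Session, CorpusWorkflow, PhoneInterval, Phone, WordInterval,
# Word, Utterance, Speaker, CtmInterval, SIL_PHONE, SIL_WORD, Dict, List)
from collections import defaultdict


def construct_all_output_tiers(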
    session: Session,
    workflow: CorpusWorkflow,
    cleanup_textgrids: bool,
    clitic_marker: str,
    include_original_text: bool,
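) -> Dict[int, Dict[str, Dict[str, List[CtmInterval]]]]:
    """
    Construct aligned output tiers for all files in a given corpus

    Parameters
    ----------
    session: Session
        SqlAlchemy session
    workflow: CorpusWorkflow
        Workflow whose alignments should be exported
    cleanup_textgrids: bool
        Flag for removing silences and merging clitics into their host words
    clitic_marker: str
        Marker used to identify clitics
    include_original_text: bool
        Flag for including the original text of the corpus files as a tier

    Returns
    -------
    Dict[int, Dict[str, Dict[str, List[CtmInterval]]]]
        Aligned tiers for all files in the given corpus, keyed by file ID, then speaker name, then tier name
    """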
    phone_intervals = (
        session.query(PhoneInterval.begin, PhoneInterval.end, Phone.phone, Speaker.name, Utterance.file_id)
        .join(PhoneInterval.phone)
        .join(PhoneInterval.utterance)
        .join(Utterance.speaker)
        .filter(PhoneInterval.workflow_id == workflow.id)
        .filter(PhoneInterval.duration > 0)
        .order_by(PhoneInterval.begin)
    )
    word_intervals = (
        session.query(WordInterval.begin, WordInterval.end, Word.word, Speaker.name, Utterance.file_id)
        .join(WordInterval.word)
        .join(WordInterval.utterance)
        .join(Utterance.speaker)
        .filter(WordInterval.workflow_id == workflow.id)
        .filter(WordInterval.duration > 0)
        .order_by(WordInterval.begin)
    )
    utterances = (
        session.query(Utterance.begin, Utterance.end, Utterance.text, Speaker.name, Utterance.file_id)
        .join(Utterance.speaker)
    )
    phone_intervals = phone_intervals.all()
    word_intervals = word_intervals.all()
    utterances = utterances.all()
    phone_data = defaultdict(list)
    for p_begin, p_end, phone, speaker_name, file_id in phone_intervals:
        if cleanup_textgrids and phone == SIL_PHONE:
            continue
        phone_data[file_id].append((p_begin, p_end, phone, speaker_name))
    word_data = defaultdict(list)
    for w_begin, w_end, w, speaker_name, file_id in word_intervals:
        if cleanup_textgrids and w == SIL_WORD:
            continue
        word_data[file_id].append((w_begin, w_end, w, speaker_name))
    utterance_data = defaultdict(list)
    for utt_begin, utt_end, utt_text, speaker_name, file_id in utterances:
        utterance_data[file_id].append((utt_begin, utt_end, utt_text, speaker_name))
    data = {}

    def speaker_tiers(file_id: int, speaker_name: str) -> Dict[str, List[CtmInterval]]:
        # Create the tiers on first use, so phones or utterances of a speaker whose
        # words were all filtered out (e.g. silence-only) do not raise a KeyError
        file_tiers = data.setdefault(file_id, {})
        if speaker_name not in file_tiers:
            file_tiers[speaker_name] = {"words": [], "phones": []}
            if include_original_text:
                file_tiers[speaker_name]["utterances"] = []
        return file_tiers[speaker_name]

    # Only files with at least one word interval are exported
    for file_id in sorted(word_data):
        for w_begin, w_end, w, speaker_name in word_data[file_id]:
            words = speaker_tiers(file_id, speaker_name)["words"]
            if (
                cleanup_textgrids
                and words
                and w_begin - words[-1].end < 0.02
                and clitic_marker
                and (words[-1].label.endswith(clitic_marker) or w.startswith(clitic_marker))
            ):
                # Merge a clitic into the preceding word
                words[-1].end = w_end
                words[-1].label += w
            else:
                words.append(CtmInterval(w_begin, w_end, w))

        for p_begin, p_end, phone, speaker_name in phone_data[file_id]:
            speaker_tiers(file_id, speaker_name)["phones"].append(CtmInterval(p_begin, p_end, phone))

        if include_original_text:
            for utt_begin, utt_end, utt_text, speaker_name in utterance_data[file_id]:
                speaker_tiers(file_id, speaker_name)["utterances"].append(
                    CtmInterval(utt_begin, utt_end, utt_text)
                )
    return data
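
The clitic-merging condition in the word loop above can be tried out in isolation. A minimal sketch, assuming a hypothetical Interval dataclass in place of MFA's CtmInterval and the same 0.02 s gap threshold: a word is glued onto the previous one when the gap between them is under the threshold and either the previous word ends with the clitic marker or the current word starts with it. The apostrophe marker and the French example are illustrative only.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Interval:
    # Hypothetical stand-in for MFA's CtmInterval (begin, end, label)
    begin: float
    end: float
    label: str


def merge_clitics(
    words: List[Tuple[float, float, str]], clitic_marker: str, max_gap: float = 0.02
) -> List[Interval]:
    merged: List[Interval] = []
    for begin, end, label in words:
        if (
            merged
            and clitic_marker  # an empty marker disables merging
            and begin - merged[-1].end < max_gap
            and (merged[-1].label.endswith(clitic_marker) or label.startswith(clitic_marker))
        ):
            # Extend the previous word to cover the clitic and join the labels
            merged[-1].end = end
            merged[-1].label += label
        else:
            merged.append(Interval(begin, end, label))
    return merged


print(merge_clitics([(0.0, 0.1, "l'"), (0.1, 0.5, "homme"), (0.6, 0.9, "est")], clitic_marker="'"))
```

Here "l'" and "homme" are adjacent and the first ends with the marker, so they become one interval "l'homme" from 0.0 to 0.5, while "est" starts after a 0.1 s gap and stays separate.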

export_textgrids (modified)

    def export_textgrids(
        self,
        output_format: str = TextFileType.TEXTGRID.value,
        include_original_text: bool = False,
        fast_textgrid_export: bool = False,
    ) -> None:
        """
        Exports alignments to TextGrid files

        See Also
        --------
        :class:`~montreal_forced_aligner.alignment.multiprocessing.ExportTextGridProcessWorker`
            Multiprocessing helper function for TextGrid export
        :meth:`.CorpusAligner.export_textgrid_arguments`
            Job method for TextGrid export

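        Parameters
        ----------
        output_format: str, optional
            Format to save alignments, one of 'long_textgrids' (the default), 'short_textgrids', or 'json', passed to praatio
        include_original_text: bool
            Flag for including the original text of the corpus files as a tier
        fast_textgrid_export: bool
            Flag for loading all alignments with a few bulk queries instead of two queries per file (single-process export only)
        """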
        workflow = self.current_workflow
        if not workflow.alignments_collected:
            self.collect_alignments()
        begin = time.time()
        error_dict = {}
        with self.session() as session:
            files = (
                session.query(
                    File.id,
                    File.name,
                    File.relative_path,
                    SoundFile.duration,
                    TextFile.text_file_path,
                )
                .join(File.sound_file)
                .join(File.text_file)
            ).all()
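        if fast_textgrid_export and config.USE_MP and config.NUM_JOBS > 1:
            # all_data is only used by the single-process branch below, so skip the
            # expensive bulk query rather than computing it and discarding it
            logger.warning(
                "--fast_textgrid_export only takes effect with -j 1; using per-file export instead"
            )
            fast_textgrid_export = False
        if fast_textgrid_export:
            all_data = construct_all_output_tiers(
                session,
                workflow,
                config.CLEANUP_TEXTGRIDS,
                self.clitic_marker,
                include_original_text,
            )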
        non_aligned_files = []
        with tqdm(total=self.num_files, disable=config.QUIET) as pbar:
            if config.USE_MP and config.NUM_JOBS > 1:
                stopped = mp.Event()

                finished_adding = mp.Event()
                for_write_queue = mp.Queue()
                return_queue = mp.Queue()
                export_procs = []
                for j in range(config.NUM_JOBS):
                    export_proc = ExportTextGridProcessWorker(
                        self.db_string,
                        for_write_queue,
                        return_queue,
                        stopped,
                        finished_adding,
                        self.export_frame_shift,
                        config.CLEANUP_TEXTGRIDS,
                        self.clitic_marker,
                        self.export_output_directory,
                        output_format,
                        include_original_text,
                    )
                    export_proc.start()
                    export_procs.append(export_proc)
                try:
                    for args in files:
                        for_write_queue.put(args)
                    time.sleep(1)
                    finished_adding.set()
                    while True:
                        try:
                            result = return_queue.get(timeout=1)
                            if isinstance(result, AlignmentExportError):
                                error_dict[getattr(result, "path", 0)] = result
                                continue
                            if self.stopped.is_set():
                                continue
                        except Empty:
                            for proc in export_procs:
                                if not proc.finished_processing.is_set():
                                    break
                            else:
                                break
                            continue
                        if isinstance(result, int):
                            pbar.update(result)
                except Exception:
                    stopped.set()
                    raise
                finally:
                    for p in export_procs:
                        p.join()
            else:
                logger.debug("Not using multiprocessing for TextGrid export")

                for file_id, name, relative_path, duration, text_file_path in files:
                    output_path = construct_output_path(
                        name,
                        relative_path,
                        self.export_output_directory,
                        text_file_path,
                        output_format,
                    )
                    if fast_textgrid_export:
                        if file_id not in all_data:
                            logger.debug(f'There is no alignment for file {name}, db_id {file_id}')
                            non_aligned_files.append(name)
                            continue
                        data = all_data[file_id]
                    else:
                        data = construct_output_tiers(
                            session,
                            file_id,
                            workflow,
                            config.CLEANUP_TEXTGRIDS,
                            self.clitic_marker,
                            include_original_text,
                        )
                    export_textgrid(
                        data,
                        output_path,
                        duration,
                        self.export_frame_shift,
                        output_format=output_format,
                    )
                    pbar.update(1)
        if non_aligned_files:
            logger.warning(
                f"TextGrids were not exported for {len(non_aligned_files)} files without alignments:"
            )
            for f in sorted(non_aligned_files):
                logger.warning(f" - {f}")
        if error_dict:
            logger.warning(
                f"There were {len(error_dict)} errors encountered in generating TextGrids. "
                f"Check {os.path.join(self.export_output_directory, 'output_errors.txt')} "
                f"for more details"
            )
            output_textgrid_writing_errors(self.export_output_directory, error_dict)
            if config.DEBUG:
                for k, v in error_dict.items():
                    print(k)
                    raise v
        logger.info(f"Finished exporting TextGrids to {self.export_output_directory}!")
        logger.debug(f"Exported TextGrids in a total of {time.time() - begin:.3f} seconds")
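
In this patch, all_data is only consumed in the single-process branch, which matches NOTE 2 (the fast path needs -j 1). One hypothetical way to prepare the file list for several workers is to split the sorted file IDs into contiguous ranges, so that each worker could run the same bulk queries restricted to its own range (in SQLAlchemy, for example, with a filter such as Utterance.file_id.between(first, last)). The helper name partition_file_ids is made up for this sketch and is not part of MFA.

```python
from typing import Iterable, List, Tuple


def partition_file_ids(file_ids: Iterable[int], num_jobs: int) -> List[Tuple[int, int]]:
    """Split sorted file IDs into at most num_jobs contiguous (first_id, last_id) ranges."""
    ids = sorted(file_ids)
    if not ids or num_jobs < 1:
        return []
    chunk = -(-len(ids) // num_jobs)  # ceiling division, so no IDs are left over
    return [(ids[i], ids[min(i + chunk, len(ids)) - 1]) for i in range(0, len(ids), chunk)]


# Each worker would then query only the rows whose file_id lies in its range
for first, last in partition_file_ids(range(1, 11), num_jobs=3):
    print(first, last)
```

Contiguous ranges keep each worker's query a simple range filter; the trade-off is that the ranges are balanced by file count, not by number of intervals, so workers with longer files may finish later.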

align_corpus_cli (slightly modified)

def align_corpus_cli(context, **kwargs) -> None:
    """
    Align a corpus with a pronunciation dictionary and a pretrained acoustic model.
    """
    if kwargs.get("profile", None) is not None:
        config.CURRENT_PROFILE_NAME = kwargs.pop("profile")
    config.update_configuration(kwargs)
    config_path = kwargs.get("config_path", None)
    reference_directory = kwargs.get("reference_directory", None)
    custom_mapping_path = kwargs.get("custom_mapping_path", None)
    corpus_directory = kwargs["corpus_directory"].absolute()
    dictionary_path = kwargs["dictionary_path"]
    acoustic_model_path = kwargs["acoustic_model_path"]
    output_directory = kwargs["output_directory"]
    output_format = kwargs["output_format"]
    include_original_text = kwargs["include_original_text"]
    fast_textgrid_export = kwargs["fast_textgrid_export"]
    g2p_model_path = kwargs.get("g2p_model_path", None)
    if g2p_model_path:
        g2p_model_path = validate_g2p_model(context, kwargs, g2p_model_path)
    aligner = PretrainedAligner(
        corpus_directory=corpus_directory,
        dictionary_path=dictionary_path,
        acoustic_model_path=acoustic_model_path,
        g2p_model_path=g2p_model_path,
        **PretrainedAligner.parse_parameters(config_path, context.params, context.args),
    )
    mapping = None
    try:
        aligner.align()
        aligner.analyze_alignments()
        aligner.export_files(
            output_directory,
            output_format=output_format,
            include_original_text=include_original_text,
            fast_textgrid_export=fast_textgrid_export,
        )
        if reference_directory:
            aligner.load_reference_alignments(reference_directory)
            if custom_mapping_path:
                with mfa_open(custom_mapping_path, "r") as f:
                    mapping = yaml.load(f, Loader=yaml.Loader)
                aligner.validate_mapping(mapping)
            reference_alignments = WorkflowType.reference
        else:
            reference_alignments = WorkflowType.alignment

        if aligner.use_phone_model:
            aligner.evaluate_alignments(
                mapping,
                output_directory=output_directory,
                reference_source=reference_alignments,
                comparison_source=WorkflowType.phone_transcription,
            )
        else:
            if reference_alignments is WorkflowType.reference:
                aligner.evaluate_alignments(
                    mapping,
                    output_directory=output_directory,
                    reference_source=reference_alignments,
                    comparison_source=WorkflowType.alignment,
                )
    except Exception:
        aligner.dirty = True
        raise
    finally:
        aligner.cleanup()

export_files (slightly modified)

    def export_files(
        self,
        output_directory: typing.Union[Path, str],
        output_format: Optional[str] = None,
        include_original_text: bool = False,
        fast_textgrid_export: bool = False,
    ) -> None:
        """
        Export a TextGrid file for every sound file in the dataset

        Parameters
        ----------
        output_directory: :class:`~pathlib.Path`
            Directory to save to
        output_format: str, optional
            Format to save alignments, one of 'long_textgrids' (the default), 'short_textgrids', or 'json', passed to praatio
        include_original_text: bool
            Flag for including the original text of the corpus files as a tier
        fast_textgrid_export: bool
            Flag for loading all alignments with a few bulk queries before writing TextGrids (single-process export only)
        """
        if isinstance(output_directory, str):
            output_directory = Path(output_directory)
        if output_format is None:
            output_format = TextFileType.TEXTGRID.value
        self.export_output_directory = output_directory

        logger.info(
            f"Exporting {self.current_workflow.name} TextGrids to {self.export_output_directory}..."
        )
        self.export_output_directory.mkdir(parents=True, exist_ok=True)
        analysis_csv = self.working_directory.joinpath("alignment_analysis.csv")
        if analysis_csv.exists():
            shutil.copyfile(
                analysis_csv, self.export_output_directory.joinpath("alignment_analysis.csv")
            )
        self.export_textgrids(output_format, include_original_text, fast_textgrid_export)
leandro-gracia-gil commented 1 month ago

@dan-ya Thank you very much for your detailed reply! It would be great to get in MFA a version of this that works with multiple workers.

I wonder whether there are similar problems in other parts of the code that also show sudden performance drops for me, such as generating MFCCs or generating alignments, although those are not as slow as TextGrid export.

mmcauliffe commented 1 month ago

Thanks for this @dan-ya ! I've added a multiprocessing version of this to 3.1.0, along with making sure it shouldn't overwhelm memory for lower speed systems (using the yield_per as you suggested and processing files in order so they can be written out and free up memory).
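
The memory-bounded approach described in this reply (rows processed in file order and fetched in batches, so each file can be written out and freed) can be sketched with the standard-library sqlite3 module, where Cursor.fetchmany plays a role similar to SQLAlchemy's Query.yield_per. This is an illustration on a hypothetical table, not the code that shipped in MFA 3.1.0: because the query is ordered by file_id, a file is complete as soon as a row for the next file appears, so only the current file's intervals (plus one fetched batch) are held at a time.

```python
import sqlite3
from typing import Iterator, List, Optional, Tuple

Interval = Tuple[float, float, str]


def stream_files(cursor: sqlite3.Cursor, batch_size: int = 1000) -> Iterator[Tuple[int, List[Interval]]]:
    """Yield (file_id, intervals) one file at a time from rows sorted by file_id."""
    current_id: Optional[int] = None
    current: List[Interval] = []
    while True:
        rows = cursor.fetchmany(batch_size)  # bounded batch, similar in spirit to yield_per
        if not rows:
            break
        for file_id, start_s, end_s, label in rows:
            if current_id is not None and file_id != current_id:
                # Rows are sorted by file_id, so the previous file is complete
                yield current_id, current
                current = []
            current_id = file_id
            current.append((start_s, end_s, label))
    if current_id is not None:
        yield current_id, current


def demo() -> List[Tuple[int, List[Interval]]]:
    # Hypothetical table; a file's rows deliberately span batch boundaries
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE word_interval (file_id INTEGER, start_s REAL, end_s REAL, word TEXT)")
    conn.executemany(
        "INSERT INTO word_interval VALUES (?, ?, ?, ?)",
        [(2, 0.5, 0.9, "world"), (1, 0.0, 0.4, "hello"), (3, 0.0, 0.3, "ok"),
         (2, 0.0, 0.5, "hi"), (1, 0.4, 0.8, "there")],
    )
    cursor = conn.execute(
        "SELECT file_id, start_s, end_s, word FROM word_interval ORDER BY file_id, start_s"
    )
    # A real exporter would write each yielded file here instead of collecting them
    return list(stream_files(cursor, batch_size=2))
```

Ordering by file_id rather than by interval start is what makes streaming possible: with the start-time ordering used in the single-process patch, a file's intervals can be spread across the whole result set, so nothing can be written until every row has been read.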

dan-ya commented 1 month ago

Thank you very much, @mmcauliffe, for taking care of it!