NVIDIA / NeMo

A scalable generative AI framework built for researchers and developers working on Large Language Models, Multimodal, and Speech AI (Automatic Speech Recognition and Text-to-Speech)
https://docs.nvidia.com/nemo-framework/user-guide/latest/overview.html
Apache License 2.0

MSDD inference is too slow #7101

Closed SagyHarpazGong closed 3 weeks ago

SagyHarpazGong commented 1 year ago

I am running the MSDD model on an NVIDIA A10 (24 GB), but inference is too slow. Looking at the code, I see a lot of data transfer back and forth between the CPU and the GPU.

Most of the time, GPU utilization is at 0%.

First, the data is split into short segments at each scale (I have 5 scales). After the segmentation for each scale, embedding extraction is applied and the embeddings are saved to a pkl file. Then clustering is applied, and finally the MSDD model is applied.
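
To make the cost of the multi-scale step concrete, here is a minimal sketch of sliding-window segmentation over a single speech region. The (window, shift) values are illustrative assumptions rather than values read from a NeMo config, and `segment` and `count_segments` are hypothetical helpers, not NeMo functions.

```python
from typing import List, Tuple


def segment(start: float, end: float, window: float, shift: float) -> List[Tuple[float, float]]:
    """Cut [start, end] into windows of length `window`, stepping by `shift`."""
    segments = []
    t = start
    while t < end:
        segments.append((t, min(t + window, end)))
        if t + window >= end:
            break  # the last window already reaches the end of the region
        t += shift
    return segments


# Five hypothetical scales as (window, shift) pairs in seconds, longest first.
SCALES = [(1.5, 0.75), (1.25, 0.625), (1.0, 0.5), (0.75, 0.375), (0.5, 0.25)]


def count_segments(duration: float) -> List[int]:
    """Number of segments each scale produces for `duration` seconds of speech."""
    return [len(segment(0.0, duration, w, s)) for w, s in SCALES]


if __name__ == "__main__":
    counts = count_segments(60.0)
    print(counts, "total:", sum(counts))
```

Every segment at every scale goes through the embedding extractor, so the total count, not only the count at the shortest scale, is what drives extraction time.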

Is there anything that can be done to speed up inference? Is there a flag for parallelizing the embedding extraction stage?

please help.

nithinraok commented 1 year ago

What do you mean by parallelism of embedding extraction when you are inferencing on single GPU?

tango4j commented 1 year ago

This is a very recent issue that we also discovered. It is not MSDD itself; the TitaNet embedding extractor is what takes a lot of time. I will look into it and get back soon.

SagyHarpazGong commented 1 year ago

Hi @nithinraok, thanks for your response. What I meant is that there is a for loop that runs the segmentation and the embedding extraction. But as far as I could see, the bottleneck in execution time is the traffic between the GPU and the CPU, and of course the writing and reading of files (e.g. pkl files).

SagyHarpazGong commented 1 year ago

Thanks @tango4j yes I also think so

nithinraok commented 1 year ago

You can skip writing pkl files as well. Have you tried disabling the saving of pickle files through the config?

SagyHarpazGong commented 1 year ago

Of course, but MSDD uses them, so if I disable saving the pkl files I get a FileNotFoundError.

tango4j commented 1 year ago

This issue happens only with the MSDD diarizer, not with the clustering diarizer. I suspect something in the YAML settings is causing it. Let me get back to this soon.

SagyHarpazGong commented 1 year ago

I want to use the msdd diarization

tango4j commented 1 year ago

@SagyHarpazGong Sure, let us work on this. Thanks...!

tango4j commented 1 year ago

@SagyHarpazGong I have found that it gets slow if we use the TitaNet ckpt inside the MSDD .nemo file. A quick fix is to add the following:

 diarizer.speaker_embeddings.model_path="titanet_large" \
 diarizer.msdd_model.parameters.use_speaker_model_from_ckpt=False \

to the YAML config. This could lead to a small change in performance (it could be better or worse). If you want a quick fix, try this. Otherwise, it will take some more time to fix the speed when using the speaker model from the ckpt.
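
One way to confirm that overrides like these actually reach the running config is to look the dotted keys up in the loaded config before starting inference. The sketch below works on a plain nested dict as a stand-in for the real config object; `lookup` and `check_overrides` are hypothetical helpers and not part of NeMo, Hydra, or OmegaConf.

```python
from typing import Any, Dict, List


def lookup(cfg: Dict[str, Any], dotted_key: str) -> Any:
    """Follow a key such as 'a.b.c' through nested dicts."""
    node: Any = cfg
    for part in dotted_key.split("."):
        node = node[part]  # KeyError/TypeError if the path does not exist
    return node


def check_overrides(cfg: Dict[str, Any], expected: Dict[str, Any]) -> List[str]:
    """Return a human-readable problem for every expected value that is not in effect."""
    problems = []
    for key, want in expected.items():
        try:
            got = lookup(cfg, key)
        except (KeyError, TypeError):
            problems.append(f"{key}: missing")
            continue
        if got != want:
            problems.append(f"{key}: expected {want!r}, got {got!r}")
    return problems


if __name__ == "__main__":
    # Stand-in for a loaded config in which the second override was NOT applied.
    cfg = {
        "diarizer": {
            "speaker_embeddings": {"model_path": "titanet_large"},
            "msdd_model": {"parameters": {"use_speaker_model_from_ckpt": True}},
        }
    }
    expected = {
        "diarizer.speaker_embeddings.model_path": "titanet_large",
        "diarizer.msdd_model.parameters.use_speaker_model_from_ckpt": False,
    }
    for problem in check_overrides(cfg, expected):
        print(problem)
```

An empty result means every expected value is in effect; anything else points at an override that was dropped or misspelled.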

SagyHarpazGong commented 1 year ago

@tango4j I checked as well and it is still slow. I really suspect that the reason for the slow inference is the CPU usage: for most of the inference time, GPU utilization is at 0%. All the file system I/O is another reason for the slow inference.

tango4j commented 1 year ago

@SagyHarpazGong If you do not see a difference before and after applying the configs that I suggested, then your code is likely not reflecting the change in how the TitaNet parameters are loaded. Check your CUDA settings and the batch size for diarization inference. Make sure you are maximizing GPU memory usage.

If it does change and speeds up, but the improvement is less than 30%, then please let us know.

SagyHarpazGong commented 1 year ago

Which CUDA settings do I need to check? After increasing the batch_size, GPU memory usage is almost at the maximum and it is still slow.

tango4j commented 1 year ago

@SagyHarpazGong Did you experience any relative improvement of speed after you apply use_speaker_model_from_ckpt=False ?

SagyHarpazGong commented 1 year ago

@tango4j not at all

tango4j commented 1 year ago

@SagyHarpazGong I suspect that the changes in the configuration are not being reflected at all in your code base. Unfortunately, at this time, we don't have a solution for the slowdown issue on your machine with your setup.

Apart from this, I will update the NGC MSDD model checkpoint to resolve this slow down issue.

SagyHarpazGong commented 1 year ago

@tango4j thanks, I'll try to share images of nvidia-smi during inference to show you that GPU utilization is at 0% most of the time.
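
As an alternative to screenshots, GPU utilization can be sampled as numbers while inference runs in another process. The sketch below polls `nvidia-smi` with its `--query-gpu` CSV interface; the helper names (`parse_sample`, `poll`, `idle_fraction`) and the sampling interval are assumptions made for illustration.

```python
import subprocess
import time
from typing import List, Tuple

QUERY = [
    "nvidia-smi",
    "--query-gpu=utilization.gpu,memory.used",
    "--format=csv,noheader,nounits",
]

Sample = List[Tuple[int, int]]  # one (utilization %, memory MiB) pair per GPU


def parse_sample(text: str) -> Sample:
    """Parse nvidia-smi CSV output (one line per GPU) into integer pairs."""
    rows = []
    for line in text.strip().splitlines():
        util, mem = (field.strip() for field in line.split(","))
        rows.append((int(util), int(mem)))
    return rows


def poll(seconds: float, interval: float = 0.5) -> List[Sample]:
    """Sample all GPUs every `interval` seconds for roughly `seconds` seconds."""
    samples = []
    deadline = time.monotonic() + seconds
    while time.monotonic() < deadline:
        out = subprocess.run(QUERY, capture_output=True, text=True, check=True).stdout
        samples.append(parse_sample(out))
        time.sleep(interval)
    return samples


def idle_fraction(samples: List[Sample], gpu_index: int = 0) -> float:
    """Fraction of samples in which the chosen GPU reported 0% utilization."""
    if not samples:
        return 0.0
    idle = sum(1 for sample in samples if sample[gpu_index][0] == 0)
    return idle / len(samples)


if __name__ == "__main__":
    collected = poll(seconds=30.0)
    print(f"GPU 0 idle in {idle_fraction(collected):.0%} of samples")
```

A high idle fraction while the diarization process is busy supports the claim that the GPU is waiting on CPU work or file I/O.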

SagyHarpazGong commented 1 year ago

Hi all, I fixed the issue by inheriting from the classes ClusteringDiarizer, ClusterEmbedding, and NeuralDiarizer and modifying them so that, instead of saving the embeddings to pkl files and loading them for MSDD inference, the embeddings are passed to MSDD inference directly, without using the file system; they stay in GPU memory.

This is my implementation:

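# Imports assumed for the classes below. Module paths follow the NeMo 1.x source
# layout and may differ in other versions.
import json
import os
import shutil
from typing import List, Optional, Union

import torch
from omegaconf import DictConfig
from torch.cuda.amp import autocast
from tqdm import tqdm

from nemo.collections.asr.models import ClusteringDiarizer, EncDecSpeakerLabelModel
from nemo.collections.asr.models.configs.diarizer_config import NeuralDiarizerInferenceConfig
from nemo.collections.asr.models.msdd_models import ClusterEmbedding, NeuralDiarizer
from nemo.collections.asr.parts.utils.speaker_utils import (
    audio_rttm_map,
    get_embs_and_timestamps,
    get_uniqname_from_filepath,
    perform_clustering,
    score_labels,
)
from nemo.utils import logging


class ClustDiar(ClusteringDiarizer):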
    def _extract_embeddings(self, manifest_file: str, scale_idx: int, num_scales: int):
        """
        This method extracts speaker embeddings from segments passed through manifest_file
        Optionally you may save the intermediate speaker embeddings for debugging or any use.
        """
        logging.info("Extracting embeddings for Diarization")
        self._setup_spkr_test_data(manifest_file)
        self.embeddings = {}
        self._speaker_model.eval()
        self.time_stamps = {}

        all_embs = torch.empty([0], device=self._speaker_model.device)
        for test_batch in tqdm(
            self._speaker_model.test_dataloader(),
            desc=f'[{scale_idx+1}/{num_scales}] extract embeddings',
            leave=True,
            disable=not self.verbose,
        ):
            test_batch = [x.to(self._speaker_model.device) for x in test_batch]
            audio_signal, audio_signal_len, labels, slices = test_batch
            with autocast():
                _, embs = self._speaker_model.forward(input_signal=audio_signal, input_signal_length=audio_signal_len)
                emb_shape = embs.shape[-1]
                embs = embs.view(-1, emb_shape)
                all_embs = torch.cat((all_embs, embs.detach()), dim=0)
            del test_batch

        with open(manifest_file, 'r', encoding='utf-8') as manifest:
            for i, line in enumerate(manifest.readlines()):
                line = line.strip()
                dic = json.loads(line)
                uniq_name = get_uniqname_from_filepath(dic['audio_filepath'])
                if uniq_name in self.embeddings:
                    self.embeddings[uniq_name] = torch.cat((self.embeddings[uniq_name], all_embs[i].view(1, -1)))
                else:
                    self.embeddings[uniq_name] = all_embs[i].view(1, -1)
                if uniq_name not in self.time_stamps:
                    self.time_stamps[uniq_name] = []
                start = dic['offset']
                end = start + dic['duration']
                self.time_stamps[uniq_name].append([start, end])

    def diarize(self, paths2audio_files: List[str] = None, batch_size: int = 0):
        """
        Diarize files provided through paths2audio_files or manifest file
        input:
        paths2audio_files (List[str]): list of paths to file containing audio file
        batch_size (int): batch_size considered for extraction of speaker embeddings and VAD computation
        """

        self._out_dir = self._diarizer_params.out_dir

        self._speaker_dir = os.path.join(self._diarizer_params.out_dir, 'speaker_outputs')

        if os.path.exists(self._speaker_dir):
            logging.warning("Deleting previous clustering diarizer outputs.")
            shutil.rmtree(self._speaker_dir, ignore_errors=True)
        os.makedirs(self._speaker_dir)

        if not os.path.exists(self._out_dir):
            os.mkdir(self._out_dir)

        self._vad_dir = os.path.join(self._out_dir, 'vad_outputs')
        self._vad_out_file = os.path.join(self._vad_dir, "vad_out.json")

        if batch_size:
            self._cfg.batch_size = batch_size

        if paths2audio_files:
            if type(paths2audio_files) is list:
                self._diarizer_params.manifest_filepath = os.path.join(self._out_dir, 'paths2audio_filepath.json')
                self.path2audio_files_to_manifest(paths2audio_files, self._diarizer_params.manifest_filepath)
            else:
                raise ValueError("paths2audio_files must be of type list of paths to file containing audio file")

        self.AUDIO_RTTM_MAP = audio_rttm_map(self._diarizer_params.manifest_filepath)

        out_rttm_dir = os.path.join(self._out_dir, 'pred_rttms')
        os.makedirs(out_rttm_dir, exist_ok=True)

        # Speech Activity Detection
        self._perform_speech_activity_detection()

        # Segmentation
        scales = self.multiscale_args_dict['scale_dict'].items()
        self.emb_scale_seq_dict = {}
        for scale_idx, (window, shift) in scales:

            # Segmentation for the current scale (scale_idx)
            self._run_segmentation(window, shift, scale_tag=f'_scale{scale_idx}')

            # Embedding Extraction for the current scale (scale_idx)
            self._extract_embeddings(self.subsegments_manifest_path, scale_idx, len(scales))

            self.emb_scale_seq_dict[scale_idx] = self.embeddings

            self.multiscale_embeddings_and_timestamps[scale_idx] = [self.embeddings, self.time_stamps]

        embs_and_timestamps = get_embs_and_timestamps(
            self.multiscale_embeddings_and_timestamps, self.multiscale_args_dict
        )

        # Clustering
        all_reference, all_hypothesis = perform_clustering(
            embs_and_timestamps=embs_and_timestamps,
            AUDIO_RTTM_MAP=self.AUDIO_RTTM_MAP,
            out_rttm_dir=out_rttm_dir,
            clustering_params=self._cluster_params,
            device=self._speaker_model.device,
            verbose=self.verbose,
        )
        logging.info("Outputs are saved in {} directory".format(os.path.abspath(self._diarizer_params.out_dir)))

        # Scoring
        return score_labels(
            self.AUDIO_RTTM_MAP,
            all_reference,
            all_hypothesis,
            collar=self._diarizer_params.collar,
            ignore_overlap=self._diarizer_params.ignore_overlap,
            verbose=self.verbose,
        )

class ClusEmb(ClusterEmbedding):
    """
    This class is built for calculating cluster-average embeddings, segmentation and load/save of the estimated cluster labels.
    The methods in this class is used for the inference of MSDD models.

    Args:
        cfg_diar_infer (DictConfig):
            Config dictionary from diarization inference YAML file
        cfg_msdd_model (DictConfig):
            Config dictionary from MSDD model checkpoint file

    Class Variables:
        self.cfg_diar_infer (DictConfig):
            Config dictionary from diarization inference YAML file
        cfg_msdd_model (DictConfig):
            Config dictionary from MSDD model checkpoint file
        self._speaker_model (class `EncDecSpeakerLabelModel`):
            This is a placeholder for class instance of `EncDecSpeakerLabelModel`
        self.scale_window_length_list (list):
            List containing the window lengths (i.e., scale length) of each scale.
        self.scale_n (int):
            Number of scales for multi-scale clustering diarizer
        self.base_scale_index (int):
            The index of the base-scale which is the shortest scale among the given multiple scales
    """
    def __init__(
        self, cfg_diar_infer: DictConfig, cfg_msdd_model: DictConfig, speaker_model: Optional[EncDecSpeakerLabelModel]
    ):
        super().__init__(cfg_diar_infer, cfg_msdd_model, speaker_model)
        self.cfg_diar_infer = cfg_diar_infer
        self._cfg_msdd = cfg_msdd_model
        self._speaker_model = speaker_model
        self.scale_window_length_list = list(
            self.cfg_diar_infer.diarizer.speaker_embeddings.parameters.window_length_in_sec
        )
        self.scale_n = len(self.scale_window_length_list)
        self.base_scale_index = len(self.scale_window_length_list) - 1
        self.clus_diar_model = ClustDiar(cfg=self.cfg_diar_infer, speaker_model=self._speaker_model)

    def run_clustering_diarizer(self, manifest_filepath: str, emb_dir: str):
        """
        If no pre-existing data is provided, run clustering diarizer from scratch. This will create scale-wise speaker embedding
        sequence, cluster-average embeddings, scale mapping and base scale clustering labels. Note that speaker embedding `state_dict`
        is loaded from the `state_dict` in the provided MSDD checkpoint.

        Args:
            manifest_filepath (str):
                Input manifest file for creating audio-to-RTTM mapping.
            emb_dir (str):
                Output directory where embedding files and timestamp files are saved.

        Returns:
            emb_sess_avg_dict (dict):
                Dictionary containing cluster-average embeddings for each session.
            emb_scale_seq_dict (dict):
                Dictionary containing embedding tensors which are indexed by scale numbers.
            base_clus_label_dict (dict):
                Dictionary containing clustering results. Clustering results are cluster labels for the base scale segments.
        """
        self.cfg_diar_infer.diarizer.manifest_filepath = manifest_filepath
        self.cfg_diar_infer.diarizer.out_dir = emb_dir

        # Run ClusteringDiarizer which includes system VAD or oracle VAD.
        self._out_dir = self.clus_diar_model._diarizer_params.out_dir
        self.out_rttm_dir = os.path.join(self._out_dir, 'pred_rttms')
        os.makedirs(self.out_rttm_dir, exist_ok=True)

        self.clus_diar_model._cluster_params = self.cfg_diar_infer.diarizer.clustering.parameters
        self.clus_diar_model.multiscale_args_dict[
            "multiscale_weights"
        ] = self.cfg_diar_infer.diarizer.speaker_embeddings.parameters.multiscale_weights
        self.clus_diar_model._diarizer_params.speaker_embeddings.parameters = (
            self.cfg_diar_infer.diarizer.speaker_embeddings.parameters
        )
        cluster_params = self.clus_diar_model._cluster_params
        cluster_params = dict(cluster_params) if isinstance(cluster_params, DictConfig) else cluster_params.dict()
        clustering_params_str = json.dumps(cluster_params, indent=4)

        logging.info(f"Multiscale Weights: {self.clus_diar_model.multiscale_args_dict['multiscale_weights']}")
        logging.info(f"Clustering Parameters: {clustering_params_str}")
        scores = self.clus_diar_model.diarize(batch_size=self.cfg_diar_infer.batch_size)

        # If RTTM (ground-truth diarization annotation) files do not exist, scores is None.
        if scores is not None:
            metric, speaker_mapping_dict, _ = scores
        else:
            metric, speaker_mapping_dict = None, None

        # Get the mapping between segments in different scales.
        self._embs_and_timestamps = get_embs_and_timestamps(
            self.clus_diar_model.multiscale_embeddings_and_timestamps, self.clus_diar_model.multiscale_args_dict
        )
        session_scale_mapping_dict = self.get_scale_map(self._embs_and_timestamps)
        clus_labels = self.load_clustering_labels(emb_dir)
        emb_sess_avg_dict, base_clus_label_dict = self.get_cluster_avg_embs(
            self.clus_diar_model.emb_scale_seq_dict, clus_labels, speaker_mapping_dict, session_scale_mapping_dict
        )
        self.clus_diar_model.emb_scale_seq_dict['session_scale_mapping'] = session_scale_mapping_dict
        return emb_sess_avg_dict, self.clus_diar_model.emb_scale_seq_dict, base_clus_label_dict, metric

class NeuralDiar(NeuralDiarizer):
    def __init__(self, cfg: Union[DictConfig, NeuralDiarizerInferenceConfig]):
        super().__init__(cfg)
        self._cfg = cfg
        self._speaker_model = None
        self.msdd_model = None

        # Parameter settings for MSDD model
        self.use_speaker_model_from_ckpt = cfg.diarizer.msdd_model.parameters.get('use_speaker_model_from_ckpt', True)
        self.use_clus_as_main = cfg.diarizer.msdd_model.parameters.get('use_clus_as_main', False)
        self.max_overlap_spks = cfg.diarizer.msdd_model.parameters.get('max_overlap_spks', 2)
        self.num_spks_per_model = cfg.diarizer.msdd_model.parameters.get('num_spks_per_model', 2)
        self.use_adaptive_thres = cfg.diarizer.msdd_model.parameters.get('use_adaptive_thres', True)
        self.max_pred_length = cfg.diarizer.msdd_model.parameters.get('max_pred_length', 0)
        self.diar_eval_settings = cfg.diarizer.msdd_model.parameters.get(
            'diar_eval_settings', [(0.25, True), (0.25, False), (0.0, False)]
        )

        self._init_msdd_model(cfg)
        self.diar_window_length = cfg.diarizer.msdd_model.parameters.diar_window_length
        self.transfer_diar_params_to_model_params(self.msdd_model, cfg)

        if self.msdd_model is None:
            raise TypeError(f'The MSDD model is None')
        # Initialize clustering and embedding preparation instance (as a diarization encoder).
        self.clustering_embedding = ClusEmb(
            cfg_diar_infer=cfg, cfg_msdd_model=self.msdd_model.cfg, speaker_model=self._speaker_model
        )

        # Parameters for creating diarization results from MSDD outputs.
        self.clustering_max_spks = self.msdd_model.cfg.max_num_of_spks
        self.overlap_infer_spk_limit = cfg.diarizer.msdd_model.parameters.get(
            'overlap_infer_spk_limit', self.clustering_max_spks
        )
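
Stripped of the NeMo specifics, the pattern behind this change is to hand the producer's output directly to the consumer instead of serializing it to disk and reading it back. A minimal stand-alone sketch follows; `fake_embeddings`, `via_pickle`, `in_memory`, and `timed` are hypothetical names, and the data is plain Python lists rather than GPU tensors.

```python
import os
import pickle
import tempfile
import time
from typing import Callable, Dict, List, Tuple

Embeddings = Dict[str, List[List[float]]]  # session name -> one vector per segment


def fake_embeddings(num_sessions: int, segments: int, dim: int) -> Embeddings:
    """Stand-in for the embedding extractor: deterministic dummy vectors."""
    return {
        f"session_{s}": [[float(s + i + d) for d in range(dim)] for i in range(segments)]
        for s in range(num_sessions)
    }


def via_pickle(embs: Embeddings, workdir: str) -> Embeddings:
    """File-based hand-off: dump to disk, then load the same file back."""
    path = os.path.join(workdir, "embeddings.pkl")
    with open(path, "wb") as handle:
        pickle.dump(embs, handle)
    with open(path, "rb") as handle:
        return pickle.load(handle)


def in_memory(embs: Embeddings) -> Embeddings:
    """Direct hand-off: the consumer receives the very same object."""
    return embs


def timed(fn: Callable, *args) -> Tuple[object, float]:
    """Run fn(*args) and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    embs = fake_embeddings(num_sessions=4, segments=2000, dim=192)
    with tempfile.TemporaryDirectory() as workdir:
        _, t_file = timed(via_pickle, embs, workdir)
    _, t_mem = timed(in_memory, embs)
    print(f"pickle round trip: {t_file:.4f}s, in-memory: {t_mem:.6f}s")
```

With GPU tensors the file-based route typically also implies a device-to-host copy on write and a host-to-device copy on read, which the in-memory route avoids.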

github-actions[bot] commented 11 months ago

This issue is stale because it has been open for 30 days with no activity. Remove stale label or comment or this will be closed in 7 days.

github-actions[bot] commented 11 months ago

This issue was closed because it has been inactive for 7 days since being marked as stale.

prkumar112451 commented 3 months ago

Looks like not many people use MSDD. It is mid-2024, NeMo inference is still super slow for MSDD, and no action has been taken on this.

nithinraok commented 3 months ago

@SagyHarpazGong did your implementation help reduce the CPU-GPU bottleneck and improve performance speed?

nithinraok commented 3 months ago

@prkumar112451 thanks for your comments. Unfortunately, we might have missed this or been busy with other work. Thank you for bringing this issue up again.

nithinraok commented 3 months ago

@prkumar112451 have you tried MSDD? Is this the same experience you encountered? What error have you seen? Please elaborate so that we can solve the issue.

prkumar112451 commented 3 months ago

@nithinraok Thanks so much for your response. Actually, I am exploring AI models for diarization. I found NeMo, and it has decent accuracy, but the problem is that it is quite slow.

I have tried playing with almost every configuration available to us, but a 20-minute call recording still takes 1 minute to diarize. Also, on a P100 or T4 server, while processing 1 call the CPU spikes to 100% throughout the processing and GPU usage stays below 50% consistently.

I was hoping to reduce the processing time of a 20-minute call to under 30 seconds (by half), but nothing worked.

The major problem with a 1-minute diarization time is how to make this scalable. We would need lots of servers to serve thousands of concurrent requests; otherwise the delay will just keep increasing.

My questions are : 1) a 20 mins call takes 1 minute to transcribe. Is this delay expected? 2) with such a huge delay, what is the best way to scale without using any NVIDIA paid server
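
The numbers above can be turned into a real-time factor (RTF, processing time divided by audio duration) and a rough capacity estimate. This is a back-of-the-envelope sketch: the load of 1000 calls per hour is a made-up figure, and the estimate assumes each worker handles one recording at a time with no batching gains.

```python
import math


def real_time_factor(processing_seconds: float, audio_seconds: float) -> float:
    """RTF below 1.0 means faster than real time."""
    return processing_seconds / audio_seconds


def workers_needed(calls_per_hour: float, call_minutes: float, rtf: float) -> int:
    """Workers required to keep up with the incoming audio without a growing backlog."""
    audio_minutes_per_minute = calls_per_hour * call_minutes / 60.0
    return math.ceil(audio_minutes_per_minute * rtf)


if __name__ == "__main__":
    # 1 minute of processing for a 20-minute recording, as reported above.
    rtf = real_time_factor(60.0, 20 * 60.0)
    print(f"RTF = {rtf:.3f}, i.e. {1 / rtf:.0f}x faster than real time")
    # Hypothetical load: 1000 twenty-minute calls arriving per hour.
    print("workers needed:", workers_needed(1000, 20, rtf))
```

Halving the processing time to 30 seconds halves the RTF and, under the same assumptions, roughly halves the number of workers.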

nithinraok commented 3 months ago

I just ran a test to reproduce the GPU utilization issue @SagyHarpazGong is facing. However, I see it currently using 70% of the GPU, as shown in the picture below, while using <20 GB of GPU memory, so we couldn't reproduce the issue you mention locally. This is the configuration I am using:

MANIFEST_FILE='callhome_109.json'
python ./neural_diarizer/multiscale_diar_decoder_infer.py \
        --config-path='./../conf/inference' --config-name='diar_infer_telephonic.yaml' \
    diarizer.manifest_filepath=$MANIFEST_FILE \
    diarizer.out_dir='/data/diarization_ch109/' \
    diarizer.speaker_embeddings.model_path=$MODEL_PATH \
    diarizer.clustering.parameters.oracle_num_speakers=True \
    diarizer.oracle_vad=True \
    diarizer.ignore_overlap=False \
    diarizer.vad.model_path=null \
    diarizer.asr.model_path=null \
    diarizer.msdd_model.parameters.use_speaker_model_from_ckpt=False \
    diarizer.msdd_model.model_path=/data/diar_msdd_telephonic.nemo

image

nithinraok commented 3 months ago

@prkumar112451 Thanks for the detailed comments. Currently, the way we improved the accuracy of NeMo diarization is by using embeddings at multiple scales, which I believe is the issue for your 20-minute audio. There are ways to improve this.

First to answer them I would need some clarifications from your end,

prkumar112451 commented 3 months ago

@nithinraok Thanks for quick response.

the 20 min audio is a call center telephony conversation between a customer and an agent.

I am using a combination of Whisper for transcription and then Nemo for diarization. Taking this repo as reference -

https://github.com/piegu/language-models/blob/master/speech_to_text_transcription_with_speakers_Whisper_Transcription_%2B_NeMo_Diarization.ipynb?source=post_page-----8da2312f1617--------------------------------

We can see that lots of Whisper optimization techniques are available, like flash-attention, batching, etc., and I have been able to speed up Whisper a lot.

But the diarization part is acting as the bottleneck. Just to be sure, I completely removed the Whisper part and ran NeMo's plain telephony model diar_msdd_telephonic, but its speed is still 1 minute of diarization time for a 20-minute call recording.

To answer your questions: 1) any architecture that solves the use case and reduces the delay with okayish accuracy, so that we can scale, is good enough 2) could you share the name of the configuration that we need to update to reduce the number of scales 3) this is the configuration setting that we are using: https://raw.githubusercontent.com/NVIDIA/NeMo/main/examples/speaker_tasks/diarization/conf/inference/diar_infer_telephonic.yaml

nithinraok commented 3 months ago

Regarding the performance bottleneck of diarization: if you can tolerate some loss in accuracy, I would suggest trying the clustering diarizer with a single scale and without the MSDD model, as shown in the config below:

MANIFEST_FILE='callhome_109.json'
python examples/speaker_tasks/diarization/clustering_diarizer/offline_diar_infer.py \
        --config-path='examples/speaker_tasks/diarization/conf/inference' --config-name='diar_infer_telephonic.yaml' \
    diarizer.manifest_filepath=$MANIFEST_FILE \
    diarizer.out_dir='/data/sample/' \
    diarizer.speaker_embeddings.model_path=${MODEL_PATH} \
    diarizer.speaker_embeddings.parameters.window_length_in_sec=1.5 \
    diarizer.speaker_embeddings.parameters.shift_length_in_sec=0.75 \
    diarizer.vad.model_path='vad_multilingual_marblenet' \
    diarizer.asr.model_path=null \
    diarizer.msdd_model.model_path=null \
    diarizer.oracle_vad=False \
    diarizer.clustering.parameters.oracle_num_speakers=False \
    batch_size=256 \
    num_workers=1

This setting should be fast. Note that you could also switch from the external VAD to ASR VAD, so you could do ASR+SD in one go. We explain some of these settings here, please feel free to explore: https://github.com/NVIDIA/NeMo/tree/main/examples/speaker_tasks/diarization#run-speech-recognition-with-speaker-diarization. It is very important to note that a common setting might not be best for all kinds of audio samples, due to varying backgrounds and noise levels, so use it accordingly. The above configuration does only clustering-based diarization with single-scale embeddings, using VAD output from the MarbleNet VAD.

Also, it is very exciting to see your use case, and of course we have blazing-fast ASR models that can do inference with punctuation and capitalization. Could you give this model a try: https://huggingface.co/nvidia/parakeet-tdt_ctc-1.1b ?

I am looking to put together a Space with the above model and speaker diarization soon, and will keep you posted here.

nithinraok commented 3 months ago

We are working on improving the RTF of ASR models even more; you can expect the models to keep getting better in terms of both speed and accuracy.

prkumar112451 commented 3 months ago

@nithinraok

To run python examples/speaker_tasks/diarization/clustering_diarizer/offline_diar_infer.py on Kaggle notebook,

installed these libraries (mentioned in PIP installation section of NEMO Github https://github.com/NVIDIA/NeMo/ ) :

!apt-get update && apt-get install -y libsndfile1 ffmpeg
!pip install Cython
!pip install nemo_toolkit['all']

and then did the import that is on the top of offline_diar_infer.py file (https://github.com/NVIDIA/NeMo/blob/main/nemo/collections/asr/models/clustering_diarizer.py ):

from omegaconf import OmegaConf
from pytorch_lightning import seed_everything

from nemo.collections.asr.models import ClusteringDiarizer
from nemo.core.config import hydra_runner
from nemo.utils import logging

Getting this error :

File /opt/conda/lib/python3.10/site-packages/datasets/filesystems/s3filesystem.py:1
----> 1 import s3fs
      3 from ..utils.deprecation_utils import deprecated
      6 @deprecated("Use s3fs.S3FileSystem instead.")
      7 class S3FileSystem(s3fs.S3FileSystem):

File /opt/conda/lib/python3.10/site-packages/s3fs/__init__.py:1
----> 1 from .core import S3FileSystem, S3File
      2 from .mapping import S3Map
      4 from ._version import get_versions

File /opt/conda/lib/python3.10/site-packages/s3fs/core.py:29
     27 import aiobotocore
     28 import botocore
---> 29 import aiobotocore.session
     30 from aiobotocore.config import AioConfig
     31 from botocore.exceptions import ClientError, HTTPClientError, ParamValidationError

File /opt/conda/lib/python3.10/site-packages/aiobotocore/session.py:10
      7 from botocore.session import UnknownServiceError, copy
      9 from . import __version__, retryhandler
---> 10 from .client import AioBaseClient, AioClientCreator
     11 from .configprovider import AioSmartDefaultsConfigStoreFactory
     12 from .credentials import AioCredentials, create_credential_resolver

File /opt/conda/lib/python3.10/site-packages/aiobotocore/client.py:10
      1 from botocore.awsrequest import prepare_request_dict
      2 from botocore.client import (
      3     BaseClient,
      4     ClientCreator,
   (...)
      8     resolve_checksum_context,
      9 )
---> 10 from botocore.compress import maybe_compress_request
     11 from botocore.discovery import block_endpoint_discovery_required_operations
     12 from botocore.exceptions import OperationNotPageableError, UnknownServiceError

ModuleNotFoundError: No module named 'botocore.compress'

image

Is there any restriction on which Python version we need to use? I am using 3.10.13.

nithinraok commented 3 months ago

It looks like s3filesystem is local to the notebook you are running; NeMo doesn't depend on this package or its derivatives. Would it be possible to share the notebook? We have a similar notebook for diarization at https://github.com/NVIDIA/NeMo/blob/main/tutorials/speaker_tasks/Speaker_Diarization_Inference.ipynb for you to get started.

SagyHarpazGong commented 3 months ago

@nithinraok Hi.

My implementation speeds up the embedding extraction phase, but the clustering phase and the final phase (MSDD) are still extremely slow because there is a lot of traffic between the CPU and the GPU. There are also heavy CPU computations, such as the "get_argmin_mat" function in offline_clustering.py.

nithinraok commented 3 months ago

Your comments are highly appreciated. We will take these into consideration soon and update our codebase.

SagyHarpazGong commented 3 months ago

@nithinraok About the "get_argmin_mat" function in offline_clustering.py: when there are a lot of embedding vectors, CPU RAM usage grows until the process ends with a "Killed" message. For example, on a machine with only 4 CPU cores (total RAM = 16 GB), that function can get the process killed because it tries to allocate more memory than is available. This is because of the tile calls, which basically duplicate tensors. My temporary solution is to batch the tiling.
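
A rough estimate shows why the tiled version runs out of memory. With B base-scale segments and C segments at the current scale, each intermediate matrix has B x C elements. The sketch below assumes 4-byte floats, about four such matrices alive at the peak for the tiled version (two tiles, their difference, and its absolute value), and about two for a batched version; these counts are assumptions for illustration, not measurements.

```python
def tiled_bytes(num_base: int, num_curr: int, bytes_per_elem: int = 4, live_copies: int = 4) -> int:
    """Approximate peak memory when full B x C matrices are materialized."""
    return num_base * num_curr * bytes_per_elem * live_copies


def batched_bytes(batch_size: int, num_curr: int, bytes_per_elem: int = 4, live_copies: int = 2) -> int:
    """Approximate peak memory when only batch_size x C slices exist at once."""
    return batch_size * num_curr * bytes_per_elem * live_copies


if __name__ == "__main__":
    # Hypothetical long recording: 10 hours at a 0.25 s shift gives 144000 base segments.
    num_base = num_curr = 10 * 3600 * 4
    print(f"tiled:   {tiled_bytes(num_base, num_curr) / 2**30:.1f} GiB")
    print(f"batched: {batched_bytes(2048, num_curr) / 2**30:.2f} GiB")
```

The tiled cost grows with the product of the two segment counts, while the batched cost grows only with the current-scale count for a fixed batch size.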

This is the original function:

def get_argmin_mat(timestamps_in_scales: List[torch.Tensor]) -> List[torch.Tensor]:
    """
    Calculate the mapping between the base scale and other scales. A segment from a longer scale is
    repeatedly mapped to a segment from a shorter scale or the base scale.

    Args:
        timestamps_in_scales (list):
            List containing timestamp tensors for each scale.
            Each tensor has dimensions of (Number of base segments) x 2.

    Returns:
        session_scale_mapping_list (list):
            List containing argmin arrays indexed by scale index.
    """
    scale_list = list(range(len(timestamps_in_scales)))
    segment_anchor_list = []
    for scale_idx in scale_list:
        time_stamps_float = timestamps_in_scales[scale_idx]
        segment_anchor_list.append(torch.mean(time_stamps_float, dim=1))

    base_scale_idx = max(scale_list)
    base_scale_anchor = segment_anchor_list[base_scale_idx]
    session_scale_mapping_list = []
    for scale_idx in scale_list:
        curr_scale_anchor = segment_anchor_list[scale_idx]
        curr_mat = torch.tile(curr_scale_anchor, (base_scale_anchor.shape[0], 1))
        base_mat = torch.tile(base_scale_anchor, (curr_scale_anchor.shape[0], 1)).t()
        argmin_mat = torch.argmin(torch.abs(curr_mat - base_mat), dim=1)
        session_scale_mapping_list.append(argmin_mat)
    return session_scale_mapping_list

and this is my re-implementation:

def batching_argmin(base_scale_anchor: torch.Tensor, curr_scale_anchor: torch.Tensor, batch_size: int):
    """
    Calculate the element-wise argmin between two sets of anchor points, processing data in batches
    to reduce memory usage.

    Args:
        base_scale_anchor (torch.Tensor):
            The anchor points of the base scale, represented as a tensor.

        curr_scale_anchor (torch.Tensor):
            The anchor points of the current scale, represented as a tensor.

        batch_size (int):
            The batch size for processing the data.

    Returns:
        torch.Tensor:
            A tensor containing the element-wise argmin results between the base and current scale anchor points.
    """

    num_base_segments = base_scale_anchor.shape[0]
    temp_scale_mapping = []
    # Process data in batches to reduce memory usage
    for i in range(0, num_base_segments, batch_size):
        base_batch = base_scale_anchor[i:i + batch_size]
        argmin_mat = torch.argmin(torch.abs(curr_scale_anchor - base_batch.unsqueeze(1)), dim=1)
        temp_scale_mapping.append(argmin_mat)

    # Concatenate the results for all batches
    return torch.cat(temp_scale_mapping)

def get_argmin_mat(timestamps_in_scales: List[torch.Tensor], batch_size: int = 2048) -> List[torch.Tensor]:
    """
    Calculate the mapping between the base scale and other scales. This function computes
    the mapping of segments from a longer scale to segments from a shorter scale or the base scale.

    Args:
        timestamps_in_scales (list):
            A list of timestamp tensors for each scale. Each tensor has dimensions
            (Number of base segments) x 2.

        batch_size (int, optional):
            The batch size for computing the mapping. Defaults to 2048.

    Returns:
        session_scale_mapping_list (list):
            A list containing argmin arrays indexed by scale index.
    """
    scale_list = list(range(len(timestamps_in_scales)))
    segment_anchor_list = []

    for scale_idx in scale_list:
        time_stamps_float = timestamps_in_scales[scale_idx]
        segment_anchor_list.append(torch.mean(time_stamps_float, dim=1))

    # The last scale in the list is treated as the base scale
    base_scale_idx = max(scale_list)
    base_scale_anchor = segment_anchor_list[base_scale_idx]

    session_scale_mapping_list = []

    for scale_idx in scale_list:
        curr_scale_anchor = segment_anchor_list[scale_idx]
        session_scale_mapping_list.append(batching_argmin(base_scale_anchor, curr_scale_anchor, batch_size))

    return session_scale_mapping_list
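
For illustration, a minimal sketch showing that batching only bounds peak memory and does not change the resulting mapping. The two helpers below (`nearest_anchor_full` and `nearest_anchor_batched`) are hypothetical names that mirror the broadcasted argmin above; they are not NeMo APIs, and the anchors are random values standing in for segment centers.

```python
import torch


def nearest_anchor_full(base: torch.Tensor, curr: torch.Tensor) -> torch.Tensor:
    # Builds the whole (num_base, num_curr) distance matrix at once
    return torch.argmin(torch.abs(curr.unsqueeze(0) - base.unsqueeze(1)), dim=1)


def nearest_anchor_batched(base: torch.Tensor, curr: torch.Tensor, batch_size: int) -> torch.Tensor:
    # Builds at most a (batch_size, num_curr) distance matrix per step
    out = []
    for i in range(0, base.shape[0], batch_size):
        chunk = base[i:i + batch_size]
        out.append(torch.argmin(torch.abs(curr.unsqueeze(0) - chunk.unsqueeze(1)), dim=1))
    return torch.cat(out)


torch.manual_seed(0)
# Assumed toy data: 1000 base-scale and 300 longer-scale segment centers within 600 seconds
base_anchor = torch.sort(torch.rand(1000) * 600.0).values
curr_anchor = torch.sort(torch.rand(300) * 600.0).values

full = nearest_anchor_full(base_anchor, curr_anchor)
batched = nearest_anchor_batched(base_anchor, curr_anchor, batch_size=128)
print(torch.equal(full, batched))
```

The full version allocates num_base x num_curr elements in one go, while the batched one never holds more than batch_size x num_curr, which is what matters for long recordings with many base-scale segments.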

prkumar112451 commented 3 months ago

@nithinraok - I tried the clustering diarizer, but its accuracy is too poor compared to neural diarization. Also, the speed of NeMo's clustering diarization is roughly similar to pyannote's neural diarization, which is more accurate. A 6-minute audio file took 16 seconds to diarize.

I was planning to use NeMo on a production instance, but given its speed I don't find it reliable enough for production, especially since it does not use the GPUs well and depends a lot more on CPUs.

Could you share whether there is a reason the NeMo team has not taken the feedback from @SagyHarpazGong and implemented the fixes he shared almost a year ago in this comment: https://github.com/NVIDIA/NeMo/issues/7101#issuecomment-1659165640

I really liked NeMo; I found its neural diarization accuracy to be better than pyannote's. But its speed is too low for it to scale.

@SagyHarpazGong - Could you share whether you are using NeMo with your fixes on a production instance? Also, would it be possible for you to check these changes into the NeMo clone in your repository? I tried cloning your repo, but it didn't have the speedup changes you shared in this thread.

SagyHarpazGong commented 3 months ago

@prkumar112451 My fixes are implemented only in my work repo and not in the NeMo repo, but I shared exactly what I changed/added. I really don't have the time to work on a PR for the NeMo team.

Regarding your question about using NeMo on a production instance: yes, NeMo (with my fixes) has been running in production for at least half a year. The improvement in the total diarization process depends on the number of embedding vectors (the number of speech segments). Most of our recordings are 40-60 minutes long, and for those the process is approximately x2.5-x3 faster. For recordings longer than 60 minutes the improvement is approximately x2.

So, as you already mentioned, the bottleneck is the traffic between CPU and GPU (in both directions), but also the memory usage on both GPU and CPU.
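
To make the transfer point concrete, here is a minimal sketch (not the NeMo code path) contrasting one device-to-host copy per batch with a single copy at the end. The `encoder` is a stand-in `torch.nn.Linear`, and both function names are hypothetical; on a machine without CUDA the script falls back to CPU and only demonstrates that the two variants return the same result.

```python
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Stand-in for a speaker embedding model (assumption: 80-dim features -> 192-dim embeddings)
encoder = torch.nn.Linear(80, 192).to(device).eval()
batches = [torch.randn(32, 80) for _ in range(10)]


@torch.no_grad()
def extract_per_batch_transfer(batches):
    out = []
    for b in batches:
        # One device-to-host copy (and synchronization) per batch
        out.append(encoder(b.to(device)).cpu())
    return torch.cat(out)


@torch.no_grad()
def extract_single_transfer(batches):
    out = []
    for b in batches:
        # Results stay on the device while the loop runs
        out.append(encoder(b.to(device)))
    # One device-to-host copy at the very end
    return torch.cat(out).cpu()


emb_a = extract_per_batch_transfer(batches)
emb_b = extract_single_transfer(batches)
print(emb_a.shape, torch.allclose(emb_a, emb_b, atol=1e-6))
```

The second variant trades a little extra GPU memory for fewer synchronization points, so whether it helps depends on how many segments a recording produces.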

BTW, I found out that a lot of the inference functions are not under torch.no_grad(), meaning autograd keeps tracking every tensor in the process and memory usage grows (by my estimate roughly x2: tensor data plus gradient bookkeeping), so in my repo I just added the decorator @torch.no_grad() at the top of the inference functions.
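
A minimal sketch of the decorator approach, assuming a stand-in `torch.nn.Linear` model and hypothetical function names (`embed_tracked`, `embed_inference`) rather than the actual NeMo inference functions:

```python
import torch

# Stand-in model; its parameters require gradients by default
model = torch.nn.Linear(192, 192)


def embed_tracked(x: torch.Tensor) -> torch.Tensor:
    # Autograd records the graph and keeps what backward would need
    return model(x)


@torch.no_grad()
def embed_inference(x: torch.Tensor) -> torch.Tensor:
    # No graph is recorded inside this function
    return model(x)


x = torch.randn(8, 192)
tracked = embed_tracked(x)
untracked = embed_inference(x)

print(tracked.requires_grad, tracked.grad_fn is not None)
print(untracked.requires_grad, untracked.grad_fn is None)
```

The outputs are numerically the same; the difference is that the decorated function does not hold on to the autograd graph, which is where the memory saving comes from.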

nithinraok commented 3 months ago

We are working on the next version of Speaker Diarization, which doesn't depend on the current clustering or MSDD, which is probably why the developers who worked on MSDD haven't given this much attention. However, these are very valid points to add to the code base. We love these suggestions and will apply them.

nithinraok commented 3 months ago

Note: We improved the speed of the current clustering diarizer for RIVA, with support for TensorRT. Those improvements are not part of NeMo and can only be used with RIVA.

nithinraok commented 3 months ago

SD with RIVA: https://docs.nvidia.com/deeplearning/riva/user-guide/docs/tutorials/asr-speaker-diarization.html

maxpain commented 2 months ago

> We are working on the next version of Speaker Diarization, which doesn't depend on the current clustering or MSDD, which is probably why the developers who worked on MSDD haven't given this much attention. However, these are very valid points to add to the code base. We love these suggestions and will apply them.

I'm also trying to add a NeMo diarizer to my pipeline and can't wait for the update. In my case, I extract embeddings using titanet for each speaker in the audio file (about 1-5 minutes each).

maxpain commented 2 months ago

The main bottleneck is the CPU. GPUs are barely used during the diarization process.

[Two screenshots attached: 2024-05-24 at 20:09:25 and 20:09:32]

prkumar112451 commented 2 months ago

@nithinraok - RIVA is too expensive, so we can't go with that. You mentioned TensorRT. Are you suggesting that NeMo will be faster with TensorRT?

https://github.com/NVIDIA/TensorRT

If we run NeMo inside a TensorRT container on a T4 GPU, will it speed NeMo up by at least 2x to 3x?

prkumar112451 commented 2 months ago

@nithinraok - Also, you mentioned working on a new version of speaker diarization that is not tied to clustering or MSDD. Is there a rough date when we can expect it?

After looking at all the options, I found that pyannote uses GPUs much better, which means GPUs with more CUDA cores give quicker responses. I had to go with pyannote and am currently working on fine-tuning it to improve the output quality.

But I must say, it was disappointing to find that NeMo, which felt really good at first, has so much CPU dependency and such poor GPU usage. For a production environment with a good number of recordings to transcribe, this just wouldn't work, and currently pyannote is the only good-enough option.

prkumar112451 commented 1 month ago

Any updates on this please?

tango4j commented 1 month ago

@prkumar112451 I am sorry that you feel that way. Unfortunately, we do not have plans to improve the speed of the pipelined speaker diarization in NeMo. We will move directly to the end-to-end version of speaker diarization. As for the new end-to-end model release, I think it will not be until the end of September.

github-actions[bot] commented 4 weeks ago

This issue is stale because it has been open for 30 days with no activity. Remove stale label or comment or this will be closed in 7 days.

github-actions[bot] commented 3 weeks ago

This issue was closed because it has been inactive for 7 days since being marked as stale.