rany2 / edge-tts

Use Microsoft Edge's online text-to-speech service from Python WITHOUT needing Microsoft Edge or Windows or an API key
https://pypi.org/project/edge-tts/
GNU General Public License v3.0

generate chinese subtitles function update #156

Closed: wh1te-moon closed this issue 1 month ago

wh1te-moon commented 7 months ago

It makes many mistakes when generating Chinese subtitles, like this:

WEBVTT

00:00:00.083 --> 00:00:02.583
你 穿越 大明成 为 第一 贪官 入股 赌坊 兴办 青楼

00:00:02.708 --> 00:00:04.892
是 沛县 最大 的 保护伞 你 更 是 当众 受贿

00:00:05.125 --> 00:00:07.858
万两白银 打点 官职 就 连 沈安 的 县衙 前院 你

00:00:07.858 --> 00:00:09.833
都 毫无 避讳 地 摆 满 了 金尊 琉璃 可

The clauses are split incorrectly and the punctuation is missing. I fixed it in my fork by adding a new function named generate_cn_subs. Should I open a pull request?

Anning01 commented 2 months ago

I used your code. Please don't open a PR; your code will cause the subtitle and voice speed mismatch problem.

wh1te-moon commented 2 months ago

What does "the subtitle and voice speed mismatch problem" mean? Could you give me an example? I added a new file to my fork named streaming_with_cn_subtitles.py. It tests the rate argument without problems.

But this cn_subtitle function splits on the punctuation of the text, so the result may not always look good.

rany2 commented 2 months ago

Unfortunately, I don't speak Chinese, so I will need some help with testing. The simplest solution I can think of is to match the input text against the subtitle word boundaries on a best-effort basis, with some fuzziness. However, any solution will need to be generic and work for all languages, not just Chinese.
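
One way to picture the best-effort matching described here is a left-to-right scan that looks up each reported word in the input text. This is only a sketch: `align_words` is a hypothetical helper, not part of edge-tts, the word list is made up for illustration, and it does exact matching only (no fuzziness).

```python
from typing import List, Optional, Tuple


def align_words(text: str, words: List[str]) -> List[Optional[Tuple[int, int]]]:
    """Locate each word in `text`, scanning left to right.

    Returns one (start, end) character span per word, or None when a
    word cannot be found after the current position.
    """
    spans: List[Optional[Tuple[int, int]]] = []
    cursor = 0
    for word in words:
        pos = text.find(word, cursor)
        if pos == -1:
            spans.append(None)  # best effort: leave this word unaligned
            continue
        spans.append((pos, pos + len(word)))
        cursor = pos + len(word)
    return spans


# Hypothetical input text and word boundaries, for illustration only.
text = "Hello, world. How are you?"
words = ["Hello", "world", "How", "are", "you"]
spans = align_words(text, words)
print(spans)
```

Once each word has a span, a cue can be cut from the input text itself, which keeps the punctuation that the word boundaries drop.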

rany2 commented 2 months ago

I'm thinking about doing something like set(list(output_wordboundaries)) and then filtering input on that set. This will allow me to figure out which index of the input I need to be on for the subtitle. I think it should work generically.
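
One possible reading of this idea, as a rough sketch: collect the set of characters that appear in the reported word boundaries, drop every input character outside that set while remembering where the kept ones came from, and then walk the words over the filtered input. `map_boundaries_to_input` is a hypothetical name and the word list is invented for the example. The sketch assumes the filtered input equals the concatenated words exactly, which fails if a punctuation character also occurs inside a reported word.

```python
from typing import List, Tuple


def map_boundaries_to_input(text: str, words: List[str]) -> List[Tuple[int, int]]:
    """Map each reported word to its (start, end) span in the input text."""
    # Characters the service actually reported back.
    reported = set("".join(words))
    # Keep only those input characters, remembering their original index.
    kept = [(i, ch) for i, ch in enumerate(text) if ch in reported]
    if "".join(ch for _, ch in kept) != "".join(words):
        raise ValueError("filtered input does not line up with the word boundaries")

    spans = []
    pos = 0
    for word in words:
        start = kept[pos][0]
        end = kept[pos + len(word) - 1][0] + 1
        spans.append((start, end))
        pos += len(word)
    return spans


# Hypothetical word boundaries for the first part of a sample sentence.
text = "东风夜放花千树,更吹落、星如雨。"
words = ["东风夜放花千树", "更", "吹", "落", "星如雨"]
spans = map_boundaries_to_input(text, words)
print(spans)
```

Slicing the input from the start of one word to the start of the next then yields text with its original punctuation attached.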

Imfdj commented 2 months ago

@wh1te-moon I’m curious about the effect of the repaired Chinese subtitles. Can you upload a comparison of the before and after effects?

wh1te-moon commented 2 months ago

> I'm thinking about doing something like set(list(output_wordboundaries)) and then filtering input on that set. This will allow me to figure out which index of the input I need to be on for the subtitle. I think it should work generically.

In fact, my solution is based on correct punctuation in the input text, not on any specific language, so it works for any language whose punctuation marks are in PUNCTUATION_LIST. I have only set it up for Chinese and English punctuation. PUNCTUATION_LIST is easy to extend if someone wants their language to be supported. The only languages this solution may not support are a small number of right-to-left languages (e.g., Arabic).

But it does depend on the input text being punctuated correctly.

wh1te-moon commented 2 months ago

> @wh1te-moon I’m curious about the effect of the repaired Chinese subtitles. Can you upload a comparison of the before and after effects?

input text:

"东风夜放花千树,更吹落、星如雨。宝马雕车香满路。"

vtt file:

00:00:00.083 --> 00:00:01.567
东风夜放花千树 更 吹

00:00:01.567 --> 00:00:03.742
落 星如雨 宝马雕车香满路

fixed version:

00:00:00.083 --> 00:00:01.092
东风夜放花千树,

00:00:01.267 --> 00:00:02.400
更吹落、星如雨。

00:00:02.767 --> 00:00:03.742
宝马雕车香满路。

anartigone commented 1 month ago

> > I'm thinking about doing something like set(list(output_wordboundaries)) and then filtering input on that set. This will allow me to figure out which index of the input I need to be on for the subtitle. I think it should work generically.
>
> In fact, my solution is based on correct punctuation in the input text, not on any specific language, so it works for any language whose punctuation marks are in PUNCTUATION_LIST. I have only set it up for Chinese and English punctuation. PUNCTUATION_LIST is easy to extend if someone wants their language to be supported. The only languages this solution may not support are a small number of right-to-left languages (e.g., Arabic).
>
> But it does depend on the input text being punctuated correctly.

I would like to give this a try. Please share how to apply your PUNCTUATION_LIST solution (e.g. files and code to be modified).

wh1te-moon commented 1 month ago

It's simple.

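# Method to add to the SubMaker class in submaker.py (needs `import re`).
def generate_subs_based_on_punc(self, text: str) -> str:
    # Clause delimiters: full-width (CJK) marks, newline, curly quotes,
    # then ASCII marks. '. ' (period + space) avoids splitting "3.14".
    PUNCTUATION = [',', '。', '!', '?', ';', ':', '\n', '“', '”',
                   ',', '!', '. ']

    def clause() -> list[str]:
        # The capturing group makes re.split keep each delimiter in the list.
        pattern = '(' + '|'.join(re.escape(p) for p in PUNCTUATION) + ')'
        text_list = re.split(pattern, text)

        # Matches a piece made up solely of delimiter characters.
        pattern = '^[' + re.escape(''.join(PUNCTUATION)) + ']+$'
        index = 0
        while index < len(text_list) - 1:
            nxt = text_list[index + 1]
            if not nxt:
                text_list.pop(index + 1)  # drop empty pieces
            elif re.match(pattern, nxt):
                if nxt == '\n':
                    # A bare newline ends a clause but is not displayed.
                    text_list.pop(index + 1)
                else:
                    # Attach the punctuation mark to the clause before it.
                    text_list[index] += text_list.pop(index + 1)
            else:
                index += 1

        return text_list

    self.text_list = clause()
    if len(self.subs) != len(self.offset):
        raise ValueError("subs and offset are not of the same length")
    data = "WEBVTT\r\n\r\n"
    j = 0  # index of the first word boundary in the current clause
    for clause_text in self.text_list:
        if j >= len(self.offset):
            return data  # no word boundaries left
        start_time = self.offset[j][0]
        # Advance while the next reported word is a substring of this clause.
        while j + 1 < len(self.subs) and self.subs[j + 1] in clause_text:
            j += 1
        data += formatter(start_time, self.offset[j][1], clause_text)
        j += 1
    return data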

The core is the nested clause function, which uses re to split the text at punctuation marks.
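
To see the splitting step in isolation, here is a minimal standalone sketch of the same technique: `re.split` with a capturing group, followed by gluing each delimiter back onto the clause before it. `split_clauses` and its shortened punctuation list are hypothetical and chosen for illustration; they are not the exact code from the fork.

```python
import re
from typing import List

# Illustrative subset of delimiters: full-width marks first, then ASCII.
PUNCTUATION = [",", "。", "!", "?", ";", ",", "!", "?", ". "]


def split_clauses(text: str, punctuation: List[str] = PUNCTUATION) -> List[str]:
    """Split text into clauses, keeping each delimiter with its clause."""
    delimiter = "|".join(re.escape(p) for p in punctuation)
    # With a capturing group, re.split returns text and delimiters
    # alternately: text at even indices, delimiters at odd indices.
    pieces = re.split(f"({delimiter})", text)

    clauses: List[str] = []
    for i in range(0, len(pieces), 2):
        body = pieces[i]
        mark = pieces[i + 1] if i + 1 < len(pieces) else ""
        if not body and clauses:
            clauses[-1] += mark  # consecutive marks join the previous clause
        elif body or mark:
            clauses.append(body + mark)
    return clauses


example = "东风夜放花千树,更吹落、星如雨。宝马雕车香满路。"
clauses = split_clauses(example)
for c in clauses:
    print(c)
```

Because "、" is not in the list, "更吹落、星如雨。" stays a single clause, which matches the fixed output shown earlier in the thread.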

anartigone commented 1 month ago

Awesome, after some struggle, the code works as expected!

Just for my personal record. I would like to share what I did.

  1. Get the code: git clone https://github.com/rany2/edge-tts.git
  2. Edit `/edge-tts/src/edge_tts/submaker.py`:

    """
    SubMaker package for the Edge TTS project.

    SubMaker is a package that makes the process of creating subtitles with
    information provided by the service easier.
    """

    import math
    import re
    from typing import List, Tuple
    from xml.sax.saxutils import escape, unescape


    def formatter(start_time: float, end_time: float, subdata: str) -> str:
        """
        formatter returns the timecode and the text of the subtitle.
        """
        return (
            f"{mktimestamp(start_time)} --> {mktimestamp(end_time)}\r\n"
            f"{escape(subdata)}\r\n\r\n"
        )


    def mktimestamp(time_unit: float) -> str:
        """
        mktimestamp returns the timecode of the subtitle.

        The timecode is in the format of 00:00:00.000.

        Returns:
            str: The timecode of the subtitle.
        """
        hour = math.floor(time_unit / 10**7 / 3600)
        minute = math.floor((time_unit / 10**7 / 60) % 60)
        seconds = (time_unit / 10**7) % 60
        return f"{hour:02d}:{minute:02d}:{seconds:06.3f}"


    class SubMaker:
        """
        SubMaker class
        """

        def __init__(self) -> None:
            """
            SubMaker constructor.
            """
            self.offset: List[Tuple[float, float]] = []
            self.subs: List[str] = []

        def create_sub(self, timestamp: Tuple[float, float], text: str) -> None:
            """
            create_sub creates a subtitle with the given timestamp and text
            and adds it to the list of subtitles

            Args:
                timestamp (tuple): The offset and duration of the subtitle.
                text (str): The text of the subtitle.

            Returns:
                None
            """
            self.offset.append((timestamp[0], timestamp[0] + timestamp[1]))
            self.subs.append(text)

        def generate_subs_based_on_punc(self, text: str) -> str:
            # Clause delimiters: full-width (CJK) marks, newline, curly quotes,
            # then ASCII marks. '. ' (period + space) avoids splitting "3.14".
            PUNCTUATION = [',', '。', '!', '?', ';', ':', '\n', '“', '”',
                           ',', '!', '. ']

            def clause() -> List[str]:
                # The capturing group makes re.split keep each delimiter.
                pattern = '(' + '|'.join(re.escape(p) for p in PUNCTUATION) + ')'
                text_list = re.split(pattern, text)

                # Matches a piece made up solely of delimiter characters.
                pattern = '^[' + re.escape(''.join(PUNCTUATION)) + ']+$'
                index = 0
                while index < len(text_list) - 1:
                    nxt = text_list[index + 1]
                    if not nxt:
                        text_list.pop(index + 1)  # drop empty pieces
                    elif re.match(pattern, nxt):
                        if nxt == '\n':
                            # A bare newline ends a clause but is not displayed.
                            text_list.pop(index + 1)
                        else:
                            # Attach the mark to the clause before it.
                            text_list[index] += text_list.pop(index + 1)
                    else:
                        index += 1

                return text_list

            self.text_list = clause()
            if len(self.subs) != len(self.offset):
                raise ValueError("subs and offset are not of the same length")
            data = "WEBVTT\r\n\r\n"
            j = 0  # index of the first word boundary in the current clause
            for clause_text in self.text_list:
                if j >= len(self.offset):
                    return data  # no word boundaries left
                start_time = self.offset[j][0]
                # Advance while the next reported word is in this clause.
                while j + 1 < len(self.subs) and self.subs[j + 1] in clause_text:
                    j += 1
                data += formatter(start_time, self.offset[j][1], clause_text)
                j += 1
            return data

  3. Edit `/edge-tts/src/edge_tts/util.py`:

    """
    Main package.
    """

    import argparse
    import asyncio
    import sys
    from io import TextIOWrapper
    from typing import Any, TextIO, Union

    from edge_tts import Communicate, SubMaker, list_voices


    async def _print_voices(*, proxy: str) -> None:
        """Print all available voices."""
        voices = await list_voices(proxy=proxy)
        voices = sorted(voices, key=lambda voice: voice["ShortName"])
        for idx, voice in enumerate(voices):
            if idx != 0:
                print()

            for key in voice.keys():
                if key in (
                    "SuggestedCodec",
                    "FriendlyName",
                    "Status",
                    "VoiceTag",
                    "Name",
                    "Locale",
                ):
                    continue
                pretty_key_name = key if key != "ShortName" else "Name"
                print(f"{pretty_key_name}: {voice[key]}")


    async def _run_tts(args: Any) -> None:
        """Run TTS after parsing arguments from command line."""

        try:
            if sys.stdin.isatty() and sys.stdout.isatty() and not args.write_media:
                print(
                    "Warning: TTS output will be written to the terminal. "
                    "Use --write-media to write to a file.\n"
                    "Press Ctrl+C to cancel the operation. "
                    "Press Enter to continue.",
                    file=sys.stderr,
                )
                input()
        except KeyboardInterrupt:
            print("\nOperation canceled.", file=sys.stderr)
            return

        tts: Communicate = Communicate(
            args.text,
            args.voice,
            proxy=args.proxy,
            rate=args.rate,
            volume=args.volume,
            pitch=args.pitch,
        )
        subs: SubMaker = SubMaker()
        with (
            open(args.write_media, "wb") if args.write_media else sys.stdout.buffer
        ) as audio_file:
            async for chunk in tts.stream():
                if chunk["type"] == "audio":
                    audio_file.write(chunk["data"])
                elif chunk["type"] == "WordBoundary":
                    subs.create_sub((chunk["offset"], chunk["duration"]), chunk["text"])

        sub_file: Union[TextIOWrapper, TextIO] = (
            open(args.write_subtitles, "w", encoding="utf-8")
            if args.write_subtitles
            else sys.stderr
        )
        with sub_file:
            # Changed line: use the punctuation-based generator.
            sub_file.write(subs.generate_subs_based_on_punc(args.text))


    async def amain() -> None:
        """Async main function"""
        parser = argparse.ArgumentParser(description="Microsoft Edge TTS")
        group = parser.add_mutually_exclusive_group(required=True)
        group.add_argument("-t", "--text", help="what TTS will say")
        group.add_argument("-f", "--file", help="same as --text but read from file")
        parser.add_argument(
            "-v",
            "--voice",
            help="voice for TTS. Default: en-US-AriaNeural",
            default="en-US-AriaNeural",
        )
        group.add_argument(
            "-l",
            "--list-voices",
            help="lists available voices and exits",
            action="store_true",
        )
        parser.add_argument("--rate", help="set TTS rate. Default +0%%.", default="+0%")
        parser.add_argument("--volume", help="set TTS volume. Default +0%%.", default="+0%")
        parser.add_argument("--pitch", help="set TTS pitch. Default +0Hz.", default="+0Hz")
        parser.add_argument(
            "--words-in-cue",
            help="number of words in a subtitle cue. Default: 10.",
            default=10,
            type=float,
        )
        parser.add_argument(
            "--write-media", help="send media output to file instead of stdout"
        )
        parser.add_argument(
            "--write-subtitles",
            help="send subtitle output to provided file instead of stderr",
        )
        parser.add_argument("--proxy", help="use a proxy for TTS and voice list.")
        args = parser.parse_args()

        if args.list_voices:
            await _print_voices(proxy=args.proxy)
            sys.exit(0)

        if args.file is not None:
            # we need to use sys.stdin.read() because some devices
            # like Windows and Termux don't have a /dev/stdin.
            if args.file == "/dev/stdin":
                args.text = sys.stdin.read()
            else:
                with open(args.file, "r", encoding="utf-8") as file:
                    args.text = file.read()

        if args.text is not None:
            await _run_tts(args)


    def main() -> None:
        """Run the main function using asyncio."""
        asyncio.run(amain())


    if __name__ == "__main__":
        main()

  4. Save both files and install the package with `pip install -e /path/to/edge-tts/`
  5. Use the `edge-tts` command with `--write-subtitles`; the result is fixed.
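
A note on the cue times: `mktimestamp` in step 2 divides by `10**7` to get seconds, so the offsets it receives are in units of 100 nanoseconds. The short check below copies that function unchanged and feeds it tick values chosen to reproduce timecodes seen earlier in this thread; the tick values themselves are assumptions worked backwards from those timecodes.

```python
import math


def mktimestamp(time_unit: float) -> str:
    """Convert 100-nanosecond ticks to an HH:MM:SS.mmm timecode."""
    hour = math.floor(time_unit / 10**7 / 3600)
    minute = math.floor((time_unit / 10**7 / 60) % 60)
    seconds = (time_unit / 10**7) % 60
    return f"{hour:02d}:{minute:02d}:{seconds:06.3f}"


print(mktimestamp(830_000))         # 00:00:00.083
print(mktimestamp(37_420_000))      # 00:00:03.742
print(mktimestamp(36_000_000_000))  # 01:00:00.000
```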

Thank you again, @wh1te-moon, for your awesome help.

wh1te-moon commented 1 month ago

Why not merge my pull request? Should I change the target branch to a non-master branch? Or is there something else I should be aware of? This is my first successful involvement in an open-source project, so thank you very much too.

anartigone commented 1 month ago

This is a helpful function to have. I have tested it, and it works with both Chinese and English. I agree there is good reason to merge it.