nonvegan / streamlink-plugin-kick

Streamlink plugin for Kick.com
BSD 2-Clause "Simplified" License
43 stars 4 forks source link

Low latency support #13

Open Fortesqu opened 8 months ago

Fortesqu commented 8 months ago

I copy pasted from twitch plugin and it works. Add --kick-low-latency and --hls-live-edge 1 arguments.

"""
$description Kick, a gaming livestreaming platform
$url kick.com
$type live, vod
$metadata id
$metadata author
$metadata category
$metadata title
$notes :ref:`Low latency streaming <cli/plugins/kick:Low latency streaming>` is supported.
"""

import re
import cloudscraper
import argparse
import base64
import logging
import sys
from contextlib import suppress
from dataclasses import dataclass, replace as dataclass_replace
from datetime import datetime, timedelta
from json import dumps as json_dumps
from random import random
from typing import ClassVar, Mapping, Optional, Tuple, Type
from urllib.parse import urlparse

from requests.exceptions import HTTPError

from streamlink.exceptions import NoStreamsError, PluginError
from streamlink.plugin import Plugin, pluginargument, pluginmatcher
from streamlink.plugin.api import validate
from streamlink.session import Streamlink
from streamlink.stream.hls import (
    M3U8,
    DateRange,
    HLSPlaylist,
    HLSSegment,
    HLSStream,
    HLSStreamReader,
    HLSStreamWorker,
    HLSStreamWriter,
    M3U8Parser,
    parse_tag,
)
from streamlink.stream.http import HTTPStream
from streamlink.utils.parse import parse_json, parse_qsd
from streamlink.utils.random import CHOICES_ALPHA_NUM, random_token
from streamlink.utils.times import fromtimestamp, hours_minutes_seconds_float
from streamlink.utils.url import update_qsd

log = logging.getLogger(__name__)

LOW_LATENCY_MAX_LIVE_EDGE = 2

@dataclass
class kickHLSSegment(HLSSegment):
    prefetch: bool

class kickM3U8(M3U8[kickHLSSegment, HLSPlaylist]):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.dateranges_ads = []

class kickM3U8Parser(M3U8Parser[kickM3U8, kickHLSSegment, HLSPlaylist]):
    __m3u8__: ClassVar[Type[kickM3U8]] = kickM3U8
    __segment__: ClassVar[Type[kickHLSSegment]] = kickHLSSegment

    @parse_tag("EXT-X-PREFETCH")
    def parse_tag_ext_x_kick_prefetch(self, value):
        segments = self.m3u8.segments
        if not segments:  # pragma: no cover
            return
        last = segments[-1]
        # Use the average duration of all regular segments for the duration of prefetch segments.
        # This is better than using the duration of the last segment when regular segment durations vary a lot.
        # In low latency mode, the playlist reload time is the duration of the last segment.
        duration = last.duration if last.prefetch else sum(segment.duration for segment in segments) / float(len(segments))
        # Use the last duration for extrapolating the start time of the prefetch segment, which is needed for checking
        # whether it is an ad segment and matches the parsed date ranges or not
        date = last.date + timedelta(seconds=last.duration)
        # Always treat prefetch segments after a discontinuity as ad segments
        segment = dataclass_replace(
            last,
            uri=self.uri(value),
            duration=duration,
            title=None,
            date=date,
            prefetch=True,
        )
        segments.append(segment)

    @parse_tag("EXT-X-DATERANGE")
    def parse_tag_ext_x_daterange(self, value):
        super().parse_tag_ext_x_daterange(value)
        daterange = self.m3u8.dateranges[-1]
    def get_segment(self, uri: str, **data) -> kickHLSSegment:
        segment: kickHLSSegment = super().get_segment(uri, prefetch=False)  # type: ignore[assignment]

        return segment

class kickHLSStreamWorker(HLSStreamWorker):
    reader: "kickHLSStreamReader"
    writer: "kickHLSStreamWriter"
    stream: "kickHLSStream"

    def __init__(self, reader, *args, **kwargs):
        self.had_content = False
        super().__init__(reader, *args, **kwargs)

    def _playlist_reload_time(self, playlist: kickM3U8):  # type: ignore[override]
        if self.stream.low_latency and playlist.segments:
            return playlist.segments[-1].duration

        return super()._playlist_reload_time(playlist)

    def process_segments(self, playlist: kickM3U8):  # type: ignore[override]
        # ignore prefetch segments if not LL streaming
        if not self.stream.low_latency:
            playlist.segments = [segment for segment in playlist.segments if not segment.prefetch]

        return super().process_segments(playlist)

class kickHLSStreamWriter(HLSStreamWriter):
    reader: "kickHLSStreamReader"
    stream: "kickHLSStream"

class kickHLSStreamReader(HLSStreamReader):
    __worker__ = kickHLSStreamWorker
    __writer__ = kickHLSStreamWriter

    worker: "kickHLSStreamWorker"
    writer: "kickHLSStreamWriter"
    stream: "kickHLSStream"

    def __init__(self, stream: "kickHLSStream"):
        if stream.low_latency:
            live_edge = max(1, min(LOW_LATENCY_MAX_LIVE_EDGE, stream.session.options.get("hls-live-edge")))
            stream.session.options.set("hls-live-edge", live_edge)
            stream.session.options.set("hls-segment-stream-data", True)
            log.info(f"Low latency streaming (HLS live edge: {live_edge})")
        super().__init__(stream)

class kickHLSStream(HLSStream):
    __reader__ = kickHLSStreamReader
    __parser__ = kickM3U8Parser

    def __init__(self, *args, low_latency: bool = False, **kwargs):
        super().__init__(*args, **kwargs)
        self.low_latency = low_latency

log = logging.getLogger(__name__)

@pluginmatcher(
    re.compile(
        # https://github.com/yt-dlp/yt-dlp/blob/9b7a48abd1b187eae1e3f6c9839c47d43ccec00b/yt_dlp/extractor/kick.py#LL33-L33C111
        r"https?://(?:www\.)?kick\.com/(?!(?:video|categories|search|auth)(?:[/?#]|$))(?P<channel>[\w_-]+)$",
    ),
    name="live",
)
@pluginmatcher(
    re.compile(
        # https://github.com/yt-dlp/yt-dlp/blob/2d5cae9636714ff922d28c548c349d5f2b48f317/yt_dlp/extractor/kick.py#LL84C18-L84C104
        r"https?://(?:www\.)?kick\.com/video/(?P<video_id>[\da-f]{8}-(?:[\da-f]{4}-){3}[\da-f]{12})",
    ),
    name="vod",
)
@pluginmatcher(
    re.compile(
        r"https?://(?:www\.)?kick\.com/(?!(?:video|categories|search|auth)(?:[/?#]|$))(?P<channel>[\w_-]+)\?clip=(?P<clip_id>[\w_]+)$",
    ),
    name="clip",
)
@pluginargument(
    "low-latency",
    action="store_true",
    help="""
        Enables low latency streaming by prefetching HLS segments.
        Sets --hls-segment-stream-data to true and --hls-live-edge to 2, if it is higher.
        Reducing --hls-live-edge to `1` will result in the lowest latency possible, but will most likely cause buffering.

        In order to achieve true low latency streaming during playback, the player's caching/buffering settings will
        need to be adjusted and reduced to a value as low as possible, but still high enough to not cause any buffering.
        This depends on the stream's bitrate and the quality of the connection to kick's servers. Please refer to the
        player's own documentation for the required configuration. Player parameters can be set via --player-args.

        Note: Low latency streams have to be enabled by the broadcasters on kick themselves.
        Regular streams can cause buffering issues with this option enabled due to the reduced --hls-live-edge value.
    """,
)
class KICK(Plugin):
    def _get_streams(self):
        API_BASE_URL = "https://kick.com/api"

        _LIVE_SCHEMA = validate.Schema(
            validate.parse_json(),
            {
                "playback_url": validate.url(path=validate.endswith(".m3u8")),
                "livestream": {
                    "is_live": True,
                    "id": int,
                    "session_title": str,
                    "categories": [{"name": str}],
                },
                "user": {"username": str},
            },
            validate.union_get(
                "playback_url",
                ("livestream", "id"),
                ("user", "username"),
                ("livestream", "session_title"),
                ("livestream", "categories", 0, "name"),
            ),
        )

        _VIDEO_SCHEMA = validate.Schema(
            validate.parse_json(),
            {
                "source": validate.url(path=validate.endswith(".m3u8")),
                "id": int,
                "livestream": {
                    "channel": {"user": {"username": str}},
                    "session_title": str,
                    "categories": [{"name": str}],
                },
            },
            validate.union_get(
                "source",
                "id",
                ("livestream", "channel", "user", "username"),
                ("livestream", "session_title"),
                ("livestream", "categories", 0, "name"),
            ),
        )

        _CLIP_SCHEMA = validate.Schema(
            validate.parse_json(),
            {
                "clip": {
                    "video_url": validate.url(path=validate.endswith(".m3u8")),
                    "id": str,
                    "channel": {"username": str},
                    "title": str,
                    "category": {"name": str},
                },
            },
            validate.union_get(
                ("clip", "video_url"),
                ("clip", "id"),
                ("clip", "channel", "username"),
                ("clip", "title"),
                ("clip", "category", "name"),
            ),
        )

        live, vod, clip = (
            self.matches["live"],
            self.matches["vod"],
            self.matches["clip"],
        )

        try:
            scraper = cloudscraper.create_scraper()
            res = scraper.get(
                "{0}/{1}/{2}".format(
                    API_BASE_URL,
                    *(
                        ["v2/channels", self.match["channel"]]
                        if live
                        else (
                            ["v2/video", self.match["video_id"]]
                            if vod
                            else ["v2/clips", self.match["clip_id"]]
                        )
                    )
                )
            )

            url, self.id, self.author, self.title, self.category = (
                _LIVE_SCHEMA if live else (_VIDEO_SCHEMA if vod else _CLIP_SCHEMA)
            ).validate(res.text)

        except (PluginError, TypeError) as err:
            log.debug(err)
            return

        finally:
            scraper.close()

        if live or vod:
            yield from kickHLSStream.parse_variant_playlist(self.session, url, low_latency=self.get_option("low-latency")).items()
        elif (
            clip and self.author.casefold() == self.match["channel"].casefold()
        ):  # Sanity check if the clip channel is the same as the one in the URL
            yield "source", HLSStream(self.session, url)

__plugin__ = KICK
Jertzukka commented 7 months ago

Thanks for your work, based on my testing this works and seems to reduce the stream delay by ~10 seconds. You could consider making this as a pull request as it seems to be an improvement.

ozilwg commented 4 months ago

This code doesn't work. Can anyone update to make the low latency option work?

Jertzukka commented 4 months ago

This code doesn't work. Can anyone update to make the low latency option work?

I use this daily and it does work.