aws / amazon-kinesis-video-streams-parser-library

The Amazon Kinesis Video Streams parser library is meant to be included in applications to make it easy to work with the output of video streams, such as retrieving frame-level objects, metadata for fragments, and more.
Apache License 2.0
103 stars, 52 forks

Python OpenCV consumer #87

Open. ekcheng opened this issue 4 years ago

ekcheng commented 4 years ago

Hello, I am creating video analytics algorithms using Python and OpenCV and would like to run these algorithms on a live KVS stream using the GetMedia API.

What is the recommended way to access the frame level data in Python? Do we have to essentially rewrite in Python the stream parsing capability written in Java here? Or is there a recommended way to leverage the existing library here and perhaps pipe the data over to Python somehow?

Thanks.

Sean-Der commented 4 years ago

Hey @ekcheng thanks for checking out KVS!

Since you are working with OpenCV, you can probably leverage other libraries for MKV parsing. You just need to call GetMedia to pull down the clusters, and you can feed them into anything you want.

I haven't solved this myself, but Python seems to have a pretty rich ecosystem for EBML/MKV handling! After that, you can pass the H.264 directly into OpenH264. If you have any specific questions, I'm happy to look at code or hack up little samples.

thanks

ekcheng commented 4 years ago

Thanks so much for the response, @Sean-Der.

Sounds like the approach is:

  1. Pull down a stream of bytes using GetMedia.
  2. Use a library to parse that byte stream into discrete mkv chunks.
  3. Use a library to parse each mkv container to pull out the individual frame-level data.

Correct?
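ekcheng's three steps can be done in pure Python. The sketch below is a minimal, hypothetical EBML walker (not an AWS library; all names are illustrative). It assumes, consistent with the ffmpeg output later in this thread, that each fragment in the GetMedia byte stream is a self-contained MKV that starts with its own EBML header and carries its AWS_KINESISVIDEO_* values in a Tags element. It only understands SimpleBlock frames (no BlockGroups or lacing) and expects complete fragments in the buffer.

```python
# Matroska element IDs (IDs keep their EBML length-marker bits).
EBML_HEADER = 0x1A45DFA3
SEGMENT = 0x18538067
CLUSTER = 0x1F43B675
TAGS = 0x1254C367
TAG = 0x7373
SIMPLE_TAG = 0x67C8
TAG_NAME = 0x45A3
TAG_STRING = 0x4487
SIMPLE_BLOCK = 0xA3

# Master elements we step into instead of skipping; Segment and Cluster may use "unknown" sizes.
MASTERS = {SEGMENT, CLUSTER, TAGS, TAG, SIMPLE_TAG}


def read_vint(buf, pos, keep_marker):
    """Decode one EBML variable-length integer at buf[pos]; return (value, byte_length)."""
    if pos >= len(buf) or buf[pos] == 0:
        raise ValueError(f"invalid or truncated EBML vint at offset {pos}")
    first = buf[pos]
    length = 9 - first.bit_length()  # leading zero bits + 1
    if pos + length > len(buf):
        raise ValueError(f"truncated EBML vint at offset {pos}")
    # Element IDs keep the marker bit; sizes drop it
    value = first if keep_marker else first & ((0x80 >> (length - 1)) - 1)
    for byte in buf[pos + 1:pos + length]:
        value = (value << 8) | byte
    return value, length


def iter_elements(buf):
    """Flat walk: yield (id, None) when entering a master element, (id, payload) otherwise."""
    pos = 0
    while pos < len(buf):
        element_id, id_len = read_vint(buf, pos, keep_marker=True)
        size, size_len = read_vint(buf, pos + id_len, keep_marker=False)
        pos += id_len + size_len
        if element_id in MASTERS:
            yield element_id, None  # its children follow immediately
            continue
        if pos + size > len(buf):
            raise ValueError(f"element 0x{element_id:X} runs past the end of the buffer")
        yield element_id, bytes(buf[pos:pos + size])
        pos += size


def parse_fragments(buf):
    """Split GetMedia bytes into fragments, collecting their tags and SimpleBlock frame data."""
    fragments, tag_name = [], None
    for element_id, payload in iter_elements(buf):
        if element_id == EBML_HEADER:  # assumed: every KVS fragment starts with an EBML header
            fragments.append({"tags": {}, "frames": []})
        elif not fragments:
            continue  # ignore anything before the first EBML header
        elif element_id == TAG_NAME:
            tag_name = payload.decode("utf-8")
        elif element_id == TAG_STRING and tag_name is not None:
            fragments[-1]["tags"][tag_name] = payload.decode("utf-8")
            tag_name = None
        elif element_id == SIMPLE_BLOCK:
            # SimpleBlock = track-number vint, 2-byte relative timecode, 1 flags byte, frame data
            _, track_len = read_vint(payload, 0, keep_marker=False)
            fragments[-1]["frames"].append(payload[track_len + 3:])
    return fragments
```

In a live consumer you would append each chunk read from the GetMedia payload to a buffer and parse only complete fragments; that bookkeeping is omitted here. The frame bytes are still encoded (for example H.264) and need a decoder such as FFmpeg or GStreamer before OpenCV can use them.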

bhavikapanara commented 3 years ago

Hi @ekcheng

How did you download the AWS Kinesis video stream using Python? I am looking for a solution. Could you please help me figure it out?

Thanks, Bhavika

vgonisanz commented 3 years ago

Same problem here. The approach of @ekcheng looks good, but it is not easy to parse discrete MKV chunks with Python. Any suggestions for doing this in Python? I also tried to use the Kinesis Client Library consumer, but I cannot get a 'kinesisvideo' stream to work with the 'kinesis' client to use the shards.

lherman-cs commented 3 years ago

@bhavikapanara @vgonisanz as of now we don't have a consumer library written in Python (and we have no plans to add one yet). The link you're referring to is for Kinesis Data Streams, not Kinesis Video Streams. Since we use RESTful APIs, you can simply wrap them in Python by following this reference: https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_dataplane_GetMedia.html.
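boto3 already wraps the GetMedia REST call in its kinesis-video-media client, so pulling the bytes mostly comes down to calling it against the stream's data endpoint and reading the payload incrementally. A sketch, assuming a boto3 Session with credentials for the stream; the function name and the 64 KiB default chunk size are illustrative choices, not part of the AWS API.

```python
def iter_media_chunks(session, stream_arn, chunk_size=64 * 1024, start_selector=None):
    """Yield raw MKV bytes from a single, long-lived GetMedia call."""
    kvs = session.client("kinesisvideo")
    # GetMedia must be sent to the stream's data endpoint, not the control-plane endpoint
    endpoint = kvs.get_data_endpoint(StreamARN=stream_arn, APIName="GET_MEDIA")["DataEndpoint"]
    media = session.client("kinesis-video-media", endpoint_url=endpoint)
    response = media.get_media(
        StreamARN=stream_arn,
        StartSelector=start_selector or {"StartSelectorType": "NOW"},
    )
    payload = response["Payload"]  # botocore StreamingBody
    try:
        # Read incrementally instead of payload.read(), which would wait for the whole stream
        yield from payload.iter_chunks(chunk_size)
    finally:
        payload.close()  # release the HTTP connection when the caller stops iterating
```

For example, iterating over iter_media_chunks(boto3.Session(), os.environ["KVS_ARN"]) and appending each chunk to a buffer feeds an MKV parser. Unlike calling GetMedia with NOW in a loop, this keeps one connection open, so consecutive chunks continue the same MKV stream.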

vgonisanz commented 3 years ago

Yep, I can confirm that it is possible to get the data by parsing the payload and obtaining MKV chunks (with an AAC audio track). My problem is how to process the raw audio in Python. I'm able to read the file metadata with FFmpeg; ffmpeg -i test_03s.mkv gives:

[matroska,webm @ 0x5562cfd30820] Could not find codec parameters for stream 0 (Audio: aac (LC), stereo, fltp): unspecified sample rate
Consider increasing the value for the 'analyzeduration' and 'probesize' options
Input #0, matroska,webm, from 'test_03s.mkv':
  Metadata:
    title           : Kinesis Video SDK
    encoder         : Kinesis Video SDK 1.0.0 JNI 2.0
    ContactId       : 71c04a0e-279d-4706-b705-e20b3f74c8fb
    InstanceId      : f1f87e8c-ae31-4f00-86dd-ea630fcd9fe5
    MimeType        : audio/L16;rate=8000;channels=1;
    AUDIO_FROM_CUSTOMER: 1
    AWS_KINESISVIDEO_FRAGMENT_NUMBER: 91343852333181447247962534056705360245849090861
    AWS_KINESISVIDEO_SERVER_TIMESTAMP: 1602742268.250
    AWS_KINESISVIDEO_PRODUCER_TIMESTAMP: 1602742268.052
  Duration: N/A, start: 1602742268.052000, bitrate: N/A
    Stream #0:0(eng): Audio: aac (LC), stereo, fltp (default)
    Metadata:
      title           : AUDIO_FROM_CUSTOMER
At least one output file must be specified

but I have trouble consuming the content in Python.

lherman-cs commented 3 years ago

@vgonisanz after parsing the MKV payload, you need an H.264/AAC decoder to decode it (AFAIK the GStreamer Python bindings are the best candidate). Then you just need to parse the raw audio into whatever data structure you want.

In your case, the raw audio is mono, little-endian, 16-bit integers. So it basically looks like this:

|  sample 1 (16 bits)  |  sample 2 (16 bits) | ... |

In Python, you'll need to convert the byte stream to an array of 16-bit integers, and swap the byte order if you're on a big-endian machine.
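The conversion described above can be done with the standard-library array module. A minimal sketch, assuming the bytes are already raw mono 16-bit little-endian PCM (as the audio/L16 MimeType tag suggests) rather than still-encoded AAC; pcm16le_to_samples is a hypothetical helper name.

```python
import array
import sys


def pcm16le_to_samples(raw: bytes) -> array.array:
    """Convert mono little-endian 16-bit PCM bytes into an array of signed integers."""
    samples = array.array("h")  # C signed short
    if samples.itemsize != 2:
        raise RuntimeError("'h' is not 16 bits on this platform")
    if len(raw) % 2:
        raise ValueError("16-bit PCM data must have an even number of bytes")
    samples.frombytes(raw)  # interprets the bytes in the machine's native byte order
    if sys.byteorder == "big":
        samples.byteswap()  # swap bytes (not bits) so the values read as little-endian
    return samples
```

With NumPy installed, np.frombuffer(raw, dtype="<i2") gives the same values in one call, because the "<i2" dtype fixes the byte order regardless of the machine.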

gai6948 commented 3 years ago

I wrote an application that parses MKV frames with Python (video only, no audio in my case). The approach is naive and crashes occasionally, but otherwise the latency is acceptable (2-3 s from the producer).

I'd like to see if anyone has better solutions. Thanks!

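import io
import os
import time
import timeit

import boto3
import cv2
import imageio
import pika

# Hypothetical RabbitMQ settings: put_queue() was called but not defined in the original snippet
RABBITMQ_HOST = os.environ.get('RABBITMQ_HOST', 'localhost')
QUEUE_NAME = os.environ.get('QUEUE_NAME', 'frames')


def put_queue(img_str):
    # Minimal stand-in publisher: send one JPEG to a RabbitMQ queue
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
    try:
        channel = connection.channel()
        channel.queue_declare(queue=QUEUE_NAME)
        channel.basic_publish(exchange='', routing_key=QUEUE_NAME, body=img_str)
    finally:
        connection.close()


def transcode_frame(frame):
    # imageio yields RGB frames; OpenCV expects BGR, so convert before JPEG-encoding
    bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    img_str = cv2.imencode('.jpg', bgr)[1].tobytes()
    print("One frame transcoded")
    return img_str


def get_frame(chunk):
    im, sts = None, 1  # stays (None, 1) if the chunk decodes to no frames
    try:
        # Decode the partial MKV chunk with imageio's ffmpeg plugin
        with imageio.get_reader(io.BytesIO(chunk), 'ffmpeg') as fragment:
            for num, im in enumerate(fragment):
                print(num)
                # num % 30 == 0 already holds for num == 0, so the first frame is kept
                if num % 30 == 0:
                    sts = 0
                    print("Frame captured")
                    break
    except OSError:
        print("Broken fragment received")
        return None, 1
    print("Returning result")
    return im, sts


def read_chunk(fragment):
    # Read the first 64 KiB of the GetMedia payload, then release the connection
    payload = fragment['Payload']
    chunk = payload.read(1024 * 8 * 8)
    payload.close()
    print("Chunk read")
    return chunk


def get_fragment():
    # Each call opens a new GetMedia connection starting at the live edge ('NOW')
    fragment = media_client.get_media(
        StreamARN=os.environ['KVS_ARN'],
        StartSelector={'StartSelectorType': 'NOW'},
    )
    print('Fragment downloaded')
    return fragment


def run():
    while True:
        start_time = timeit.default_timer()
        fragment = get_fragment()
        chunk = read_chunk(fragment)
        im, sts = get_frame(chunk)
        if sts != 0:
            time.sleep(0.25)
            continue
        img_str = transcode_frame(im)
        try:
            put_queue(img_str)
            time.sleep(0.25)
        except pika.exceptions.AMQPError:
            print('Cannot connect to RabbitMQ')
            time.sleep(3)
            continue
        print(f'Time between fragment download and frame processing: {timeit.default_timer() - start_time}')


if __name__ == '__main__':
    kvs_client = boto3.client('kinesisvideo')
    # GetMedia must be called on the stream's data endpoint
    kvs_endpoint = kvs_client.get_data_endpoint(
        StreamARN=os.environ['KVS_ARN'],
        APIName='GET_MEDIA'
    )
    endpoint_url = kvs_endpoint['DataEndpoint']
    # Client used by get_fragment() to pull media from KVS
    media_client = boto3.client('kinesis-video-media', endpoint_url=endpoint_url)
    run()
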
bml1g12 commented 3 years ago

Regarding imageio.get_reader: this looks like a great workaround for parsing the stream in Python. Do you know how one would extract the metadata in the MKV header, such as AWS_KINESISVIDEO_PRODUCER_TIMESTAMP?

I would also be grateful if you could explain whether this bit

        for num , im in enumerate(fragment):
            print(num)
            if num % 30 == 0:
                sts = 0
                print("Frame captured")
                break

is just there to downsample to 1 frame in every 30, or does something more technical.

gai6948 commented 3 years ago

For the MKV metadata: that's the part I struggled with in the Python implementation, so my final solution was to use this KVS Parser Library in Java to parse each frame along with its MKV metadata, pack them into a blob, and send it to Kinesis for my downstream Python consumer (I used ImageIO as the interface between Java's BufferedImage and Python's OpenCV).

As for the enumerate part, it isn't related to FPS. Each fragment contains a variable number of frames (depending on how many bytes you specify in the read() method), and most of the time I am only interested in one of those frames, no matter how many there are.

bml1g12 commented 3 years ago

I see, so the % 30 is to select an arbitrary (30th) frame within the fragment.

It seems I can extract some of the metadata with ffmpeg/ffprobe, but I haven't found a way to do this dynamically in Python as the stream comes in.
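A sketch of the ffprobe route for one downloaded fragment: ffprobe can print container-level tags as JSON, which should cover the AWS_KINESISVIDEO_* values seen in the ffmpeg output earlier in the thread. This assumes ffprobe is installed and on PATH and that those tags appear at the format level there; it runs once per fragment, so it does not solve the streaming case. The function names are illustrative.

```python
import json
import subprocess


def parse_format_tags(ffprobe_json):
    """Extract the container-level tags from ffprobe's JSON output."""
    return json.loads(ffprobe_json).get("format", {}).get("tags", {})


def probe_fragment_tags(mkv_bytes):
    """Run ffprobe on one in-memory MKV fragment (fed via stdin) and return its tags."""
    result = subprocess.run(
        ["ffprobe", "-v", "error", "-print_format", "json", "-show_format", "pipe:0"],
        input=mkv_bytes,
        capture_output=True,
        check=True,  # raise CalledProcessError if ffprobe cannot read the fragment
    )
    return parse_format_tags(result.stdout)
```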

I have found that with the GetClip API in boto3 I can write a Python script that fetches video clips, and with the ListFragments API I can get the associated timestamps, so that might be good enough for what I need. Indeed, the Java parser library seems to be the only easy way to get frame-by-frame metadata, which is a pain as I am not a confident Java developer.
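A sketch of the GetClip/ListFragments approach with boto3. Both calls go to a per-stream data endpoint obtained from get_data_endpoint (APIName LIST_FRAGMENTS or GET_CLIP), and start and end are datetimes (or epoch seconds) matched against producer timestamps. GetClip returns an MP4 clip. NextToken pagination is left out, and the helper names are illustrative.

```python
def _archived_media_client(session, stream_name, api_name):
    # Each archived-media API is served from a per-stream data endpoint
    kvs = session.client("kinesisvideo")
    endpoint = kvs.get_data_endpoint(StreamName=stream_name, APIName=api_name)["DataEndpoint"]
    return session.client("kinesis-video-archived-media", endpoint_url=endpoint)


def _producer_selector(start, end):
    return {
        "FragmentSelectorType": "PRODUCER_TIMESTAMP",
        "TimestampRange": {"StartTimestamp": start, "EndTimestamp": end},
    }


def list_fragment_timestamps(session, stream_name, start, end):
    """Return (fragment_number, producer_ts, server_ts) tuples, oldest producer timestamp first."""
    client = _archived_media_client(session, stream_name, "LIST_FRAGMENTS")
    response = client.list_fragments(
        StreamName=stream_name, FragmentSelector=_producer_selector(start, end)
    )
    rows = [
        (f["FragmentNumber"], f["ProducerTimestamp"], f["ServerTimestamp"])
        for f in response["Fragments"]
    ]
    # Sorted here rather than relying on the order of the response
    return sorted(rows, key=lambda row: row[1])


def get_clip_bytes(session, stream_name, start, end):
    """Download the MP4 clip covering [start, end] by producer timestamp."""
    client = _archived_media_client(session, stream_name, "GET_CLIP")
    response = client.get_clip(
        StreamName=stream_name, ClipFragmentSelector=_producer_selector(start, end)
    )
    return response["Payload"].read()
```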

gai6948 commented 3 years ago

I am not a Java developer either, but the parser library is easy to use, so with fewer than 200 lines of Java I can send the image frame and its metadata to a Kinesis stream for my Python processors. Sample here.
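On the Python side of this design, the processors read records from a Kinesis data stream. The blob layout written by the Java side is not shown in the thread, so this sketch just yields each record's raw Data bytes from one shard using boto3's kinesis client (get_shard_iterator and get_records); decoding them is left to the caller. The function name is illustrative, and for multiple shards and checkpointing the Kinesis Client Library is the more robust choice.

```python
import time


def iter_kinesis_records(client, stream_name, shard_id, poll_interval=1.0):
    """Yield the raw Data bytes of each record in one shard, starting at the newest records."""
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="LATEST",  # only records written after this call
    )["ShardIterator"]
    while iterator:  # NextShardIterator is missing/None once a closed shard is exhausted
        response = client.get_records(ShardIterator=iterator, Limit=100)
        for record in response["Records"]:
            yield record["Data"]  # bytes; their layout depends on the Java producer
        iterator = response.get("NextShardIterator")
        if not response["Records"]:
            time.sleep(poll_interval)  # avoid hammering GetRecords on an idle shard
```

With a real client, client = boto3.client("kinesis") and the shard IDs come from client.list_shards(StreamName=...)["Shards"].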

The GetClip API has considerable latency. I needed the 2-4 s latency of GetMedia, so the KVS Parser Library is the right choice, although JCodec is quite sluggish at times.

vgonisanz commented 3 years ago

I eventually started using the Java-based Kinesis Video consumer on an AWS Lambda to parse the audio, and I send the results as WAV files to another AWS Lambda function written in Python. This is not the best solution. Anyway, my last attempt at parsing the audio in Python is in this code snippet:

import os
import uuid
import wave
import time

import boto3
from botocore.exceptions import ClientError

import structlog

logger = structlog.get_logger()

class Storage():
    def __init__(self, bucket_name):
        self._s3 = boto3.client('s3')
        self._kinesis_client = boto3.client('kinesis')
        self._kinesisvideo_client = boto3.client('kinesisvideo')
        #self._kinesisvideo_client = boto3.client('kinesis-video-archived-media')
        self._bucket_name = bucket_name

    def download_user_conversation(self, filename):
        """
        download_user_conversation: Download a wav file from S3 bucket.

        Returns
        -------
        result: True if success.
        tmpfile: The name of the file downloaded.
        """
        tmpfile = os.path.join('/tmp', str(uuid.uuid4()) + '.wav')

        logger.debug("download_user_conversation", tmpfile=tmpfile, bucket=self._bucket_name, filename=filename)

        with open(tmpfile, 'wb') as f:
            try:
                self._s3.download_fileobj(self._bucket_name, filename, f)
                return True, tmpfile
            except ClientError as e:
                logger.error(e)
                return False, tmpfile

    def download_stream(self, stream_arn, ms_start_timestamp=None):
        """Save the GetMedia response for a stream to a temporary .mkv file."""
        tmpfile = os.path.join('/tmp', str(uuid.uuid4()) + '.mkv')

        logger.debug("get_data_endpoint", tmpfile=tmpfile, stream_arn=stream_arn, ms_start_timestamp=ms_start_timestamp)
        # GetMedia must be called against the stream's data endpoint
        response = self._kinesisvideo_client.get_data_endpoint(
            StreamARN=stream_arn,
            APIName='GET_MEDIA')

        data_endpoint = response['DataEndpoint']
        logger.debug("get_data_endpoint", endpoint_url=data_endpoint)
        video_client = boto3.client('kinesis-video-media', endpoint_url=data_endpoint)
        stream = video_client.get_media(
            StreamARN=stream_arn,
            StartSelector={'StartSelectorType': 'EARLIEST'})

        logger.debug("stream", stream=stream)
        streaming_body = stream['Payload']

        # This writes an MKV chunk, which seemed corrupted in my tests.
        # Write chunk by chunk: on a live stream GetMedia may keep sending data,
        # so this loop can block for a long time.
        with open(tmpfile, 'wb') as f:
            for chunk in streaming_body.iter_chunks():
                f.write(chunk)

        logger.debug("recorded_file", tmpfile=tmpfile)
        return True, tmpfile

    def upload_file(self, file_name, object_name=None):
        """Upload a file to this storage's S3 bucket

        :param file_name: File to upload
        :param object_name: S3 object name. If not specified then file_name is used
        :return: True if file was uploaded, else False
        """
        # If S3 object_name was not specified, use file_name
        if object_name is None:
            object_name = file_name

        # Upload the file (boto3 wraps upload failures in S3UploadFailedError)
        try:
            logger.info("upload_file", file_name=file_name, object_name=object_name, bucket=self._bucket_name)
            self._s3.upload_file(file_name, self._bucket_name, object_name)
        except (ClientError, boto3.exceptions.S3UploadFailedError) as e:
            logger.error("upload_file_error", message=e)
            return False
        return True

    def test(self, StreamARN):
        response = self._kinesisvideo_client.describe_stream(StreamARN=StreamARN)
        StreamName = response['StreamInfo']['StreamName']
        StreamARN2 = response['StreamInfo']['StreamARN']
        MediaType = response['StreamInfo']['MediaType']
        logger.info("test", StreamARN=StreamARN, StreamName=StreamName, MediaType=MediaType, response=response)
        #response = self._kinesis_client.describe_stream(StreamName=StreamName)
        #logger.info("test2", StreamName=StreamName, response=response)

Maybe this helps someone parse the file. To call the code, just run this in your integration:

bucket_name = "Your S3 bucket name"
storage = Storage(bucket_name)
StreamARN = "Your stream ARN"
ok, mkv_path = storage.download_stream(StreamARN, ms_start_timestamp=None)

Disclaimer: the code may have errors.

I usually store the results in an S3 bucket. I used the boto3 kinesis-video-media reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis-video-media.html#KinesisVideoMedia.Client.get_media

imran1995-python commented 3 years ago

@vgonisanz, can you tell me which library you used to parse the MKV tags?

manzanofab commented 1 year ago

@vgonisanz, would it be OK to ask for directions on your Lambda? I am giving up on doing this in Python. Is there a template (CloudFormation) that I can use to deploy this Java Lambda?

dcolcott commented 1 year ago

Hi all, consuming and parsing Amazon Kinesis Video Streams into individual frames and MKV fragments is a common request.

You can do so with this Kinesis Video Python Consumer library I recently released as an AWS code sample: https://github.com/aws-samples/amazon-kinesis-video-streams-consumer-library-for-python