
[Q] External processing examples in Python (and Java) #730

Open lars18th opened 3 years ago

lars18th commented 3 years ago

Hi @lelegard ,

I'm not sure whether you will find this useful or not, but I would like to put this idea to you. The background is our discussion some time ago in #552.

What I request is an example in Python of what I call external processing with TSDuck. Let me explain what this means: using some open-source TS tool (for example a SAT>IP server), you are reading/processing/writing a chunk of TS packets. These tools can be implemented in any language. But it is easy to include/implement a filter processing function that can inspect every packet. And when a relevant packet is received (for example, one with a specific PID), the function can "copy" this packet, process it, and write the result. That's the concept of filter processing. However, in some cases this filter processing cannot be done synchronously, because if the processing blocks, the TS can be stalled.

So, my idea is to provide an interface that connects such tools (written in any language) to a scripting language. For example Python, but at some point it could be Lua or a similar language. How to integrate this external language with the tool is out of scope for this idea. The focus is how to achieve the processing using the current Python binding of TSDuck.

The type of processing that I want to use is a transient asynchronous communication model: for each TS packet, the sender (the script) passes a copy (in this scenario a copy of the packet is necessary) to the TSDuck library and writes a NULL packet in its place. Then, after some time, the TSDuck library generates the corresponding output, and this packet is delivered in a buffer. So, when the sender copies another packet to the TSDuck library, it can fetch a previously processed packet to overwrite the current one instead of overwriting it with a NULL. This will be useful for any processing involving PSI/SI tables, which is the main target of the TSDuck toolkit.
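
To make the idea concrete, here is a rough sketch in Python of the filter side of this model (filter_func, to_tsduck and from_tsduck are just illustrative names for the glue code, not TSDuck APIs):

import queue

NULL_PACKET = bytes([0x47, 0x1F, 0xFF, 0x10]) + bytes(184)  # 188-byte null packet (PID 0x1FFF)

to_tsduck = queue.Queue()    # copies of packets handed over for asynchronous processing
from_tsduck = queue.Queue()  # processed packets coming back, possibly some time later

def filter_func(packet: bytes) -> bytes:
    # Called by the parent tool for each TS packet of a filtered PID.
    to_tsduck.put(bytes(packet))          # pass a copy to the TSDuck side
    try:
        return from_tsduck.get_nowait()   # overwrite with a previously processed packet...
    except queue.Empty:
        return NULL_PACKET                # ...or with a NULL packet while nothing is ready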

At this point you may be thinking: why not implement it using the C++ API of TSDuck? And yes, you're right that this is best in terms of efficiency and reliability. However, in some cases what we need is a test or a simple way to achieve this. For example, think of these simple tasks: changing service values, rewriting EIT data, reconfiguring the NIT tables, etc. All these tasks can be done without processing the full transport stream. You only need to filter some PIDs and process them. So in fact you don't need very efficient processing, and using simple script bindings you can achieve the objective.

What do you think about this? Would it be possible to have a Python example of such functionality? Thank you in advance!

lelegard commented 3 years ago

If I understand correctly, you would like some new plugin to extract a few PID's, send the corresponding TS packets to some external processing (forked process or Python class) and later merge the output of this external processing into the stream. The whole stuff would be asynchronous, allowing some "time shift" between the input and output. Is that right?

For example, think of these simple tasks: changing service values, rewriting EIT data, reconfiguring the NIT tables, etc.

If you process a stream of TS packets, these are "simple tasks" indeed when written in C++, using all the demux, deserialization, serialization and packetization classes from the TSDuck library. What makes you think this would be simple in Python or in some external application? On the contrary, writing a TSDuck plugin in C++ would be much easier.

Processing the "full" transport stream in a plugin when you are interested in only one PID is not a difficulty. Just ignore other PID's which is as simple as starting the packet processing method with if (pkt.getPID() != my_PID) return TSP_OK;.

lars18th commented 3 years ago

Hi @lelegard ,

I feel that I haven't explained the objective and the use case well. So, let me try again:

So, what I need in order to implement this strategy is a Python example that works this way: not processing the full TS but only some specific PIDs (like a TS with PID filtering enabled), and with packet-oriented processing rather than stream-oriented processing. The difference is that the latter assumes a continuous stream, so it processes it in chunks; the former works (more or less) packet by packet. For sure, it's not viable to have a strict 1-to-1 packet processing mode (TSDuck can't generate any data if it doesn't have sufficient information). But this processing can't wait until 1000 packets have been read to obtain a result. Furthermore, the original tool is not linked in any sense with the TSDuck library. Only a tsp process is started from the script and used to process what it receives from the tool.

Now do you understand the concept? Is it clearer? What do you think? Does this make sense or is it completely stupid? I really appreciate your opinion, so please comment even if you think it's foolish.

lelegard commented 3 years ago

OK, so the initial tool from which you extract TS packets is not TSDuck. I did not get that.

As far as TSDuck is concerned, the idea would be to process just a few packets, maybe containing tables, process the tables, and repacketize the modified tables. Is that right?

In that case, a script is sufficient (a Python script if you like). Analyze the input packets and produce XML or JSON tables (either using tstables or a Python instance of class TSProcessor using the tables plugin). Once you modified your XML or JSON, the command tspacketize produces the TS packets. Keep in mind that if you want to cycle the output, the continuity counters shall be adjusted on the fly.
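
As an untested sketch of the first half of that suggestion, a Python TSProcessor session with the tables plugin could look roughly like this (the tsduck module name, the input file name and the --pid / --xml-output options are assumptions to be checked against the actual plugin help):

import tsduck

# Rough sketch: extract the SDT (PID 0x11) from a capture into an XML file,
# which can then be edited and repacketized with tspacketize.
rep = tsduck.AsyncReport()
tsp = tsduck.TSProcessor(rep)

tsp.input = ['file', 'capture.ts']   # hypothetical input file
tsp.plugins = [['tables', '--pid', '0x11', '--xml-output', 'sdt.xml']]
tsp.output = ['drop']

tsp.start()
tsp.waitForTermination()
tsp.delete()

rep.terminate()
rep.delete()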

lars18th commented 3 years ago

Hi @lelegard ,

As far as TSDuck is concerned, the idea would be to process just a few packets, maybe containing tables, process the tables, and repacketize the modified tables. Is that right?

Yes, but not limited in time.

In that case, a script is sufficient (a Python script if you like). Analyze the input packets and produce XML or JSON tables (either using tstables or a Python instance of class TSProcessor using the tables plugin). Once you modified your XML or JSON, the command tspacketize produces the TS packets. Keep in mind that if you want to cycle the output, the continuity counters shall be adjusted on the fly.

The packets to process can be anything, but mainly the PSI/SI tables. And the processing can be done by the TSDuck processor directly (e.g. a simple renaming of services and network) or by injecting new data (e.g. adding some EIT data) from XML/JSON files (in-place or from disk). Now I feel that part is clear.

However, how to implement this is not clear to me. You suggest creating a script that runs a loop that gets the relevant TS packets and generates a binary representation. But this is a chunk-model approach. It is valid for extracting some data, for example if you want to parse the services inside the TS. But my objective is to process the stream continuously. And my vision, perhaps very limited or obtuse, is to use a packet model. The concept is: I instantiate a TSDuck processor from the script using the binding, then I loop (forever) writing TS packets to its input as they arrive from the primary tool, and TSDuck produces the output packets in a buffer that I send (forever) back to the primary tool. Obviously, this holds until the primary tool closes the stream.

Do you think this is possible? Could you provide an example, please?

lelegard commented 3 years ago

Sorry, but this becomes even more obscure to me. tsp is based on a packet model by design.

Perhaps you should go through the exercise of writing a precise technical specification of what you want to do.

lars18th commented 3 years ago

Hi @lelegard ,

I'll try to explain it in another way:

Is the architecture clearer now?

The objective is:

  1. Do not link the parent tool with TSDuck or any other new process or library.
  2. Make the interprocess communication (the sharing of the PSI/SI packets) asynchronous.
  3. The other side of the broker is a Python script using the TSDuck binding.

I hope you can see the advantage of this model, as it provides a very fast (and dirty) environment for testing and development. What do you think now?

lelegard commented 3 years ago

OK. I understand the need. But I still do not understand what you expect to be added in TSDuck. Everything seems to be there already. It is your responsibility to add the new filter_func() in the parent tool, the Python script is yours, so the broker between the two must be yours too. So, what more do you need from TSDuck?

This is why I suggested writing a technical specification of what you expect from TSDuck (not from your application).

lars18th commented 3 years ago

Hi @lelegard ,

Thank you for your response. I really appreciate you taking the time to discuss some crazy ideas from users.

My concern is not about the communication between filter_func() and the Python script. That is out of scope for this discussion (my current idea is to use ZeroMQ). My request is about how to implement the Python script using the TSDuck binding. At the moment, some points remain obscure to me:

I'm sure these questions may seem ridiculous to you, so for this reason I ask you to please share an example of such a script. The example script could be as simple as something that runs in this pipeline:

tsp -I dvb --channel TEST \
    -P filter -p 0-31 -p 100 | \
script.py | \
tstables

Please note that here the -P filter doesn't include the regular --stuffing option. So only the PSI/SI tables are passed (plus the PMT with PID 100). And here the relevant part is not the tsp and tstables processes, as they're just examples standing in for the parent program and the broker. The objective is that the script.py process can do some interesting processing using the TSDuck Python binding. For example, changing the name of the channel with an internal svrename plugin. But... without passing a large chunk of packets to the internal process and receiving more or less the same number of written packets within a short time interval.
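
Just to show where the TSDuck part should plug in, this is the bare skeleton I imagine for script.py; the body of process() is precisely the part I don't know how to write with the binding:

import sys

PKT_SIZE = 188  # one MPEG-TS packet

def process(packet: bytes) -> bytes:
    # Placeholder: this is where the TSDuck Python binding should
    # inspect/modify the packet (e.g. an internal svrename-like processing).
    return packet

stdin = sys.stdin.buffer
stdout = sys.stdout.buffer
while True:
    packet = stdin.read(PKT_SIZE)
    if len(packet) < PKT_SIZE:
        break                         # the parent process closed the stream
    stdout.write(process(packet))
    stdout.flush()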

I hope you want to help me. Thank you.

lelegard commented 3 years ago

For example, changing the name of the channel with an internal svrename plugin. But... without passing a large chunk of packets to the internal process and receiving more or less the same number of written packets within a short time interval.

This is the point where I am lost. If you want to rename a service, not only do you need to modify a few tables, but you also need to continue cycling the updated tables in updated TS packets, as long as the stream is running across the complete system. You cannot just send a few packets, get the same number of packets on output and stop. You need to cycle them. And you need to handle potential future updates of the input signalization to apply your modification in the updated tables (this is what svrename does for instance). So, you need to run tsp continuously, not just over a few packets.

Additionally, you should segment problems, not mix them. First, try to find the right tsp command without bothering about Python. And afterwards, when your command is ready, just transfer it as a Python TSProcessor session.

lars18th commented 3 years ago

Hi @lelegard ,

I'm sure I haven't expressed myself well. The idea is to do real continuous processing. The parent program, through filter_func(), is processing the full transport stream. However, the function only passes to the script the packets of the required PIDs. And it never stops sending them to the script! So the point here is that the script will return the same packets, but processed. In no sense am I suggesting passing a few packets and then receiving a chunk. In fact, what happens if a table is updated? In that case, TSDuck has to regenerate the outgoing packets.

Perhaps you're now thinking about the problem of repetition intervals. But this is not a problem, because a packet of PID 0 is replaced only with packets of PID 0. So the challenge for the script (or more specifically, for the TSDuck toolkit) is to return the same number of packets for each PID. Of course, at start-up every packet needs to be converted to a NULL packet, as no output can be created without input data.

At the moment, TSDuck is focused on processing the entire transport stream, not on working with a transient asynchronous communication model between different processes. That is the problem behind my request for a Python example. The other part, how to send the packet from the original process and receive it back, can be done in different ways, for example using a broker. But the difficulty I can't solve is how to handle the TSDuck binding. Regarding the tsp command, that's easy and out of scope; the current examples are sufficient to handle it.

Can you help me, please?

Just to mention, and perhaps to encourage you to solve this problem: this would open up a very easy method for running quick tests. You can imagine any headend equipment (professional or amateur) that provides a callback facility for filtering and processing TS packets. Then, using simple TSDuck scripts (perhaps more than one running at the same time), it is possible to process the original transport stream. All without requiring complex equipment, and with sufficient robustness: if the script fails, you only lose the filtered PIDs, without touching the original structure of the TS. Furthermore, a fallback mode can also be implemented, so that if the script fails the packets are not nullified. This is possible because the TSDuck toolkit is not processing the entire TS, it is just working in parallel. Do you agree with this alternative working model?

lars18th commented 3 years ago

Hi @lelegard ,

I feel that the last commits related to the "memory plugins" are your solution to this request. So, please try to answer these questions:

Now, regarding an alternative push mode:

Thank you for your invaluable effort! Regards.

lelegard commented 3 years ago

I feel that the last commits related to the "memory plugins" are your solution to this request.

It can be. By interfacing the existing "plugin events" mechanism with Java and Python (J/P), I try to improve the usefulness of TSDuck in those languages. C++ programs have access to hundreds of TSDuck classes while J/P programs do not. In these languages, the TSDuck paradigm is to "program" what should be done as a TSProcessor instance. On top of that base, the J/P code builds the parameters of the plugin chain but does not interact with it once started. By adding pure abstract J/P logging classes and plugin options such as --log-XXX-line, the J/P code can receive serialized data (XML, JSON, hexa binary). Now, the introduction of pure abstract J/P event handler classes extends the exchanges to plugin events and the communication in both directions (J/P to plugins and vice-versa).

The use of "plugin events" in J/P was made available with the new option --event-code in plugins tables, psi or mpe (I have a request to handle MPE datagrams in Python). Then, the creation of the memory plugins became trivial. Have a look at the source code here and here, it is absolutely trivial. This is what I like the most: create simple but mightily generic basic mechanisms (e.g. the plugin events) on top of which you can build a lot of trivial but powerful mechanisms. This is the root of the Unix paradigm as KT & DR created it 50 years (!) ago.

Note that the "plugin events" mechanism already existed for a while but was used only in dedicated plugins and applications (see a C++ example here). It was not really visible to command line users.

The current implementation seems to follow a pull mode. So when the TSDuck library requires a packet, it reads it from memory by calling the callback function of the script, and when it wants to output a packet, it calls the other callback function of the script. Is this true?

Yes, this is correct. But this is a mix of modes: the input is in pull mode while the output is in push mode.

The reason for this architecture is obvious: reuse the existing "plugin events" mechanism.

What happens if the callback functions (read or write) block (for example, because there are no input packets or because there is no space to consume the output)?

It is perfectly legitimate to block in input and output. This is the essence of "input" and "output". If the event handler for the input plugin blocks, the plugin simply waits for it. Calling plugin events is done synchronously in the context of the plugin thread. Waiting for input is the main role of an input plugin. The same thing applies to output plugins.

And how many packets are expected to be read/written in each call?

See the description of the Python class ts.AbstractPluginEventHandler here.

It explains this: when a plugin signals an event, it passes binary data as a bytearray in the data parameter of the overridden handlePluginEvent method. The plugin may allow these data to be updated or replaced when context.read_only_data is False. In that case, if the event handler wants to replace the data, it returns another bytearray as the function result. The returned data replace the input data. If the function returns nothing or something other than a bytearray, the input data are left untouched. The maximum acceptable size in bytes of the returned data is context.max_data_size.
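
As an illustration of that description only (not a complete program, and using only the attributes quoted above), a handler could be sketched like this:

import tsduck

class ReplaceHandler(tsduck.AbstractPluginEventHandler):
    def handlePluginEvent(self, context, data):
        # 'data' is a bytearray with the binary data signalled by the plugin.
        if context.read_only_data:
            return None                  # the plugin only lets us observe the data
        new_data = bytearray(data)       # start from the original data
        # ... modify new_data here ...
        if len(new_data) <= context.max_data_size:
            return new_data              # a returned bytearray replaces the input data
        return None                      # anything else leaves the input data untouched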

This is a generic mechanism. Each plugin decides how to use it. For the memory plugins, see the TSDuck user's guide. It basically explains this:

Now, regarding an alternative push mode:

I initially thought about it and even prototyped it. But it does not work in sequential code. You cannot simply call TSProcessor.start() and later call some write/push/send function to push packets on the stream. This is due to the startup sequence of a plugin chain. As part of the startup, a number of packets are read to establish the validity, nature and possibly bitrate of the stream. So, TSProcessor.start() blocks until enough packets are read. Since you wait for start() to complete before starting to push packets, this is a deadlock... The only way to solve this is to write a multi-threaded program and call TSProcessor.start() in one thread while pushing packets from another thread.

So, this method has too many disadvantages.

As a consequence, we stay with the simple and generic "plugin events". There won't be any dedicated push API for the memory input plugin (and no dedicated pull API for the memory output plugin). No way, no need to insist :)

Would it be possible to run it this way? I feel it can be more difficult, but in the case of rewriting some PIDs, it would be desirable to read (from the TSDuck output to memory) at the same speed as writing (from memory to the TSDuck input).

If you mean something like loop on "push an input packet, pull an output packet", the answer is no. This is not even possible in the generic case since some plugins may remove a packet and you will infinitely wait for an output packet that may never come.

See

lars18th commented 3 years ago

Hi @lelegard ,

Thank you for this deep explanation. I need to read more to understand it all.

Until then, some questions. And please don't think I'm not accepting your clear comment "No way, no need to insist :)":

Regards.

lars18th commented 3 years ago

Hi @lelegard ,

I initially thought about it and even prototyped it.

I forgot to mention it: thanks for trying it out! :wink:

lelegard commented 3 years ago

As the model "input push, output pull" is not viable.

This is a gratuitous assertion. It is viable. This is just a matter of mindset.

If you want to process a low bitrate stream and avoid the delays of buffering, try to adjust the various TSProcessor properties (counterparts of tsp options) here: https://tsduck.io/doxy/classts_1_1tsp_1_1_t_s_processor.html

Especially try to set initial_input_packets, max_flushed_packets and max_input_packets to 1.
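
In a Python TSProcessor session, that tuning would look roughly like this (untested sketch; the plugin chain below is only a placeholder, to be replaced with the real memory input/output setup):

import tsduck

rep = tsduck.AsyncReport()
tsp = tsduck.TSProcessor(rep)

# Minimize internal buffering so packets move through the chain one by one.
tsp.initial_input_packets = 1
tsp.max_flushed_packets = 1
tsp.max_input_packets = 1

# Placeholder chain: generate 1000 null packets and drop them.
tsp.input = ['null', '1000']
tsp.plugins = [['count']]
tsp.output = ['drop']

tsp.start()
tsp.waitForTermination()
tsp.delete()
rep.terminate()
rep.delete()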

lars18th commented 3 years ago

Especially try to set initial_input_packets, max_flushed_packets and max_input_packets to 1.

Oh! Then, if I block in the read function of the memory (input) plugin until I have one packet, can I expect that the memory (output) plugin will call the write function only one time? Is that true? If so, then the solution is here. But regarding the processing time: using this blocking model with a packet length of 1, will the entire TSDuck process run in its own thread, or does it share/use the script's thread? The question is relevant because, in the master application, if the filtering function is called for every packet, I can't wait for the TSDuck processing. So I need to be sure whether I have to create a new thread for the memory plugin, or whether a simple semaphore in the read and write functions will be sufficient.
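
Something like this blocking pair is what I have in mind (just a sketch; how these handlers are attached to the memory input/output plugins, and whether this is thread-safe, is exactly what I'm asking about):

import queue
import tsduck

to_tsduck = queue.Queue()    # packets pushed by the master application's filter function
from_tsduck = queue.Queue()  # processed packets, to be returned to the master application

class MemInput(tsduck.AbstractPluginEventHandler):
    # Handler for the memory input plugin: block until one packet is available.
    def handlePluginEvent(self, context, data):
        return bytearray(to_tsduck.get())

class MemOutput(tsduck.AbstractPluginEventHandler):
    # Handler for the memory output plugin: hand the processed packets back.
    def handlePluginEvent(self, context, data):
        from_tsduck.put(bytes(data))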

In any case, do you think it would be useful to have an example of such side processing?

lelegard commented 3 years ago

In the application, C++, Java or Python, it does not matter, you have many threads. Most of them are transparent to the user.

lars18th commented 3 years ago

In the application, C++, Java or Python, it does not matter, you have many threads. Most of them are transparent to the user.

Then let me be more precise: when implementing the side processing from the main application using the TSDuck binding... if I use the memory input to write packets and the memory output to read packets, I configure the chain with a "packet length of 1", and I "block" the read callback function of the memory plugin... what do I need to handle to be thread-safe? In the main application I only have a filter_function() that is called for every packet and it can't block.