IBMStreams / streamsx.topology

Develop streaming applications for IBM Streams in Python, Java & Scala.
http://ibmstreams.github.io/streamsx.topology
Apache License 2.0

Support structured schema with SPL type Sys.PaneTiming #2636

Closed markheger closed 3 years ago

markheger commented 3 years ago

In SPL, the type Sys.PaneTiming is used with event-time TimeInterval windows.

The following function can be used in output attribute assignments:

Sys.PaneTiming paneTiming()

The type Sys.PaneTiming is a valid attribute type in a defined SPL schema:

type GroupAvgWithLateData = float64 avgTemp, int32 numSensors, timestamp end,
    timestamp start, Sys.PaneTiming paneTiming ;

Python StreamSchema

In a Python topology, creating a schema with StreamSchema('tuple<Sys.PaneTiming pt>') raises the following error:

SyntaxError: Invalid schema:tuple<Sys.PaneTiming pt> token TokenInfo(type=1 (NAME), string='Sys', start=(1, 6), end=(1, 9), line='tuple<Sys.PaneTiming pt>')
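
The TokenInfo in the message has the shape produced by Python's standard tokenize module. The sketch below uses only the standard library to locate the reported token. It does not call streamsx, and the idea that the schema parser tokenizes the string in this way is an assumption drawn from the error text, not something confirmed in this issue.

```python
import io
import tokenize

schema = 'tuple<Sys.PaneTiming pt>'

# Tokenize the schema string with Python's own tokenizer.
tokens = list(tokenize.generate_tokens(io.StringIO(schema).readline))

# Locate the first token whose text is 'Sys', the one named in the error.
sys_token = next(t for t in tokens if t.string == 'Sys')

# Show the token kind and its (row, column) start and end positions.
print(tokenize.tok_name[sys_token.type], sys_token.start, sys_token.end)
```

The token is a NAME spanning columns 6 to 9 of line 1, which matches the start and end values in the SyntaxError. That suggests the parser stops at Sys because it is not one of the type names it recognizes.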

SPL definition

Sys.PaneTiming = enum { paneEarly, paneOnComplete, paneLate };

The timing of a window pane triggering in relation to the enclosing operator's watermark that is used for predicting pane completion.

markheger commented 3 years ago

A workaround is to declare the attribute as rstring in the structured schema and cast the result of paneTiming() to rstring:

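        from streamsx.topology.schema import StreamSchema
        import streamsx.spl.op as op

        # s: an existing event-time Stream with a numeric attribute num
        # Declare paneTiming as rstring because Sys.PaneTiming is rejected
        agg_schema = StreamSchema('tuple<uint64 sum, rstring paneTiming>').as_tuple(named=True)
        # timeInterval window
        win = s.time_interval(interval_duration=5.0, discard_age=30.0)
        agg = op.Map('spl.relational::Aggregate', win, schema=agg_schema)
        agg.sum = agg.output('Sum((uint64)num)')
        # Cast the enum returned by paneTiming() to rstring
        agg.paneTiming = agg.output('(rstring)paneTiming()')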
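
With this workaround the pane timing reaches Python code as a plain string. The following sketch maps it back to an enumeration. The PaneTiming class and both helper functions are hypothetical names, not part of streamsx, and the sketch assumes that the rstring cast yields the enumerator name (paneEarly, paneOnComplete or paneLate) from the SPL definition above.

```python
from enum import Enum


class PaneTiming(Enum):
    """Hypothetical Python mirror of the SPL enum Sys.PaneTiming."""
    paneEarly = 'paneEarly'
    paneOnComplete = 'paneOnComplete'
    paneLate = 'paneLate'


def parse_pane_timing(value):
    """Convert the string from '(rstring)paneTiming()' to a PaneTiming member.

    Assumes the string is the enumerator name; raises ValueError otherwise.
    """
    return PaneTiming(value)


def is_late(tuple_):
    """Return True for a named tuple whose paneTiming attribute is paneLate."""
    return parse_pane_timing(tuple_.paneTiming) is PaneTiming.paneLate
```

Because the schema uses as_tuple(named=True), a predicate like is_late could presumably be passed to a filter on the aggregate's output stream, for example agg.stream.filter(is_late), to select only the results triggered by late data.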