IBMStreams / administration

Umbrella project for the IBMStreams organization. This project will be used for the management of the individual projects within the IBMStreams organization.

Proposal: streamsx.expiredtupledetector repository #117

Status: Closed (nysenthil closed this 7 years ago)

nysenthil commented 7 years ago

This proposal is to create a toolkit whose operator holds data tuples in an in-memory data structure for a specified period of time, detects when each of those tuples expires, and evicts the expired tuples so that downstream application logic can handle them.
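The hold-and-evict idea can be illustrated with a minimal C++ sketch. It assumes a single fixed expire time, a caller-supplied millisecond clock, and a timer signal modeled as a plain method call. `Flight`, `FixedTtlBuffer`, `hold`, and `evictExpired` are hypothetical names chosen for illustration; they are not the toolkit's API.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for a data tuple (not the toolkit's type).
struct Flight {
    std::string flightId;
    std::string origin;
    std::string destination;
};

// Hypothetical fixed-TTL holding buffer: a sketch, not the toolkit's code.
class FixedTtlBuffer {
public:
    explicit FixedTtlBuffer(uint64_t expireTimeMs) : expireTimeMs_(expireTimeMs) {}

    // Hold a tuple that arrived at nowMs; it expires at nowMs + expireTimeMs.
    void hold(const Flight& f, uint64_t nowMs) {
        entries_.push_back({nowMs + expireTimeMs_, f});
    }

    // Called on each timer signal: remove and return every tuple whose
    // deadline has been reached. With a constant TTL and non-decreasing
    // arrival times, the deque is sorted by deadline, so the loop can stop
    // at the first tuple that has not expired yet.
    std::vector<Flight> evictExpired(uint64_t nowMs) {
        std::vector<Flight> expired;
        while (!entries_.empty() && entries_.front().deadlineMs <= nowMs) {
            expired.push_back(entries_.front().tuple);
            entries_.pop_front();
        }
        return expired;
    }

    std::size_t size() const { return entries_.size(); }

private:
    struct Entry {
        uint64_t deadlineMs;
        Flight tuple;
    };
    uint64_t expireTimeMs_;
    std::deque<Entry> entries_;
};
```

Because every tuple gets the same TTL, arrival order equals expiry order, so each timer tick only inspects the front of the queue. That property holds only while the expire time never changes; overriding it mid-stream would break the ordering.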

Code contributions for this toolkit were made by David Koster, Fang Zheng and Senthil Nathan from the IBM Streams team. All of the C++ code in this toolkit is 100% homegrown, with no external dependencies other than the standard C++ libraries already available on Linux. Through this proposal, I request that a new repository named streamsx.expiredtupledetector be created to hold a new toolkit named com.ibm.streamsx.expiredtupledetector.

Possible use cases where this toolkit is applicable:
1) Aging out tuples based on application-specific criteria.
2) Detecting response timeouts and replaying data tuples to an external service.
3) Notifying external consuming applications of events at a later time.
4) Design patterns that require storing tuples and forwarding them after a preset delay.

The following code snippet (not compile-ready) shows how this operator is used and gives high-level hints about what it does.

use com.ibm.streamsx.expiredtupledetector::*;

type
    FlightType = rstring flightId, rstring flightModel, rstring origin, rstring destination;
    // The TTL snapshot type must have a single attribute that is a
    // list of your event tuple type.
    TTLSnapshotType = list<FlightType> ttlSnapshotList;
    ExpireTimeOverrideType = uint64 expireTime;
    DeleteTupleType = rstring flightId;

graph
    ... ... ... ...

    // This operator will hold the incoming tuples for a specified duration and then send them out
    // after that time expires. If a new tuple arrives with the same key, the earlier tuple
    // being held with that key will be replaced by the newly arrived tuple, and the
    // countdown to expiry will restart.
    //
    // 1) The first input stream carries the regular data tuples that need to be held for the
    //    configured expire time.
    //
    // 2) The second input stream carries the periodic timer signal that must be sent to this
    //    operator so that it performs expired-tuple detection and sends expired tuples out.
    //
    // 3) The third input stream is used to override/change the expire time in the middle of
    //    the operation. This stream is effective only when the operator parameter
    //    allowExpireTimeOverride is set to true. If it is set to false, any tuple received via
    //    this stream is ignored. When the expire time is changed via this stream, any future
    //    data tuples sent into this operator will expire after the newly overridden expire time.
    //    Existing data tuples already held inside this operator are not affected by the
    //    override; they expire at the time that was in effect before the override happened.
    //
    // 4) The fourth input stream is used when an existing data tuple held inside this operator
    //    needs to be deleted. Send a tuple via this fourth input stream whose first attribute
    //    carries the value that identifies the data tuple to be deleted from this operator's
    //    internal in-memory data structure.
    //
    // 5) This operator can also optionally be queried for a snapshot of its internal cache
    //    at a given point in time. This is done by adding an optional fifth input stream
    //    and an optional second output stream, as shown below.
    //
    // Output streams:
    //    a) The first output stream carries the original data tuples.
    //    b) The second (optional) output stream carries the TTL snapshot tuples when a query is made.
    //
    (stream<FlightType> FlightInfoAfterTTLExpiry; stream<TTLSnapshotType> TTLSnapshot) =
        ExpiredTupleDetector(FlightIn; TimerSignal; ExpireTimeOverride; DeleteTuple; TTLSnapshotQuerySignal) {
    param
      // Attribute in the incoming tuple that will be used as a key.
      key: FlightIn.flightId;

      // Duration (in milliseconds) for which tuples should be held inside this operator.
      // Those tuples will be sent out after that time duration expires.
      // It takes a uint64 value.
      expireTime: 60000ul;

      // Do we want to allow the user to change the expire time value in the middle of the operation?
      // It takes a boolean value.
      // If at most 100K entries will be held inside this operator, it is okay to
      // allow the user to change the expire time value between different tuples.
      // For more than 100K entries, changing the expire time value arbitrarily will
      // have a performance impact, because the entire data structure inside this
      // operator must be iterated over to find the expired entries. This parameter lets the
      // operator execute optimized logic when the user keeps the same expireTime for the full
      // life of this operator, as opposed to overriding the expireTime arbitrarily via the
      // third input stream.
      allowExpireTimeOverride: true;
    }

... ... ... ...
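The per-port behavior described in the comments of the snippet above can be sketched in plain C++. This is a hedged illustration under stated assumptions, not the toolkit's implementation: `KeyedExpiryCache` and its `on...` methods are hypothetical names, each method stands in for one input port, `flightId` plays the role of the `key` parameter, and time is a caller-supplied millisecond value.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for FlightType; flightId acts as the key attribute.
struct Flight {
    std::string flightId;
    std::string flightModel;
};

// Hypothetical sketch of the keyed semantics; not the toolkit's actual code.
class KeyedExpiryCache {
public:
    KeyedExpiryCache(uint64_t expireTimeMs, bool allowOverride)
        : expireTimeMs_(expireTimeMs), allowOverride_(allowOverride) {}

    // Port 1 (data): a tuple with an already-held key replaces the old tuple
    // and restarts the countdown using the current expire time.
    void onData(const Flight& f, uint64_t nowMs) {
        held_[f.flightId] = Entry{f, nowMs + expireTimeMs_};
    }

    // Port 3 (override): affects only tuples that arrive later; deadlines of
    // tuples already held are left unchanged. Ignored unless allowed.
    void onExpireTimeOverride(uint64_t newExpireTimeMs) {
        if (allowOverride_) expireTimeMs_ = newExpireTimeMs;
    }

    // Port 4 (delete): drop a held tuple by key, if present.
    void onDelete(const std::string& flightId) { held_.erase(flightId); }

    // Port 2 (timer): remove and return all expired tuples. Scans every
    // entry, since overridden expire times break arrival-order sorting.
    std::vector<Flight> onTimer(uint64_t nowMs) {
        std::vector<Flight> expired;
        for (auto it = held_.begin(); it != held_.end();) {
            if (it->second.deadlineMs <= nowMs) {
                expired.push_back(it->second.tuple);
                it = held_.erase(it);
            } else {
                ++it;
            }
        }
        return expired;
    }

    // Port 5 (snapshot query): copy of all held tuples, in unspecified order.
    std::vector<Flight> snapshot() const {
        std::vector<Flight> out;
        out.reserve(held_.size());
        for (const auto& kv : held_) out.push_back(kv.second.tuple);
        return out;
    }

private:
    struct Entry {
        Flight tuple;
        uint64_t deadlineMs;
    };
    uint64_t expireTimeMs_;
    bool allowOverride_;
    std::unordered_map<std::string, Entry> held_;
};
```

The timer handler here walks every held entry, which is the full-iteration cost the `allowExpireTimeOverride` comment warns about for large numbers of entries. An index ordered by deadline (for example a `std::multimap`) is one possible alternative, at the price of extra bookkeeping whenever a tuple is replaced or deleted.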

Yifat-Yulevich commented 7 years ago

+1

Yifatt

ddebrunner commented 7 years ago

Having a repo for a single operator seems overkill.

It does seem that there is a need for a general repo that contains "useful" operators/patterns.

Maybe this would actually fit in the existing streamsx.plumbing?

nysenthil commented 7 years ago

Hi Dan, I need an immediate way to deliver this new facility through the IBMStreams GitHub to a very active Streams enterprise customer who asked for it, so that they can consume it. That is what our management agreed to with that customer. Please advise whether streamsx.plumbing is the right repo for this C++ operator.

chanskw commented 7 years ago

Do we foresee that this may eventually go into the standard SPL toolkit? We have had other cases where we created operators that should eventually go into the SPL toolkits. I am wondering if we need a repository for "general purpose" operators. That repository would be used for incubation until an operator is ready to be integrated back into the SPL toolkit.

chanskw commented 7 years ago

If we follow what we did with the geospatial toolkit, we could have a repository called streamsx.splext that can be used as an SPL toolkit extension.

ddebrunner commented 7 years ago

@chanskw I think plumbing might be that toolkit.

This operator doesn't seem to really process/modify tuples, just delay them and provide access to them. Seems a strong possibility for plumbing.

nysenthil commented 7 years ago

Our customer completed their evaluation of this operator yesterday. They need a way for their different team members to download it rather than receiving it via email. I will attempt to fork the streamsx.plumbing toolkit, add this operator, and create a pull request. If I face any access permission problems, I will contact Dan and/or Samantha. Thank you.

nysenthil commented 7 years ago

Hi Dan, I created pull request 45 in the streamsx.plumbing repository. Please review it and see if you can merge it into the master branch.

chanskw commented 7 years ago

I merged the code into streamsx.plumbing. Closing this request.