IBMStreams / streamsx.dps

The IBMStreams/streamsx.dps GitHub repository is home to the Streams toolkit named DPS (Distributed Process Store)
http://ibmstreams.github.io/streamsx.dps
Other
4 stars 9 forks source link

DpsWithReconnect Composites: Specify key, value, ttl attributes? #70

Closed Alex-Cook4 closed 7 years ago

Alex-Cook4 commented 7 years ago

When building SPL composites, unfortunately there isn't a way to have a default attribute type: i.e. I can't say the default attribute for the "key" we are inserting, is called key, but let users override it in other cases.

As a result, my design decision was to for the developer to have to put their key/value/ttl attributes into actual attributes with those corresponding names on the input stream.

I'm having doubts about whether that was the right decision, and if instead it is better to force them to specify the key, value, ttl attributes.

Pros of current implementation:

        stream<CTIEventWithCallStateAndEventHistory> RedisPutResults 
            = DpsPutTTLWithReconnect(PerparedForDpsEvents)
        {
            param
                outputType : CTIEventWithCallStateAndEventHistory; 
        }

Pros of forced attribute specification:

I am aware that we have done one release with the current implementation and this would be a breaking change. My feeling would be that since there probably hasn't been much adoption yet, it's better to break the interface for a better operator.

brandtol commented 7 years ago

+1 Having the attribute names configurable is much better, I think. Especially if someone wants to perform multiple puts/gets with the same tuple using different attributes as keys/values.

Alex-Cook4 commented 7 years ago

@brandtol I realized that we can't actually specify output attributes with SPL composites.

So unfortunately our "get" will still have to output things to an attribute named "value", but at least this still won't require mapping of tuples BEFORE processing.

Alex-Cook4 commented 7 years ago

I've opened #71 to address this issue. I have run testing and updated the sample (and tested). These changes are minimal from a logic perspective.

brandtol commented 7 years ago

Would it be feasible to require the user to specify the same stream schema for input and output ports of the composite ? Or require to have the value attribute already present in the input stream. That way, I think the attribute name of the value can be specified and used on the input stream ... ?

Alex-Cook4 commented 7 years ago

@brandtol That's a good idea, but I don't think it will work. For example, if we have value in the input stream, as far as I know, we wouldn't be able to overwrite value and then copy the incoming tuple to the outgoing tuple because input attributes are immutable in Custom operators (I'm pretty sure). Are you imagining something different?

brandtol commented 7 years ago

I attached a sample to clarify. The composite modifies the input tuple and than assigns the modified tuple to a fresh output tuple (similar to what is done in DpsGetOperator).

There is a general drawback when using assign, because it involves a copy of the tuple. If we require the user to have the same schema for input and output we could avoid that, but it makes the composite a bit more unflexible. Can you give that a try....

composite ParamTest (output OUT; input IN)
{
    param 
        attribute $key;
        attribute $val;
        type $resultType;

    graph
        stream<$resultType> OUT = Custom(IN as I)
        {
            logic
            onTuple I:
            {
                // write "lookup" result to value attribute 
                mutable rstring myKey  = I.$key;
                I.$val = myKey + "_lookup_result";

                // assign input tuple to output tuple 
                mutable $resultType retVal = ($resultType) {};
                assignFrom(retVal,I);

                // submit result
                submit(retVal, OUT); 
            }       
        }
}

composite Main
{
    type 
        ResultType = tuple<rstring unrelated1, rstring str1, rstring str2, rstring val1, rstring val2, int32 unrelated2>;

    graph

    // send some tuple to the composite
    stream<rstring str1, rstring str2, rstring val1, rstring val2> Tester = Custom()
    {
        logic   
        onProcess : 
        {
            submit( { str1="key1" , str2="key2" , val1="" , val2="" }, Tester);
        }
    }

    // invoke composite
    stream<ResultType> Comp = ParamTest(Tester)
    {
        param
            key : str1;
            val : val2;
            resultType : ResultType;
    }

    // print result
    () as Printer = Custom(Comp as I)
    {
        logic   
        onTuple I :
        {
            println(I);
        }
    }

}