IBMStreams / streamsx.topology

Develop streaming applications for IBM Streams in Python, Java & Scala.
http://ibmstreams.github.io/streamsx.topology
Apache License 2.0

endParallel() on non-parallel stream causes sc error #1666

Closed johnmac-ibm closed 4 years ago

johnmac-ibm commented 6 years ago

If I call endParallel() on a stream that's not parallel, I get an error from sc:

SEVERE: /tmp/tk2370428622921196130/endparallel/endParallel.spl:16:64: CDISP0009E ERROR: A syntax error exists at the identifier token in operator invocation head. A viable parsing alternative does not exist.
May 14, 2018 3:12:20 PM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
SEVERE: /tmp/tk2370428622921196130/endparallel/endParallel.spl:16:81: CDISP0008E ERROR: A token is missing in the function definition of the SPL program. The token is identifier. The expected token is '('.

This is clearly a case of user error, but rather than getting a relatively cryptic sc error it would be nice if the graph builder caught this and gave a more direct error.

This was on master.

Sample:

import java.util.Arrays;
import com.ibm.streamsx.topology.TStream;
import com.ibm.streamsx.topology.Topology;
import com.ibm.streamsx.topology.context.StreamsContextFactory;

public class EndParallel {
    public static void main(String[] args) throws Exception {
        Topology topology = new Topology("endParallel");

        TStream<String> stream = topology.constants(Arrays.asList("Boom!"));
        stream.endParallel();
        StreamsContextFactory.getStreamsContext("STANDALONE").submit(topology);
    }
}

wmarshall484 commented 6 years ago

The topology toolkit should never generate invalid SPL.

This error should be caught, but it needs to be caught after submit() is called. Due to cycles in the graph (using PendingStream), a non-parallel stream can later become parallel. Therefore, we need to wait until the graph is finalized before catching such errors.
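A minimal sketch of what such a deferred check could look like, run once over the finalized graph. This is not the toolkit's internal graph model: the `ops`/`edges` representation, the operator kinds, and the function name `find_unmatched_end_parallel` are all hypothetical and chosen only for illustration. It is a plain reachability check (is any parallel start upstream of each end-parallel?) and deliberately ignores nested parallel regions.

```python
from collections import defaultdict


def find_unmatched_end_parallel(ops, edges):
    """Return the names of end-parallel operators with no parallel start upstream.

    ops   -- dict mapping operator name to a kind string; the kinds used here
             ("source", "functor", "parallel", "end_parallel") are hypothetical.
    edges -- iterable of (upstream_name, downstream_name) pairs describing the
             finalized graph, including any feedback edges.
    """
    upstream = defaultdict(list)
    for src, dst in edges:
        upstream[dst].append(src)

    def has_parallel_upstream(start):
        # Iterative walk against the edge direction; the 'seen' set keeps the
        # walk finite when the graph contains cycles.
        seen = set()
        stack = list(upstream[start])
        while stack:
            name = stack.pop()
            if name in seen:
                continue
            seen.add(name)
            if ops[name] == "parallel":
                return True
            stack.extend(upstream[name])
        return False

    return sorted(
        name
        for name, kind in ops.items()
        if kind == "end_parallel" and not has_parallel_upstream(name)
    )
```

Because the check only looks at the complete edge list, an end-parallel that gains a parallel upstream through a feedback edge added later is accepted, while the case in this issue (a source feeding directly into an end-parallel) is reported by name before any SPL is generated.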

markheger commented 4 years ago

Error message:

Mar 19, 2020 2:42:54 PM com.ibm.streamsx.topology.internal.streams.InvokeMakeToolkit invoke
INFO: Invoking spl-make-toolkit
Mar 19, 2020 2:42:54 PM com.ibm.streamsx.topology.internal.streams.InvokeMakeToolkit invoke
INFO: /opt/ibm/InfoSphere_Streams/4.3.0.0/bin/spl-make-toolkit --make-operator -i /tmp/tk3005167984118879060 
Mar 19, 2020 2:42:55 PM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
SEVERE: /tmp/tk3005167984118879060/com.ibm.streamsx.topology.test/endParallel_0_278bcc07_3099_4882_aaa8_0a8fed905606.spl:16:96: CDISP0009E ERROR: A syntax error exists at the identifier token in operator invocation head. A viable parsing alternative does not exist.
Mar 19, 2020 2:42:55 PM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
SEVERE: /tmp/tk3005167984118879060/com.ibm.streamsx.topology.test/endParallel_0_278bcc07_3099_4882_aaa8_0a8fed905606.spl:16:111: CDISP0008E ERROR: A token is missing in the function definition of the SPL program. The token is '('. The expected token is '}'.
Mar 19, 2020 2:42:55 PM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
SEVERE: 
Mar 19, 2020 2:42:55 PM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
SEVERE: CDISP0092E ERROR: Because of previous compilation errors, the compile process cannot continue.
Mar 19, 2020 2:42:55 PM com.ibm.streamsx.topology.internal.streams.InvokeMakeToolkit invoke
INFO: spl-make-toolkit complete: return code=1

Generated SPL:

namespace com.ibm.streamsx.topology.test;
public composite endParallel_0_278bcc07_3099_4882_aaa8_0a8fed905606
{
graph
@spl_note(id="__spl_layout", text='{"kind":"Constants"}')
@spl_note(id="__spl_sourcelocation", text='{"file":"EndParallelTest.java","class":"com.ibm.streamsx.topology.test.spl.EndParallelTest","method":"testInvalidEndParallel","line":49,"api.method":"constants"}')
@spl_note(id="__spl_nativeType_output_0", text='E')
stream<blob __spl_jo> ObjectConstants = com.ibm.streamsx.topology.functional.java::Source  ( )
  {
    param
      functionalLogic: 'rO0ABXNyADJjb20uaWJtLnN0cmVhbXN4LnRvcG9sb2d5LmludGVybmFsLmxvZ2ljLkNvbnN0YW50cwAAAAAAAAABAgABTAAEZGF0YXQAEExqYXZhL3V0aWwvTGlzdDt4cHNyABNqYXZhLnV0aWwuQXJyYXlMaXN0eIHSHZnHYZ0DAAFJAARzaXpleHAAAAABdwQAAAABdAAFQm9vbSF4';
  config
    streamViewability: false;
  }

  ( stream<blob __spl_jo> ObjectConstants_end_parallel_OUT0) as ObjectConstants_end_parallel = $EndParallel$  ( ObjectConstants)
  {
  }

}

markheger commented 4 years ago

A Python topology runs into the same "invalid SPL" error:

import itertools

from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes, ConfigParams

topo = Topology()
s1 = topo.source(lambda : itertools.count())
s1.end_parallel()
cfg = {
    ConfigParams.SSL_VERIFY: False,
    'topology.keepArtifacts': True
}
submission_result = submit(ContextTypes.STANDALONE, topo, config=cfg)

markheger commented 4 years ago

PR #2391 includes a test case.

The test case is located in the Java test suite: test/java/src/com/ibm/streamsx/topology/test/spl/EndParallelTest.java

cd test/java
ant unittest.standalone -Dtopology.test.base.pattern='**/EndParallelTest.java'