projectriff / riff

riff is for functions
https://projectriff.io
Apache License 2.0

Cannot chain streaming functions #1356

Closed fbiville closed 5 years ago

fbiville commented 5 years ago

By chaining, I mean having a stream that is both the output stream of one function and the input stream of another.

You can observe the issue while running this script.

The way to observe the problem is the following:

jldec commented 5 years ago

The processor is crashing:

Exception in thread "main" java.lang.IllegalArgumentException: Expected authority at index 2: //
    at java.net.URI.create(URI.java:852)
    at io.grpc.internal.DnsNameResolver.<init>(DnsNameResolver.java:164)
    at io.grpc.internal.DnsNameResolverProvider.newNameResolver(DnsNameResolverProvider.java:58)
    at io.grpc.internal.DnsNameResolverProvider.newNameResolver(DnsNameResolverProvider.java:41)
    at io.grpc.NameResolverRegistry$NameResolverFactory.newNameResolver(NameResolverRegistry.java:154)
    at io.grpc.internal.ManagedChannelImpl.getNameResolver(ManagedChannelImpl.java:696)
    at io.grpc.internal.ManagedChannelImpl.<init>(ManagedChannelImpl.java:578)
    at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:512)
    at io.projectriff.processor.Processor.lambda$indexByAddress$9(Processor.java:218)
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.util.stream.DistinctOps$1$2.accept(DistinctOps.java:175)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at io.projectriff.processor.Processor.indexByAddress(Processor.java:213)
    at io.projectriff.processor.Processor.<init>(Processor.java:165)
    at io.projectriff.processor.Processor.main(Processor.java:125)
Caused by: java.net.URISyntaxException: Expected authority at index 2: //
    at java.net.URI$Parser.fail(URI.java:2848)
    at java.net.URI$Parser.failExpecting(URI.java:2854)
    at java.net.URI$Parser.parseHierarchical(URI.java:3102)
    at java.net.URI$Parser.parse(URI.java:3063)
    at java.net.URI.<init>(URI.java:588)
    at java.net.URI.create(URI.java:850)
    ... 21 more

jldec commented 5 years ago

According to @ericbottard:

I think this happens when there is a problem with the provisioner and the Status of the Stream doesn't reflect its address. Specifically, see https://github.com/projectriff/kafka-provisioner/issues/2

#2: Provisioner loses connectivity to kafka?

After some time of inactivity, it seems newly created streams don't get ready.
Killing just the provisioner pod solves the situation, which seems to indicate some connection timeout is at play. (https://github.com/projectriff/kafka-provisioner)
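
For reference, the exception message in the trace above can be reproduced with the JDK alone. This is only a minimal sketch, under the assumption that the stream address reaching the gRPC channel builder was an empty string, which the name resolver then prefixed with "//" (the trace shows `URI.create` being handed exactly `//`). The class name, the helper `tryParse`, and the host `kafka-gateway:6565` are hypothetical and are not taken from the riff code base.

```java
import java.net.URI;

public class EmptyAuthorityDemo {

    // Hypothetical helper: builds a URI the way the trace suggests
    // ("//" followed by the address) and reports what happened.
    static String tryParse(String address) {
        try {
            URI uri = URI.create("//" + address);
            return "ok: authority=" + uri.getAuthority();
        } catch (IllegalArgumentException e) {
            // URI.create wraps the URISyntaxException and reuses its message
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A non-empty address (made-up value) parses fine
        System.out.println(tryParse("kafka-gateway:6565"));
        // An empty address, as assumed for a Stream whose Status has no address yet
        System.out.println(tryParse(""));
    }
}
```

If that assumption holds, the crash is a symptom of a stream that never became ready rather than of chaining itself, which would be consistent with the provisioner issue linked above.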

fbiville commented 5 years ago

I edited the initial issue description on how to observe the problem.

ericbottard commented 5 years ago

Need a better way to reproduce this, as the provided script is rather stale wrt master:

In any case, after some other unrelated hiccups, chaining worked as intended for me. So we need to narrow down the problem, if there is one...

ericbottard commented 5 years ago

Here is a better script for running the demo, assuming

jldec commented 5 years ago

Closing, cannot reproduce chaining failure.