spring-cloud / spring-cloud-dataflow

A microservices-based Streaming and Batch data processing in Cloud Foundry and Kubernetes
https://dataflow.spring.io
Apache License 2.0
1.11k stars 582 forks

Add support for multiple input/output channels #2331

Closed rahulbats closed 6 years ago

rahulbats commented 6 years ago

My stream takes multiple input channels. For instance, it takes input from 2 topics, creates a KTable from one, and joins it with a KStream from the other. Similarly, on the output side it does a branch and sends to multiple channels. How do I get a processor like this to work in dataflow? Since spring-cloud-stream supports multiple input and output channels, why does dataflow not support it?

sabbyanandan commented 6 years ago

Hi, @rahulbats. Can you share a sample of the Application? I'd like to review your channel configurations and the corresponding destination mapping.

SCDF automates the channel/destination naming for applications in the pipeline. The naming convention typically follows the <stream-name>.<producer-app-name> pattern. If you have a custom name or a descriptive channel naming convention in the App, SCDF won't be able to use them as-is.
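To make the convention concrete, here is a tiny, purely illustrative helper (the class and method names are hypothetical, not part of SCDF's API) that encodes the <stream-name>.<producer-app-name> pattern described above:

```java
// Hypothetical helper illustrating SCDF's default destination naming,
// i.e. the <stream-name>.<producer-app-name> pattern. Not an SCDF API.
public class DestinationNaming {

    static String destinationFor(String streamName, String producerApp) {
        return streamName + "." + producerApp;
    }

    public static void main(String[] args) {
        // For a definition like "stream create mystream --definition 'http | log'",
        // SCDF would bind http's output to the destination printed below.
        System.out.println(destinationFor("mystream", "http")); // prints "mystream.http"
    }
}
```

A custom binding name inside the App (like "events" or "subscriptions") doesn't fit this scheme, which is why SCDF can't map it automatically.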

We're exploring options to support this type of orchestration more natively in SCDF, but before we get into the details, it'd be better to confirm your Application configs and the current runtime behavior you are seeing.

rahulbats commented 6 years ago

Hi, I am using the app to join a KStream and a KTable created from 2 Kafka topics.

I am using a custom binding like this:

public interface SubscriptionsNotificationsBindings {

    // Inbound KStreams from the two Kafka topics
    @Input("events")
    KStream<?, ?> events();

    @Input("subscriptions")
    KStream<?, ?> subscriptions();

    // Outbound KStreams produced by the branch
    @Output("emails")
    KStream<?, ?> emails();

    @Output("texts")
    KStream<?, ?> texts();
}

and then my processor looks like this:

@EnableBinding(SubscriptionsNotificationsBindings.class)
public class EventToNotificationGenerator {

    @StreamListener
    @SendTo({"emails", "texts"})
    public KStream<String, String>[] process(@Input("events") KStream<String, String> events,
                                             @Input("subscriptions") KStream<String, String> subscriptions) {
        // Join/branch logic elided in the original snippet; the method must
        // return one KStream per @SendTo output, e.g. via KStream#branch.
        return events.branch((key, value) -> value.contains("email"),
                             (key, value) -> true);
    }
}

My application.yml has bindings like this:

spring.cloud.stream.bindings.subscriptions:
 destination: com.bnsf.snf.subscriptions
 consumer:
  headerMode: raw  
spring.cloud.stream.bindings.events:
 destination: com.bnsf.snf.events
 consumer:
  headerMode: raw
spring.cloud.stream.bindings.texts:
 destination: com.bnsf.snf.texts
 producer:
  use-native-encoding: true
spring.cloud.stream.bindings.emails:
 destination: com.bnsf.snf.emails
 producer:
  use-native-encoding: true  

This is how I was able to use a multi-input, multi-output processor.

sabbyanandan commented 6 years ago

A multi-channel application is a common practice in Spring Cloud Stream.

A stream primitive in SCDF, however, deals with 1 input and 1 output channel. That's represented by the | symbol in the DSL/UI. There's no direct support in the DSL yet for interacting with multiple channels within the App. We do support named-destinations, with which you could build a fan-in / fan-out pipeline.

As another option, you can accomplish a similar behavior by overriding the default naming conventions of the channels both at the upstream and downstream positions.

Let's review an example. This processor is similar to yours, but it has separate @StreamListener handlers that consume the "same data" coming from the upstream users topic. The stream definition would look like:

stream create foo --definition "usersource | userprocessor | websocket"

Deployment:

stream deploy foo --properties "app.usersource.spring.cloud.stream.bindings.output.destination=users,app.userprocessor.spring.cloud.stream.bindings.users.destination=users,app.userprocessor.spring.cloud.stream.bindings.usersbyregion_input.destination=users,app.userprocessor.spring.cloud.stream.bindings.usersbyregion_output.destination=userscount,app.websocket.spring.cloud.stream.bindings.input.destination=userscount"

The deployment specifically overrides the channel/destination mapping for the apps. You could follow a similar model for your application, too.
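Expressed as per-app application.yml fragments rather than deployment properties, the same overrides would look roughly like this; a sketch only, assuming the binding names (users, usersbyregion_input, usersbyregion_output) that appear in the deploy command above:

```yaml
# userprocessor side (sketch; binding names taken from the deploy command above)
spring.cloud.stream.bindings.users:
  destination: users
spring.cloud.stream.bindings.usersbyregion_input:
  destination: users
spring.cloud.stream.bindings.usersbyregion_output:
  destination: userscount
```

Either form achieves the same thing: both input bindings point at the shared users destination, and the output binding points at userscount for the downstream websocket app.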

Lastly, we are exploring options to orchestrate multi-channel applications in SCDF. If you could elaborate on your use-case and why you'd prefer it to be orchestrated via SCDF, we could study the feasibility with concrete requirements.

markpollack commented 6 years ago

We definitely have the ability via Skipper to deploy any arbitrary set of Boot applications to CF/K8s and manage them. This can be a group of applications or a single application. (Note we are opening that up in Skipper 1.1 to any cf-manifest-based app, so not just Boot apps.)

A couple of options: if you are deploying just a single application, you can do a good old cf push type workflow, but then you would lose the management Skipper offers and the sort of 'automatic' creation of cf manifests via SCDF. The other option is to create a Skipper package for your application and upload that directly to Skipper, bypassing SCDF. We don't yet have a Maven/Gradle plugin for this, so you will have to manually create the package as described in the Skipper docs.

I am assuming that perhaps the SCDF UI, DSL, and the idea of monitoring/managing a group of applications in a first-class manner are among the compelling reasons to manage your use-case via SCDF (note there is no UI for Skipper). We would really like to hear your feedback on this point.

The exploration that Sabby mentions, e.g. 'orchestrate multi-channel apps', is just a subset of deploying a group of apps; they could be stream apps, or they could be a bunch of web apps. We wouldn't futz with the binding names (you can use topics that make 'sense' for you) or any application properties for that matter (e.g. related to metrics), as we do with stream deployment. We would use the app registry in SCDF to create the Skipper package in a similar way to streams, but not impose any conventions with respect to how the apps are 'wired up'. A possible syntax would be:

scdf> package create randoApp --definition="topic1ConsumerApp || multiTopicConsumerApp || topic3ProducerApp || plainOldWebApp"
scdf> package deploy randoApp --deploymentProperties=blahblah

where the || symbol would probably be the only change to the stream DSL, indicating independent deployment in parallel. Apps would still be registered with SCDF as before.

Also worth noting some future directions: Spinnaker is going to be an option for use on PCF at some point, at which point runtime visibility will be available via a UI, along with the ability to query deployed applications based on general metadata tags, giving a sort of grouping ability. Similar considerations apply to how tags are managed at the moment with Micrometer + Prometheus and Spring Cloud Stream apps.

sabbyanandan commented 6 years ago

Based on the latest discussion on this thread, applications with multiple input/output channels will be supported in SCDF with the following DSL pattern:

stream create mystream --definition "foo, bar, baz"

In this stream, foo, bar, and baz are SCSt applications with multiple input/output channel bindings configured in their respective YAML/properties files. When SCDF deploys them, it won't attempt to automatically create the channels. Instead, the apps rely on the packaged SCSt channel configurations to establish the connection with the backing message broker.
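For example, the configuration packaged with one such app (say foo) would carry its own binding-to-destination mapping, in the same style as the YAML shown earlier in this thread. The binding and destination names below are hypothetical, purely for illustration:

```yaml
# foo's packaged application.yml (sketch; names are hypothetical)
spring.cloud.stream.bindings.orders-in:
  destination: com.example.orders
spring.cloud.stream.bindings.customers-in:
  destination: com.example.customers
spring.cloud.stream.bindings.notifications-out:
  destination: com.example.notifications
```

Since the app ships this mapping itself, SCDF can deploy foo, bar, and baz without renaming anything.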

SCDF would continue to provide the DSL and GUI, a coherent view of the data pipeline (for multi-input/output apps), as well as the opportunity to interact with the granular applications in the Skipper mode.