projectriff / riff

riff is for functions
https://projectriff.io
Apache License 2.0
800 stars 64 forks

Serverless event driven Riff with Kafka #871

Closed patpatpat123 closed 3 years ago

patpatpat123 commented 5 years ago

Hello Riff Team,

First of all, thank you a lot for this very interesting project. I have been following Mark Fisher's and Dave Syer's talks, and learned a lot about this project. Currently, I am working on a use case which uses Kafka messages as the trigger for the functions.

Josh, Mark, Dave and many more did some demos, but those were with the old riff version. Furthermore, I have a specific case for which I would like your help.

I have a Spring Cloud Function project, very basic. https://github.com/patpatpat123/riffquestion/tree/master/src/main/java/some/github/question

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}

public class FirstFunction implements Function<Flux<Foo>, Flux<Bar>> {
    @Override
    public Flux<Bar> apply(Flux<Foo> fooFlux) {
        return fooFlux.map(oneFoo -> new Bar(oneFoo.getSomeFoo().toUpperCase()));
    }
}

public class SecondFunction implements Function<Flux<Foo>, Flux<Bar>> {
    @Override
//    @StreamListener(MultipleProcessor.NUMBERTWO)
//    @SendTo(MultipleProcessor.NUMBERFOUR)
    public Flux<Bar> apply(Flux<Foo> fooFlux) {
        return fooFlux.map(oneFoo -> new Bar(oneFoo.getSomeFoo().replace("hello", "world")));
    }
}

The functions are dockerized and deployed in Kubernetes.

NAMESPACE          NAME                                             READY     STATUS            RESTARTS   AGE
default            kafka-service-74bbd897f-gqwbt                    1/1       Running           0          30m
default            riffquestion-00001-deployment-568d8d977c-wkttt   0/3       PodInitializing   0          25s
default            zookeeper-58759999cc-s4w89                       1/1       Running           0          30m
istio-system       istio-citadel-84fb7985bf-tq7h5                   1/1       Running           0          29m
istio-system       istio-cleanup-secrets-6ct2j                      0/1       Completed         0          29m
istio-system       istio-egressgateway-bd9fb967d-mkkzv              1/1       Running           0          29m
istio-system       istio-galley-655c4f9ccd-pnbjh                    1/1       Running           0          29m
istio-system       istio-ingressgateway-688865c5f7-7pgwq            1/1       Running           0          29m
istio-system       istio-pilot-6cd69dc444-22mjf                     2/2       Running           0          29m
istio-system       istio-policy-6b9f4697d-s82h6                     2/2       Running           0          29m
istio-system       istio-sidecar-injector-8975849b4-h5h6c           1/1       Running           0          29m
istio-system       istio-statsd-prom-bridge-7f44bb5ddb-xqh46        1/1       Running           0          29m
istio-system       istio-telemetry-6b5579595f-qwb8v                 2/2       Running           0          29m
istio-system       knative-ingressgateway-77b757d468-qsrhd          1/1       Running           0          13m
knative-build      build-controller-7bd96569d-8g6rb                 1/1       Running           0          13m
knative-build      build-webhook-5c99bb9b99-672r6                   1/1       Running           0          13m
knative-eventing   eventing-controller-59d55d8fd8-7mt8s             1/1       Running           0          13m
knative-eventing   stub-clusterbus-dispatcher-c86bb6bff-9crn6       2/2       Running           0          10m
knative-eventing   webhook-c5878f4-tk95s                            1/1       Running           0          13m
knative-serving    activator-7d5b5ff75c-mnn2n                       2/2       Running           0          13m
knative-serving    autoscaler-86859bd685-kzmzs                      2/2       Running           0          13m
knative-serving    controller-5f66d69957-k9j9w                      1/1       Running           0          13m
knative-serving    webhook-5b6cf589d7-kpvcj                         1/1       Running           0          13m
kube-system        coredns-c4cffd6dc-9tknm                          1/1       Running           0          31m
kube-system        etcd-minikube                                    1/1       Running           0          31m
kube-system        kube-addon-manager-minikube                      1/1       Running           0          30m
kube-system        kube-apiserver-minikube                          1/1       Running           0          30m
kube-system        kube-controller-manager-minikube                 1/1       Running           0          31m
kube-system        kube-dns-86f4d74b45-ww4h7                        3/3       Running           0          31m
kube-system        kube-proxy-rrcdd                                 1/1       Running           0          31m
kube-system        kube-scheduler-minikube                          1/1       Running           0          30m
kube-system        kubernetes-dashboard-6f4cfc5d87-gn7sd            1/1       Running           0          31m
kube-system        storage-provisioner                              1/1       Running           0          31m

riff service create riffquestion --image xxx/docker-riffquestion

And I have an external service that is supposed to trigger the functions like this:

public class HttpPostJsonExample {
    public static void main(String[] args) throws Exception {
        String payload = "{\"someFoo\":\"hello\"}";
        StringEntity entity = new StringEntity(payload, ContentType.APPLICATION_JSON);
        HttpClient httpClient = HttpClientBuilder.create().build();
        HttpPost request1 = new HttpPost("http://192.168.99.100:32380/firstFunction");
        HttpPost request2 = new HttpPost("http://192.168.99.100:32380/secondFunction");
        request1.setEntity(entity);
        request2.setEntity(entity);
        request1.setHeader("Host", "riffquestion.default.example.com");
        request2.setHeader("Host", "riffquestion.default.example.com");
        HttpResponse response1 = httpClient.execute(request1);
        HttpResponse response2 = httpClient.execute(request2);
        HttpEntity responseEntity1 = response1.getEntity();
        HttpEntity responseEntity2 = response2.getEntity();
        System.out.println("the first response " + EntityUtils.toString(responseEntity1));
        System.out.println("the second response " + EntityUtils.toString(responseEntity2));
    }
}

I am very happy about this project; I can see the beauty of riff and serverless, even with the two functions:

the first response [{"someBar":"HELLO"}]
the second response [{"someBar":"world"}]

However, in my real use case, my external services do not send HTTP POSTs; they send Kafka messages, like this. (There are other consumer services as well, but let's keep it simple for now.)

public class KafkaProducerExample {
    public static void main(String[] args) {
        System.out.println("Message start");
        Properties kafkaParams = new Properties();
        String topicName = "anyIdea?MaybefirstFunctionInput?";
        kafkaParams.put("bootstrap.servers", "kafka:9092");
        [...]
        Producer<String, String> producer = new KafkaProducer<>(kafkaParams);
        System.out.println("Message go");
        producer.send(new ProducerRecord<>(topicName, "key", "{\"someFoo\":\"hello\"}"));
        System.out.println("Message sent successfully");
        producer.close();
    }
}

This is my real use case. I believe it is a fairly interesting one, or at least a reasonable one. Is it possible to do this with riff? If yes, could you please help me achieve it?

MacOsX, java 1.8
$ riff version
Version
  riff cli: 0.1.3 (a216005db0d50056c41b45fdc3384b09ad24381d)
$ minikube version
minikube version: v0.29.0
$ kubectl version
Client Version: version.Info{Major:"1", Minor:"11", GitVersion:"v1.11.2", GitCommit:"bb9ffb1654d4a729bb4cec18ff088eacc153c239", GitTreeState:"clean", BuildDate:"2018-08-08T16:31:10Z", GoVersion:"go1.10.3", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"10", GitVersion:"v1.10.0", GitCommit:"fc32d2f3698e36b93322a3465f63a14e9f0eaead", GitTreeState:"clean", BuildDate:"2018-03-26T16:44:10Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Finchley.SR1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-function-web</artifactId>
        </dependency>
        <!--<dependency>-->
            <!--<groupId>org.springframework.cloud</groupId>-->
            <!--<artifactId>spring-cloud-function-stream</artifactId>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.springframework.cloud</groupId>-->
            <!--<artifactId>spring-cloud-stream-binder-kafka</artifactId>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

I believe the idea of having external microservices send HTTP POSTs and/or Kafka messages to trigger functions corresponds to the core concepts of event-driven architecture and serverless architecture. I will be glad if I can achieve this.

Thank you a lot for this project, thank you a lot for your kind help.

scothis commented 5 years ago

With riff 0.0.x we used Kafka to pull messages to the function for invocation. With riff 0.1 we use HTTP to push messages to the function for invocation. In this new model, it's still architecturally possible to use Kafka as the driver for function invocations. What you will need (and which doesn't exist yet) is an event Source that reads from Kafka. In this model, your function shouldn't need to know anything about Kafka (or HTTP, for that matter) in order to receive messages from a Kafka topic.
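The push model described above can be sketched as a small bridge process: poll a Kafka topic and forward each record to the function through the ingress, exactly like the manual HttpPost example earlier in the thread. This is purely an illustration of the data flow under stated assumptions (the real Source did not exist yet); the Kafka consumer is stubbed behind an interface so only the HTTP push side is concrete, and the service/namespace/ingress values are copied from the examples in this issue.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical sketch of what a Kafka-backed event Source could do internally.
public class KafkaSourceSketch {

    // Stand-in for a Kafka poll loop; a real Source would wrap
    // org.apache.kafka.clients.consumer.KafkaConsumer here.
    interface MessageSource {
        void forEachMessage(Consumer<String> handler);
    }

    // Knative routes on the Host header: <service>.<namespace>.example.com,
    // matching the HttpPost example earlier in this thread.
    static String hostFor(String service, String namespace) {
        return service + "." + namespace + ".example.com";
    }

    // POST one Kafka payload to the function through the ingress gateway.
    // Note: HttpURLConnection only honors a custom Host header when the JVM
    // is started with -Dsun.net.http.allowRestrictedHeaders=true.
    static int push(String ingressUrl, String host, String payload) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(ingressUrl).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Host", host);
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    public static void main(String[] args) {
        // Stubbed "topic" delivering one record; replace with a real consumer loop.
        MessageSource source = handler -> handler.accept("{\"someFoo\":\"hello\"}");
        source.forEachMessage(msg -> {
            try {
                push("http://192.168.99.100:32380/firstFunction",
                        hostFor("riffquestion", "default"), msg);
            } catch (Exception e) {
                e.printStackTrace(); // ingress unreachable outside the cluster
            }
        });
    }
}
```

An eventual riff/Knative Source would also handle offsets, retries and scaling; this only shows why the function itself stays unaware of Kafka.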

The particulars around Sources are very much in flight and not ready to be implemented quite yet, but I do expect to see a Kafka-based Source implemented, similar to how we already have a Kafka-based Bus implementation for Channels.

I hope that helps.

patpatpat123 commented 5 years ago

Hello Scott,

Thank you a lot for your clear answer. Would it be possible to share a roadmap indicating which version this will be possible in? And would a workaround based on Spring Cloud Stream be possible?

@Bean
@Input(NUMBERONE)
SubscribableChannel binding1();

@Bean
@Input(NUMBERTWO)
SubscribableChannel binding2();

@Bean
@Output(NUMBERTHREE)
MessageChannel singleOutput();

@Bean
@Output(NUMBERFOUR)
MessageChannel anotherOutput();

@Override
@StreamListener(MultipleProcessor.NUMBERONE)
@SendTo(MultipleProcessor.NUMBERTHREE)
public Flux<Bar> apply(Flux<Foo> fooFlux) {
    return fooFlux.map(oneFoo -> new Bar(oneFoo.getSomeFoo().toUpperCase()));
}

@Override
@StreamListener(MultipleProcessor.NUMBERTWO)
@SendTo(MultipleProcessor.NUMBERFOUR)
public Flux<Bar> apply(Flux<Foo> fooFlux) {
    return fooFlux.map(oneFoo -> new Bar(oneFoo.getSomeFoo().replace("hello", "world")));
}

spring.cloud.stream.kafka.binder.brokers=knative-eventing:9092
spring.cloud.stream.kafka.binder.zkNodes=knative-eventing:2181

String topicName = "numberone";
kafkaParams.put("bootstrap.servers", "knative-eventing:9092");
producer.send(new ProducerRecord<>(topicName, "key", "{\"someFoo\":\"hello\"}"));

String topic = "numberthree";
props.put("bootstrap.servers", "knative-eventing:9092");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
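For the Spring Cloud Stream workaround above, the channels also need to be bound to concrete Kafka topics. A hypothetical application.properties sketch follows; the binding keys assume the channel constants NUMBERONE..NUMBERFOUR resolve to the names shown, so they must be adjusted to the real constant values (and by default Spring Cloud Stream uses the binding name itself as the destination, making the explicit destinations illustrative):

```properties
# Sketch only: binding names must match the values of the @Input/@Output
# channel constants (assumed here to be "numberone".."numberfour").
spring.cloud.stream.kafka.binder.brokers=knative-eventing:9092
spring.cloud.stream.kafka.binder.zkNodes=knative-eventing:2181
spring.cloud.stream.bindings.numberone.destination=numberone
spring.cloud.stream.bindings.numbertwo.destination=numbertwo
spring.cloud.stream.bindings.numberthree.destination=numberthree
spring.cloud.stream.bindings.numberfour.destination=numberfour
```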

I believe a Kafka/Rabbit based trigger that still works with riff would be a good idea. Thank you again for your presentation with Eric.

tongdaoqa commented 5 years ago

I am also investigating the same thing. My startup company has some upstream systems that just put messages onto a common Kafka cluster under specific topics. They do not wait for any responses and do not care about who consumes the messages.

We currently have some ad hoc consumers that consume the messages and send the transformed payload to some other downstream systems.

Right now, we have those transformers deployed and waiting for messages. They stay idle most of the time, and when it gets busy, they cannot handle the load and we have to manually deploy more transformers. Very painful.

We are looking forward to migrating our transformers to functions, and most of all, to using riff. Maybe something like

Is it possible?

tongdaoqa commented 5 years ago

May I kindly ask if it is possible to scope and milestone this feature please?

KafkaProServerless commented 5 years ago

I am also interested in this.

All I can see from your demos, getting-started website and quick starts are functions that respond to just a riff invoke or some curl commands. This is unlikely to be representative of serverless architectures. It is not feasible to have all other services sending riff commands, nor curl requests. Other products have triggers based on database changes, GitHub changes and much more.

A Kafka-based (or any other message bus) trigger for functions created by riff would be a very nice addition to this project. I find this serverless solution based on Knative quite nice; having Kafka triggers would be a huge plus.