TheOpenCloudEngine / uEngine5-base

uEngine5 BPMS that totally re-written in Microservices architecture. uEngine5 can act as not only a conventional Workflow or BPMS but also as a REST api orchestrator or a BPaaS (Business process as a service) of members of OCE's MSA components.
MIT License
10 stars 13 forks source link

CQRS and Event Sourcing #78

Open jinyoung opened 6 years ago

jinyoung commented 6 years ago
  1. 먼저, Kafka 를 설치하고 테스트한다:

https://kafka.apache.org/quickstart

  1. 참고자료

https://dreamix.eu/blog/java/building-microservices-with-netflix-oss-apache-kafka-and-spring-boot

https://www.codenotfound.com/spring-kafka-consumer-producer-example.html

예제를 일단 따라해보자:

먼저 process service 가 definition service 의 이벤트를 감지해야 한다 - definition 의 deploy 될 때 다른 서비스들은 해당 데피니션에 대한 어떤 처리를 할 수 있다.

또 다른 예제로, process service 가 하나의 액티비티를 수행한다면, 이에 대한 audit 을 처리할 수 있는 이벤트를 받을 수 있어야 한다.

위의 두 예제는 기존 monolithic 환경에서 각각 deployFilter, activityFilter 에서 수행하던 이벤트들이다. 이젠 MSA 환경에서 이들은 각각 business event 가 된다.

첫번째 시나리오를 커버하기 위해, definition service 와 process service 에 spring kafka lib 를 넣자:


    <!-- spring-kafka -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>${spring-kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <version>${spring-kafka.version}</version>
      <scope>test</scope>
    </dependency>

...

<spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>

다른 eureka 등의 사용방법과 같이 다음의 kafka 설정을 application.yml 에 추가한다:

kafka:
  bootstrap-servers: localhost:9092  #여기에 kafka server  가 떠있다보고
  topic:
    helloworld: helloworld.t   # 이게 토픽명

이렇게 하고, topic 에 producing 하는 코드를 넣은 후,

#DefinitionServiceImpl.java

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        kafkaTemplate.send(topic, payload);
    }

    @RequestMapping(value=DEFINITION + "/message", method=RequestMethod.POST)
    public void kafkaSend(@RequestParam("msg") String msg){
        send("helloworld.t", msg);
    }

메시지를 날려본다:

http POST localhost:9001/definition/message?msg=sdsafdkf

다음과 같이 리스닝 하고 있으면:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic helloworld.t --from-beginning

sdsafdkf

메시지가 들어오는 것을 확인할 수 있다.

오케이, 그렇다면, 다음과 같이 제대로 만들어보자:

topic: 
   bpm: bpm.topic

message 는 이렇게 정의:

  deploy: Robot Process Automation.xml

데피니션 서비스가 해당 이벤트를 보내고, 프로세스 서비스는 해당 이벤트를 읽어 deploy filter 를 실행한다.