quicoment / qc-mq-processing-server

qc-mq-processing-server
0 stars 0 forks source link

Feature/go amqp #6

Closed sypark9646 closed 3 years ago

sypark9646 commented 3 years ago

resolves #1

sypark9646 commented 3 years ago

일단은 consume 한 이후에 connection 을 끊는 부분은 넣지 않았는데 바로 끊는게 좋을까요??

AMYMEME commented 3 years ago

일단은 consume 한 이후에 connection 을 끊는 부분은 넣지 않았는데 바로 끊는게 좋을까요??

엇 근데 consume 하면 계속 리스너 유지되고 있지 않나요? connection 끊으면 안될 것 같긴해요

sypark9646 commented 3 years ago

게시물마다 consumer 커넥션을 2개씩 유지하고 있으면 리소스를 그만큼 사용할텐데 그렇다고 커넥션을 유지하고 있지 않으면 큐에 메세지가 온 것을 바로바로 알 수 없어서 어렵네용,,

AMYMEME commented 3 years ago

게시물마다 consumer 커넥션을 2개씩 유지하고 있으면 리소스를 그만큼 사용할텐데 그렇다고 커넥션을 유지하고 있지 않으면 큐에 메세지가 온 것을 바로바로 알 수 없어서 어렵네용,,

흠 consumer 커넥션을 2개씩 유지한다는게 어떤건가요??? exchange가 두개라서 그런건가요??

sypark9646 commented 3 years ago

커넥션 1개이네요 잘못 알고 있었습니다..ㅋㅋㅋㅋㅋ 아무튼 궁금한 점은 게시물이 늘어나면 => 큐는 계속 늘어날텐데 늘어나도록 두어도 될까? 였습니다..

그리고 ExchangeDeclare 함수를 까보니 아래와 같은 주석이 있는데,

Durable and Auto-Deleted exchanges will survive server restarts and will be
removed before and after server restarts when there are no remaining bindings.
These exchanges are useful for robust temporary topologies or when you require
binding durable queues to auto-deleted exchanges.

ExchangeDeclare 할 때 파라미터로 주는DurableAuto-Deleted를 true로 두면 서버 재시동시에도 복원이 가능하다고 해서 레디스에 큐 이름을 저장할 필요도 없을 것 같기도 하구요..

RabbitMQ가 아직도 정확히 어떻게 동작하는지 잘은 모르겠어서 조금 공부가 필요할 것 같네요🥲

AMYMEME commented 3 years ago

RabbitMQ가 아직도 정확히 어떻게 동작하는지 잘은 모르겠어서

제가 이해한 바에 따르면, 지금의 상황을 기준으로는 post 생성 시에 binding이 2개, queue는 1개가 할당됩니다. exchange는 처음부터 끝까지? 2개입니다. (처음부터 끝까지라는 말이 의미가 잘 와닿는지는 모르겠지만, 계속 생성하는게 아니라는 뜻이었습니다. 스프링 쪽에서는 처음에 어플리케이션 시작 시에 configuation bean 주입 할때에 2개의 exchange 빈을 미리 생성해 놓고 계속 그 bean을 갖다가 쓰는 느낌입니다.)

그리고 요청은 exchange 당 하나를 보낼 수 있습니다. (convertAndSend 함수의 인자로 exchange를 정해줘야 합니다.)

rabbitTemplate.convertAndSend(commentRegisterExchange.name, "post.${postId}.comment", commentRequest)
rabbitTemplate.convertAndSend(commentLikeExchange.name, "post.${postId}.comment.${commentId}", commentLikeRequest)

저희 direct exchange는 post.{postId}.comment 라우팅 키에 대해 ...post.{postId} 큐로 바인딩을 해줍니다. 나머지 하나의 topic exchange는 post.{postId}.comment.# 라우팅 키에 대해 똑같이 ...post.{postId}큐로 바인딩 해줍니다. (#는 아무거나 와도 됩니다.)

예를 들어, 이런식인데 아래의 q1 q2는 direct exchange나 topic exchange 모두 똑같은 q1, q2입니다.

|                 |   --binding(post.1.comment)--> q1
| Direct Exchange |   --binding(post.2.comment)--> q2
|                 |   --binding(post.3.comment)--> q3
post.1.comment  --- |                 |   --routing--> q1
post.2.comment  --- | Direct Exchange |   --routing--> q2
post.1.comment  --- |                 |   --routing--> q1

|                |   --binding(post.1.comment.#)--> q1
| Topic Exchange |   --binding(post.2.comment.#)--> q2
|                |   --binding(post.3.comment.#)--> q3
post.1.comment.1  --- |                |   --routing--> q1
post.2.comment.1  --- | Topic Exchange |   --routing--> q2
post.1.comment.2  --- |                |   --routing--> q1

따라서 producer 측에서는 요청 하나당 라우팅 키 하나와 어떤 exchange에 보낼 건지가 필요하다고 이해하시면 될 것 같습니다.

그리고 아마 consumer 측에서는 queue를 계속 listening 하다가 요청이 들어오면 그게 어디 exchange에서 왔는지 알 필요없이 저희가 messageType을 주니까 그걸 기준으로 처리하면 될 것 같습니다.

아 근데 golang 로직에서 처음에 rabbitMQ 연결시에 ExchangeDeclare를 하던데, 그 부분이 producer에서 하면 이미 exchange declare를 하면 consumer 부분에서는 안 해도 되는건지 궁금하긴 합니다.

게시물이 늘어나면 => 큐는 계속 늘어날텐데 늘어나도록 두어도 될까? 였습니다..

제가 이해한 게 맞다면 redis queue는 post당 하나가 맞는 것 같고, 지금은 redis queue : rabbit queue = 1 : 1인데, rabbit queue listener 개수가 늘어나는게 걱정이시라는 것 같습니다. (redis queue는 어쩔수 없는건가요?? redis와의 최대 커넥션 개수를 알 수 있을까요?)

저도 이 점이 조금 걱정인데, 차라리 위처럼 하지 않고, 댓글 등록/댓글 공감 큐를 고정적으로 두개만 생성하고, 모든 댓글 요청을 register queue아니면 like queue 로 보내는 방법도 있을 것 같습니다. 문제점은 지금은 queue이름에 postID가 있었기 때문에 requestBody에 post id가 포함되지 않는데, 만약 golang에서 listening 시에 routing key를 얻어올 수 있다면, routing key 자체에서 post id얻어 레디스 큐에 알아서 할당하는 식이 되어야 할 것 같아요. (이러면 exchange는 post.#.comment 라우팅 키를 바인딩하는 topic exchange) 만약 그게 안된다고 하더라도 코틀린 서버 request Body에 필드 하나만 추가되는 식이라 별 문제 없긴 합니다. (이러면 exchange는 그냥 register를 라우팅 키로 해서 바인딩하는 direct exchange)

레디스에 큐 이름을 저장할 필요도 없을 것 같기도 하구요..

저는 여기가 이해가 잘.. ㅎㅎ 저는 레디스가 어떤식으로 동작하는지 공부 좀 더 해보겠습니다.

sypark9646 commented 3 years ago

설명 감사합니다. 이제 조금 이해가 될 것 같네요.

레디스에 큐 이름을 저장할 필요도 없을 것 같기도 하구요..

현재 프로세스는 서버를 run 하게 되면 main.go 에서 setupConsumer()를 통해 consumer 연결을 하게 돼요 그런데 레디스에 큐 이름을 저장하는 이유는 서버가 죽었을때 consumer 복원을 해야하는 경우가 있을 것이라고 생각했기 때문이에요. ExchangeDeclare 시 파라미터로 주는 DurableAuto-Deletedtrue로 두면 서버 재시동시에도 복원이 가능하니까 레디스에 큐 이름을 저장할 필요가 없겠다는 말이었어요!

sypark9646 commented 3 years ago

그리고 redis 최대 커넥션 수는 repository.go 에 있는 initRedisPool 로 조절할 수 있어요! 현재는 제가 임의로 maxActive=100으로 설정해 두었어요

func InitRedisPool(address string) {
    redisPool = &redis.Pool{
        MaxIdle:   20,
        MaxActive: 100,
        Dial: func() (redis.Conn, error) {
            conn, err := redis.Dial("tcp", address)
            if err != nil {
                err = errors.Errorf("ERROR: fail init redis: %w", err)
                log.Fatal(err.Error())
                os.Exit(1)
            }
            return conn, err
        },
    }
}
sypark9646 commented 3 years ago

일단 여기까지 queue 연결 & consumer pool 완료했고 메세지 받는 것 까지 확인했습니다 메세지 파싱해서 레디스에 저장하는 건 일단 머지 후에 다음 pr에서 하는게 어떨까싶습니다..!!