Open Yg-Hong opened 1 year ago
Queue
를 사용해 오면서 구현체를 만들때 new
를 치면 자동 완성 되는 수많은 구현체들을 보면서도, 아무 관심도 주지 않고 내가 쓰는 구현체만 써 왔었다.
해당 책을 읽으며, 그리고 이 이슈를 다루며 Queue
에는 어떤 구현체가 있고 어디에 사용되는 지를 알아 보게 되어 기쁘다 .?
Queue에 대한 개념은 책에서 다루고 있으므로 생략하고 바로 Blocking Queue
로 넘어가겠다.
먼저 BlockingQueue
는 java.util.concurrent
패키지에 포함된, 구현체가 아닌 인터페이스다.
Blocking Queue
는 동시성 프로그래밍에서 사용되는 스레드 안전한 큐이다.
큐의 기본 작업에 블로킹 연산
을 추가하여, 큐가 가득 찼을 때나 항목을 추가하려는 스레드나,
큐가 비었을 때 항목을 제거하려는 스레드를 대기 상태로 만든다.
블로킹 연산이란 ?
특정 조건이 충족될 떄까지 스레드를 일시 중지시키는 연산으로, 연산이 완료될 때까지 스레드를 대기 상태로 만든다.
스레드 안전:
BlockingQueue
는 내부적으로 동기화 되어 있어 여러 스레드에서 동시에 접근해도 안전하다.LinkedList
나, PriorityQueue
의 경우 여러 스레드에서 동시에 접근하면 데이터 일관성이 깨질 수 있다.블로킹 연산:
put
연산과 큐가 비었을 때의 take
연산이 블로킹된다. 이러한 연산들은 특정 조건이 충족될 때까지 스레드를 대기(Block)시킨다.
시간 제한 있는 연산:offer(E e, long timeout, TimeUnit unit)
및 poll(long timeout, TimeUnit unit)
로 대기 시간을 설정할 수 있다. 지정된 시간 내에 연산이 완료되지 않으면 타임아웃과 함께 실패한다.ArrayBlockingQueue
: 고정 크기의 배열을 기반으로 한 구현. 크기가 일단 설정되면 변경할 수 없고, 구현 시 크기를 지정해 주어야 한다.
LinkedBlockingQueue
: 연결 노드를 기반으로 한 구현. 선택적으로 최대 크기를 설정할 수 있다.
PriorityBlockingQueue
: 요소를 우선순위에 따라 저장하는 구현.
SynchronousQueue
: 단 하나의 항목만 저장할 수 있는 블로킹 큐. 이 큐에 항목을 넣으면 다른 스레드가 그 항목을 꺼낼 때까지 현재 스레드는 대기(블록)한다.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5); // 큐의 최대 크기는 5
// 프로듀서 스레드
Thread producer = new Thread(() -> {
int value = 0;
while (true) {
try {
queue.put(value);
System.out.println("Produced " + value);
value++;
Thread.sleep(1000); // 1초에 한 번 PUT
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 컨슈머 스레드
Thread consumer = new Thread(() -> {
while (true) {
try {
int value = queue.take();
System.out.println("Consumed " + value);
Thread.sleep(1500); // 1.5초에 한 번 TAKE
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
위 코드를 보자.
Producer 스레드는 1초당 1회 값을 넣고. Consumer 스레듣 1.5초당 1회 값을 소비한다.
BlockingQueue
의 크기를 5로 설정 하고 실행한다.
크기가 5가 다 찰 때까지는 우리가 아는 일반 Queue
와 똑같이 실행 될 것이다.
크기가 다 찬 순간부터는 앞서 본 개념대로라면, Consumer
가 실행되어야만 Producer
가 실행 될 것이다.
... 생략 ...
Consumed 4
Produced 6
Produced 7
Consumed 5
Produced 8
Consumed 6
Produced 9 // 아직 5까지 안 차서
Produced 10 // Produced가 두 번 실행 된다.
... 중략 ...
Consumed 9 // 일정 시간 이후부터는 size = 5
Produced 14 // Consumed 해야만 Produced 된다.
Consumed 10
Produced 15
Consumed 11
Produced 16
Consumed 12
Produced 17
Consumed 13
Produced 18
앞선 개념처럼 사이즈가 다 찰 때 까지, 대기 없이 동작하다가 사이즈가 가득 찬 시점 이후부터는 Consumed
가 실행 되어야 Produced
가 실행 되는 것을 알 수 있다.
DelayQueue
는java.util.concurrent
패키지에 포함된 동시성 유틸리티 중 하나로 BlockingQueue
의 구현체이다.
요소가 지정된 지연 시간이 지날 때까지 가져올 수 없으며,
이는 스케쥴링 또는 재시도와 같은 연산에서 유용하게 사용될 수 있다.
DelayQueue
의 요소는 Delayed
인터페이스를 구현해야 한다. Delayed
인터페이스를 구현하지 않았다면 컴파일 에러가 발생한다.
Delayed?
DelayQueue에서 얼마나 Delay 될 지는 Delayed 인터페이스의 getDelay() 메서드를 통해서 제공된다.
따라서, Delayed 인터페이스를 구현하지 않으면 컴파일 에러가 발생한다.
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueExample {
static class DelayedTask implements Delayed {
private long delayUntil;
private String taskName;
public DelayedTask(String taskName, long delayInMillis) {
this.taskName = taskName;
this.delayUntil = System.currentTimeMillis() + delayInMillis;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayUntil - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.delayUntil, ((DelayedTask) o).delayUntil);
}
@Override
public String toString() {
return taskName;
}
}
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
delayQueue.add(new DelayedTask("Task1", 5000)); // 5초 후 실행
delayQueue.add(new DelayedTask("Task2", 2000)); // 2초 후 실행
long startTime = System.currentTimeMillis();
System.out.println("Start: " + startTime);
while (!delayQueue.isEmpty()) {
DelayedTask task = delayQueue.take(); // 지연이 만료될 때까지 블로킹
System.out.println("Executed " + task + " at " + (System.currentTimeMillis() - startTime));
}
}
}
Start: 1694526619763
Executed Task2 at 2008 // 실행 2초 후 발생
Executed Task1 at 5003 // 실해 5초 후 발생
DelayQueue
는 지연시간에 따라 자동 정렬되기 때문에, 2초의 지연 시간을 갖는 Task 2가 먼저 실행이 되었다.
이 후 3초 후에 Task1이 실행된 것을 확인하였다.
또한, DelayQueue
는 BlockingQueue
의 구현체이므로, 아래처럼 바꾸어 주어도 무방하다.
Java에서 Colletion
계열은 인터페이스로 선언을 해 주는 것이 유지보수 관점에서 좋다니,
아래 방법을 더 권장한다.
public static void main(String[] args) throws InterruptedException {
// DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
BlockingQueue<DelayedTask> blockingQueue = new DelayQueue<>();
blockingQueue.add(new DelayedTask("Task1", 5000)); // 5초 후 실행
blockingQueue.add(new DelayedTask("Task2", 2000)); // 2초 후 실행
long startTime = System.currentTimeMillis();
System.out.println("Start: " + startTime);
while (!blockingQueue.isEmpty()) {
DelayedTask task = blockingQueue.take(); // 지연이 만료될 때까지 블로킹
System.out.println("Executed " + task + " at " + (System.currentTimeMillis() - startTime));
}
}
👍 문제
P.63에서는 Queue 인터페이스의 구현체에 대해서 서술하고 있다. 이중에 익숙하지 않은 이름들이 몇가지 보인다.
Blocking Queue
와Delay Queue
... 이 두가지의 구체적인 내부 구현 형태를 코드로 정리하고 개념을 정리해보자.✈️ 선정 배경
이벤트 기반 아키텍처를 공부하면서 Queue의 특성과 활용방안에 대해 관심이 많아졌다. 이번 기회에 알아보자.
📺 관련 챕터 및 레퍼런스
Chap. 04_어디에 담아야 하는지 (p63)
🐳 비고
정리를 코드 위주로 해보자.