작업을 작은 작업 단위로 나누어 병렬로 실행 가능, ForkJoinPool클래스를 사용하여 구현
Parallel() : 스트림 API와 함께 제공, 컬렉션의 요소를 병렬로 처리 가능
🍑 본론
parallel() 메서드
//메르센 소수를 찾는 프로그램
//메르센 소수 = 2^p-1 형태의 소수(p가 반드시 소수여야함, 안그러면 쉽게 인수분해)
public class MersennePrime {
private static final BigInteger ONE = BigInteger.valueOf(1);
private static final BigInteger TWO = BigInteger.valueOf(2);
public static void main(String[] args) {
primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
//.parallel()
.filter(mersenne -> mersenne.isProbablePrime(50)) //Miller-Rabin 소수판별 알고리즘, 소수일 확률이 50% 이상의 확률일 경우에 소수로 판단
.limit(20) //이거 안쓰면 무한스트림이라 코드가 무한히 돌아감
.forEach(System.out::println);
}
//parallel()을 사용하지 않을 시 8.299초 소요
//사용 시 CPU 겁나 돌아가고 출력이 안됨, 응답불가상태
//parallel() 메서드로
static Stream<BigInteger> primes() {
//2부터 시작하여 소수를 생성하는 무한스트림
return Stream.iterate(TWO, BigInteger::nextProbablePrime);
}
}
parallel() 사용 시, 성능이 개선되지 않는 이유
스트림 라이브러리가 위 파이프라인을 병렬화하는 방법을 찾아내지 못했기 때문에
데이터 소스가 Stream.iterate인 경우
이전 요소에 의존하여 다음 요소를 생성하므로 병렬처리가 어려움
즉, 순차적인 실행을 통해 요소처리가 되어야하는데, 병렬화는 소스의 요소가 독립적으로 처리될 수 있어야 강점을 발휘함.
중간 연산으로 limit를 사용할 경우
파이프라인 병렬화는 limit를 다룰 때 CPU 코어가 남는다면 몇 개 더 처리한 후 제한된 개수 이후의 결과를 버리는 식으로 동작
그런데, 메르센 소수를 찾는 위 코드는 새로운 메르센 소수를 찾을 때마다 그 비용이 이전까지의 원소 전부를 계산한 비용을 합친 것 만큼 듦
따라서, 병렬화시켜도 1~20번까지 순차적으로 찾는 비용보다
21~ 이후의 메르센 소수를 찾는 비용이 너무나도 커서 parallel()메서드를 사용하는데 더 오랜시간이 걸림
효율적인 병렬화를 위한 팁
스트림의 소스가 ArrayList, HashMap, HashSet, ConcurrentHashMap의 인스턴스거나 배열, int ~ long 범위일 때
정확하고 쉽게 나눌 수 있는 소스
위 자료구조들은 모두 데이터들을 원하는 크기로 정확하고 쉽게 나눌 수 있다.
따라서, 다수의 스레드에 분배하기 좋음.
분배 작업은 Spliterator(분할할 수 있는 반복자)가 담당하며, Stream이나 Iterable의 spliterator 메서드로 얻어올 수 있음.
참조 지역성이 뛰어난 소스
이웃한 원소의 참조들이 메모리에 연속해서 저장돼있기 때문에, 다량의 데이터를 처리하는 벌크 연산을 병렬화할 때 유리
참고로 참조 지역성이 가장 뛰어난 자료구조는 기본 타입의 배열(메모리에 연속적으로 저장됨)
reduce 메서드 중 하나, 혹은 min,max,count,sum같이 완성된 형태로 제공되는 메서드들을 이용할 때
anyMatch, allMatch, noneMatch처럼 조건에 맞으면 바로 반환되는 메서드 또한 병렬화에 적합
가변 축소를 수행하는 collect()메서드는 병렬화에 적합하지 않다. (컬렉션들을 합치는 부담이 크기 때문에)
병렬화 올바르게 사용하기
안전 실패(safety failure)
스트림을 잘못 병렬화할 경우, 성능이 나빠질 뿐만아니라 결과 자체가 잘못되거나 예상 못한 동작이 발생할 수 있다.
이를 안전 실패라 한다.
안전 실패는 병렬화한 파이프라인이 사용하는 mappers, filters 등 함수 객체가 명세대로 동작하지 않을 때 벌어질 수 있다.
Stream 명세는 함수 객체에 관한 엄중한 규약을 정의해놨다.
예를 들어, reduce연산에 건네지는 누적기와 결합기 함수는 반드시 결합 법칙을 만족하고, 간섭받지 않고, 상태를 갖지 않아야 한다.
위와 같은 요구사항을 지키지 못한 상태에 병렬로 파이프라인을 수행하면 실패로 이어지기 쉽다.
.reduce(0, (acc, x) -> acc + x); // 누적기 함수
.reduce(0, Integer::sum, Integer::sum); // 누적기+결합기 함수
병렬화를 사용할 가치가 있는지 판단하자.
위의 팁과 주의사항을 모두 지켰더라도, 파이프라인이 수행하는 진짜 작업이 병렬화에 드는 추가 비용을 상쇄하지 못한다면 성능 향상이 미미할 수 있다.
실제로 성능이 향상될 지 추정해보는 간단한 방법 : 스트림 안의 원소 수와 원소당 수행되는 코드 줄 수를 곱하여, 최소 수십만이 되어야 한다.
스트림 병렬화는 오직 성능 최적화 수단이므로, 성능을 테스트하여 병렬화를 사용할 가치가 있는지 판단해야 한다.
//병렬화를 효과적으로 사용한 예제
//소수 계산 스트림 파이프 라인
public class ParallelPrime {
//n이하의 소수의 개수를 세는 메서드
static long pi(long n) {
return LongStream.rangeClosed(2, n)
// .parallel() //parallel()을 사용할 때 0.6초, 사용하지 않을 때 1.8초, 즉 3배 이상의 성능 향상,
.mapToObj(BigInteger::valueOf)
.filter(i -> i.isProbablePrime(50))
.count(); //reduction 메서드
}
public static void main(String[] args) {
long before = System.currentTimeMillis();
long n = (long) Math.pow(10, 6);
long result = pi(n);
long after = System.currentTimeMillis();
System.out.println(n + "이하의 소수의 개수 : "+ result);
System.out.println("소요시간(ms) :" + (after-before));
}
}
SplittableRandom은 레이스 컨디션을 피하기 위해 잘 분할된 난수 생성기를 제공하여, 병렬화에 적합하다.
Random을 사용할 경우 병렬화를 해선 안된다. 모든 연산을 동기화하기 때문에 최악의 성능으로 이어질 수 있다.
//SplittableRandom 사용시
public static void main(String[] args) {
long before = System.currentTimeMillis();
SplittableRandom random = new SplittableRandom();
// 랜덤한 정수 10억개를 생성하는 스트림
long randomNumberCount = random.ints(1000000000)
// .parallel() // 병렬화시 0.111초, 안하면 0.465초
.count();
// 결과 출력
System.out.println("총 생성된 랜덤 숫자 개수: " + randomNumberCount);
long after = System.currentTimeMillis();
System.out.println("소요시간(ms) :" + (after-before));
}
//ThreadLocalRandom 사용시
public static void main(String[] args) {
long before = System.currentTimeMillis();
// 랜덤한 정수 10억개를 생성하는 스트림
long randomNumberCount = ThreadLocalRandom.current().ints(10000000)
// .parallel() // 위와 비슷한 결과... 이상함
.count();
// 결과 출력
System.out.println("총 생성된 랜덤 숫자 개수: " + randomNumberCount);
long after = System.currentTimeMillis();
System.out.println("소요시간(ms) :" + (after-before));
}
//Random 사용시
public static void main(String[] args) {
long before = System.currentTimeMillis();
Random random = new Random();
// 10억개의 난수를 생성하는 병렬 스트림 생성
long randomNumberCount = random.ints(1000000000)
// .parallel() // 병렬화시 38초, 안하면 3초
.count();
// 결과 출력
System.out.println("총 생성된 랜덤 숫자 개수: " + randomNumberCount);
long after = System.currentTimeMillis();
System.out.println("소요시간(ms) :" + (after-before));
}
퀴즈
다음 코드는 병렬화하는게 적합한가?
public class test {
public static void main(String[] args) {
long before = System.currentTimeMillis();
// 1부터 1000만까지의 정수를 생성하여 리스트로 수집
List<Integer> numbers = IntStream.rangeClosed(1, 10000000)
// .parallel() // 병렬화
.boxed()
.collect(Collectors.toList());
long after = System.currentTimeMillis();
// 결과 출력
// System.out.println(numbers);
System.out.println("소요시간(ms) :" + (after-before));
}
}
다음 코드는 병렬화하면 속도가 빨라질까?
public class test2 {
public static void main(String[] args) {
long before = System.currentTimeMillis();
// 무한 스트림으로 처음 50만개의 요소 출력
Stream.iterate(0, n -> n + 1)
// .parallel() // 병렬화
.limit(500000)
.forEach(System.out::println);
long after = System.currentTimeMillis();
// 결과 출력
System.out.println("소요시간(ms) :" + (after-before));
}
}
Chapter : 7. 람다와 스트림
Item : 48. 스트림 병렬화는 주의해서 사용하라
Assignee : jseok0917
🍑 서론
자바의 동시성 프로그래밍
wait 및 notify
java.util.concurrent : 동시성을 처리하기 위한 다양한 클래스와 인터페이스 포함
포크-조인 패키지 : 병렬 처리를 위한 프레임워크
Parallel() : 스트림 API와 함께 제공, 컬렉션의 요소를 병렬로 처리 가능
🍑 본론
parallel() 메서드
parallel() 사용 시, 성능이 개선되지 않는 이유
스트림 라이브러리가 위 파이프라인을 병렬화하는 방법을 찾아내지 못했기 때문에
데이터 소스가 Stream.iterate인 경우
중간 연산으로 limit를 사용할 경우
효율적인 병렬화를 위한 팁
스트림의 소스가 ArrayList, HashMap, HashSet, ConcurrentHashMap의 인스턴스거나 배열, int ~ long 범위일 때
정확하고 쉽게 나눌 수 있는 소스
참조 지역성이 뛰어난 소스
종단 연산의 동작 방식을 고려하라
종단 연산 : 스트림 파이프라인에서 최종결과를 생성하거나 반환하는 연산
종단 연산 중 병렬화에 가장 적합한 것은 축소(reduction)
병렬화 올바르게 사용하기
안전 실패(safety failure)
스트림을 잘못 병렬화할 경우, 성능이 나빠질 뿐만아니라 결과 자체가 잘못되거나 예상 못한 동작이 발생할 수 있다.
이를 안전 실패라 한다.
안전 실패는 병렬화한 파이프라인이 사용하는 mappers, filters 등 함수 객체가 명세대로 동작하지 않을 때 벌어질 수 있다.
Stream 명세는 함수 객체에 관한 엄중한 규약을 정의해놨다.
퀴즈
🍑 결론
무작정 병렬화를 한다고 속도가 빨라질 것이라 생각하지 말자.
병렬화 시 오동작 등의 부작용도 항상 고려해야 한다.
referenced by