Parallel Stream — khi nào thực sự nhanh hơn, khi nào nguy hiểm
ForkJoinPool bên dưới parallelStream, 4 pitfall stateful/ordering/blocking/race condition, formula Doug Lea NxQ, và khi nào nên dùng CompletableFuture thay thế.
TL;DR: parallelStream() dùng ForkJoinPool.commonPool() — pool shared toàn JVM — để chia dữ liệu thành chunk và xử lý song song. Nhưng "thêm .parallel() là nhanh gấp N lần CPU" là ngộ nhận. 4 điều kiện phải hội đủ: dataset lớn (công thức Doug Lea N×Q > 10000), CPU-bound không phải I/O-bound, không shared state, source splittable hiệu quả. Vi phạm bất kỳ điều kiện nào đều cho kết quả tệ hơn sequential hoặc gây race condition im lặng. Bài này phân tích từng pitfall đến tận cơ chế ForkJoin, và khi nào dùng CompletableFuture với executor riêng thay vì parallel stream.
1. Scenario — thêm .parallel() mà chậm hơn
Một developer thấy đoạn xử lý 1000 record mất 200ms. Đọc được "parallel stream dùng nhiều thread", anh ấy thêm vào:
// TRUOC: 200ms
List<Report> reports = orders.stream()
.map(this::generateReport)
.toList();
// SAU khi them parallel(): mong doi nhanh hon, thuc te 350ms
List<Report> reports = orders.parallelStream()
.map(this::generateReport)
.toList();
Performance tệ hơn 75%. Trước khi hiểu vì sao, cần biết bên dưới parallelStream() làm gì.
2. parallelStream() và ForkJoin — cơ chế bên dưới
parallelStream() (và stream().parallel()) chuyển stream thành parallel stream — Java chia dữ liệu nguồn thành nhiều chunk qua cơ chế Spliterator, giao mỗi chunk cho một worker thread trong ForkJoinPool.commonPool(), rồi gộp kết quả qua combiner.
ForkJoinPool (java.util.concurrent, Java 7+) là thread pool đặc biệt dùng thuật toán work-stealing — thread nào rảnh có thể "ăn cắp" task từ hàng chờ của thread khác. ForkJoinPool.commonPool() là singleton shared toàn JVM: mặc định có Runtime.getRuntime().availableProcessors() - 1 thread (trừ 1 vì main thread cũng tính).
// Kiem tra kich thuoc commonPool
System.out.println(ForkJoinPool.commonPool().getParallelism());
// 7 tren may 8 core (8 - 1 = 7)
Luồng xử lý parallel stream:
flowchart TD
A[parallelStream] --> B[Spliterator chia data\nthành N chunk]
B --> C1[Chunk 1\nThread-1]
B --> C2[Chunk 2\nThread-2]
B --> C3[Chunk 3\nThread-3]
B --> C4[Chunk N\nThread-N]
C1 --> D[Combiner gộp kết quả]
C2 --> D
C3 --> D
C4 --> D
D --> E[Result]
F[ForkJoinPool.commonPool\nSingleton JVM-wide] -.-> C1
F -.-> C2
F -.-> C3
F -.-> C4Chi phí overhead của mỗi parallel pipeline:
- Chia data thành chunk (Spliterator).
- Fork task vào pool queue.
- Context switch giữa các thread.
- Work-stealing coordination.
- Combine kết quả từ các thread.
Tổng overhead này khoảng 100-200 microsecond theo benchmark Aleksey Shipilev (JIT engineer Oracle). Với dataset nhỏ hoặc operation nhẹ, overhead lớn hơn "công sức" thực → parallel chậm hơn sequential.
3. Công thức Doug Lea — N×Q > 10000
Doug Lea (tác giả ForkJoinPool, Professor Emeritus SUNY Oswego) đề xuất quy tắc thực nghiệm:
Parallel stream đáng khi
N × Q > 10000
Trong đó:
- N — số phần tử trong stream.
- Q — cost xử lý mỗi phần tử, đơn vị "đơn vị tính toán" (operation count, không phải nanosecond). Phép cộng, so sánh, array lookup ~ Q=1. Regex match ~ Q=100. Hash computation ~ Q=50. Network call ~ Q cao nhưng I/O-bound nên không nên parallel.
Ví dụ:
- N=1000, Q=5 → N×Q=5000 → dưới ngưỡng → sequential tốt hơn.
- N=100000, Q=10 → N×Q=1.000.000 → parallel có lợi rõ.
- N=1000, Q=20 → N×Q=20000 → ranh giới, đo thực tế bằng JMH.
Quay lại scenario: generateReport với 1000 record — nếu generateReport là lightweight operation (Q nhỏ), N×Q dưới ngưỡng → overhead parallel thắng → 350ms tệ hơn 200ms.
Trong code production, ngưỡng 10000 là "số liệu tham khảo, không phải guarantee". Data structure, cache behavior, GC pressure đều ảnh hưởng. Luôn đo bằng JMH (Java Microbenchmark Harness) với data production-like trước khi quyết định. Trực giác sai thường xuyên.
4. Pitfall 1 — stateful lambda và race condition
Stateful lambda là lambda đọc hoặc ghi vào biến ngoài scope của nó. Trong parallel stream, nhiều thread chạy lambda đồng thời — nếu biến ngoài không thread-safe, race condition xảy ra.
// SAI -- race condition tren shared counter
int[] count = {0}; // effectively final array trick
orders.parallelStream()
.filter(o -> o.amount() > 100)
.forEach(o -> {
count[0]++; // RACE CONDITION -- nhieu thread ghi dong thoi
});
System.out.println(count[0]); // ket qua khong deterministic moi lan chay
Vì sao nguy hiểm: count[0]++ là read-modify-write — 3 bước không atomic. Thread A đọc count=5, Thread B đọc count=5, Thread A ghi 6, Thread B ghi 6 (lẽ ra phải 7). Kết quả nhỏ hơn thực tế và thay đổi mỗi lần chạy.
Workaround 1 — AtomicInteger cho counter đơn giản:
AtomicInteger count = new AtomicInteger(0);
orders.parallelStream()
.filter(o -> o.amount() > 100)
.forEach(o -> count.incrementAndGet()); // atomic, thread-safe
System.out.println(count.get()); // chinh xac
Workaround 2 — Redesign thành reduce (idiom tốt hơn):
// Dung count() terminal op -- khong can shared state gi ca
long count = orders.parallelStream()
.filter(o -> o.amount() > 100)
.count();
Hoặc với custom aggregation:
// Dung reduce de tinh tong -- pure function, khong shared state
int totalAmount = orders.parallelStream()
.filter(o -> o.amount() > 100)
.mapToInt(Order::amount)
.sum(); // sum() la reduce associative, parallel-safe
ArrayList không thread-safe. Gọi list.add(element) từ nhiều thread đồng thời có thể gây: kết quả thiếu phần tử, ArrayIndexOutOfBoundsException khi resize, hoặc null xuất hiện ở giữa list. Không có exception rõ ràng — bug im lặng. Pattern đúng: dùng collect(Collectors.toList()) hoặc collect(Collectors.toConcurrentMap(...)) — collector đảm bảo thread-safety trong accumulate.
5. Pitfall 2 — forEach không guarantee order
Với sequential stream, forEach emit phần tử theo thứ tự source. Với parallel stream, thứ tự phụ thuộc thread scheduling — không deterministic.
List<Integer> nums = List.of(1, 2, 3, 4, 5);
// Sequential: in theo thu tu 1 2 3 4 5
nums.stream().forEach(System.out::println);
// Parallel: thu tu khong bao gio biet truoc
nums.parallelStream().forEach(System.out::println);
// Co the in: 3 1 4 2 5 hoac 2 4 1 5 3 hoac bat ky thu tu nao
Nếu cần guarantee thứ tự với parallel stream, dùng forEachOrdered:
nums.parallelStream().forEachOrdered(System.out::println);
// In dung thu tu: 1 2 3 4 5
Nhưng forEachOrdered phải serialize kết quả trước khi emit — nó hủy phần lớn lợi ích parallel. Parallel với forEachOrdered thường chậm hơn sequential vì phải sync và đợi thứ tự.
Rule thực tế: nếu cần thứ tự kết quả, dùng collect() thay vì forEachOrdered. collect(Collectors.toList()) với parallel stream gom theo thứ tự encounter order của stream source.
6. Pitfall 3 — ForkJoinPool.commonPool() shared và blocking I/O
Đây là pitfall nguy hiểm nhất ở production. ForkJoinPool.commonPool() là singleton — tất cả parallel stream trong toàn JVM dùng chung. Nếu một parallel stream thực hiện I/O blocking (HTTP call, JDBC query, file read), thread bị block và không thể xử lý task khác.
// BAD -- parallel stream goi HTTP cho tung URL
List<String> responses = urls.parallelStream()
.map(url -> httpGet(url)) // block 200-500ms moi call
.toList();
// Hau qua: 7 thread cua commonPool bi block vao HTTP
// Cac parallel stream khac trong app (va CompletableFuture.supplyAsync khong co executor)
// khong con thread de chay -- toan bo JVM bi cham
Scenario tệ hơn: ứng dụng có 2 endpoint. Endpoint A xử lý 50 URL song song dùng parallelStream(). Endpoint B dùng CompletableFuture.supplyAsync() (không truyền executor, default dùng commonPool). Khi A đang chạy, B đợi hoàn toàn vì pool hết thread.
Giải pháp: dùng ForkJoinPool riêng hoặc CompletableFuture với executor tường minh:
// Option 1: ForkJoinPool tuy chinh
ForkJoinPool customPool = new ForkJoinPool(4); // pool rieng 4 thread
try {
List<String> responses = customPool.submit(() ->
urls.parallelStream()
.map(url -> httpGet(url))
.toList()
).get();
} finally {
customPool.shutdown();
}
// Option 2 (idiomatic hon): CompletableFuture voi executor rieng
ExecutorService executor = Executors.newFixedThreadPool(10);
List<CompletableFuture<String>> futures = urls.stream()
.map(url -> CompletableFuture.supplyAsync(() -> httpGet(url), executor))
.toList();
List<String> responses = futures.stream()
.map(CompletableFuture::join)
.toList();
executor.shutdown();
Option 2 dùng CompletableFuture với executor riêng là pattern idiomatic cho I/O concurrency trong Java — không chiếm commonPool.
Project Loom (Java 21) giới thiệu Virtual Threads — lightweight thread chạy được hàng nghìn đồng thời mà không block OS thread khi I/O. Tuy nhiên parallel stream chưa tích hợp Virtual Threads tính đến Java 21. parallelStream() vẫn dùng ForkJoinPool với platform threads. Để tận dụng Virtual Threads cho I/O concurrency, dùng Executors.newVirtualThreadPerTaskExecutor() làm executor cho CompletableFuture — không dùng parallel stream.
7. Pitfall 4 — source splittable kém
Parallel stream chia data qua Spliterator.trySplit(). Hiệu quả chia phụ thuộc vào data source:
| Source | Splittable | Lý do |
|---|---|---|
ArrayList, int[], double[] | Tốt — O(1) | Array: biết size, split ở index giữa tức thì |
IntStream.range(0, N) | Tốt — O(1) | Biết range, split bằng toán học |
HashMap.values() | Tốt | Bucket-based split |
LinkedList | Kém — O(n) | Phải traverse từ head để tìm midpoint |
Files.lines(path) | Kém | Không biết tổng line count trước |
Stream.iterate(seed, fn) | Kém — không split được | Generator sequential, không có midpoint |
Stream.generate(supplier) | Kém | Tương tự iterate |
Khi source split kém, parallel stream chia không đều — một thread làm phần lớn công việc, các thread khác xong sớm rồi ngồi chờ. Overhead điều phối lớn hơn lợi ích song song.
// KHONG TOI UU -- LinkedList split kem
List<Order> orders = new LinkedList<>(rawOrders);
long count = orders.parallelStream()
.filter(o -> o.amount() > 100)
.count();
// TOT HON -- ArrayList split hieu qua
List<Order> orders = new ArrayList<>(rawOrders);
long count = orders.parallelStream()
.filter(o -> o.amount() > 100)
.count();
8. Khi nào parallel thực sự có lợi — checklist
Parallel stream có lợi chỉ khi hội đủ tất cả điều kiện sau:
| Điều kiện | Vì sao cần |
|---|---|
| N×Q vượt 10000 | Overhead fork/join < thời gian tính toán thực |
| CPU-bound, không I/O | I/O-bound block thread pool, hại toàn JVM |
| Không shared mutable state | Race condition, lock contention |
| Operation associative | Reduce/combine cho kết quả đúng bất kể thứ tự |
| Source splittable hiệu quả | Chia đều cho các thread, không lệch |
| commonPool không bận | Không tranh tài nguyên với task khác trong JVM |
Thực tế enterprise, workload thoả hết 6 điều này khá hiếm. Đa số business logic liên quan DB, HTTP, hoặc state shared. Trong những trường hợp đó, sequential stream hoặc CompletableFuture với executor riêng là lựa chọn tốt hơn.
9. Ví dụ: khi parallel thực sự thắng
Bài toán tính số nguyên tố trong 10 triệu số — CPU-bound, stateless, array-backed source:
// Sequential
long seqCount = LongStream.range(2, 10_000_000)
.filter(PrimeChecker::isPrime)
.count();
// Parallel -- hop le: N=10M, Q cao (kiem tra so nguyen to),
// source LongStream.range split O(1), khong shared state, CPU-bound
long parCount = LongStream.range(2, 10_000_000)
.parallel()
.filter(PrimeChecker::isPrime)
.count();
Trên máy 8 core, parallel cho speedup xấp xỉ 5-7 lần (không đến 8 lần vì overhead và main thread cũng tính).
// Benchmark mental model:
// isPrime(n): kiem tra chia het cho 2 den sqrt(n) -- O(sqrt(n)) phep tinh
// Voi n ~ 10^7, sqrt(n) ~ 3162 phep tinh -> Q ~ 3000
// N = 10.000.000, Q ~ 3000 -> N*Q ~ 3*10^10 >> 10000 -> parallel rat co loi
10. So sánh parallel stream vs CompletableFuture
| Tiêu chí | parallelStream() | CompletableFuture + executor |
|---|---|---|
| Thread pool | commonPool (shared, fixed size) | Tự chọn executor (riêng biệt) |
| I/O-bound | Nguy hiểm — block commonPool | An toàn — pool riêng không ảnh hưởng JVM |
| CPU-bound | Tốt khi N×Q > 10000 | Overhead cao hơn cho CPU-bound đơn giản |
| Composition | Không — chỉ pipeline tuyến tính | Có — thenCompose, allOf, anyOf |
| Timeout per task | Không có | Có — orTimeout, completeOnTimeout |
| Error handling | Exception propagate ra terminal op | exceptionally, handle per-future |
| API complexity | Đơn giản — thêm .parallel() | Phức tạp hơn — cần explicit future chain |
| Java virtual threads | Không tích hợp (Java 21) | Có — dùng newVirtualThreadPerTaskExecutor |
Quy tắc chọn:
- CPU-bound, stateless, array-backed, không cần composition →
parallelStream(). - I/O-bound, cần timeout, cần error handling per-task, cần Virtual Threads →
CompletableFuture+ executor.
11. Deep Dive
- "When to Use Parallel Streams" — Doug Lea — web.archive.org/web/20160331125154/http://gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html — bài gốc của tác giả
ForkJoinPool, giải thích công thức N×Q, phân tích chi tiết khi nào parallel thắng. Bắt buộc đọc trước khi dùng parallel stream ở production. ForkJoinPoolJavadoc SE 21 — docs.oracle.com/.../ForkJoinPool.html — giải thích work-stealing,commonPool(), cấu hìnhparallelism, và hành vi khi task blocking.SpliteratorJavadoc SE 21 — docs.oracle.com/.../Spliterator.html — cơ chế chia data cho parallel, cácCharacteristics(SIZED, ORDERED, SUBSIZED...) ảnh hưởng đến hiệu quả split.- JEP 446 — Scoped Values (Java 21) — openjdk.org/jeps/446 — cơ chế chia sẻ dữ liệu read-only an toàn giữa thread (thay thế
ThreadLocaltrong concurrent context), liên quan khi parallel stream cần chia config/context qua thread. - Project Loom — Virtual Threads (JEP 444, Java 21) — openjdk.org/jeps/444 — giải thích vì sao parallel stream chưa tích hợp Virtual Threads, và hướng đúng cho I/O concurrency trong Java 21+.
12. Self-check
Q1Dev thêm `.parallel()` vào stream xử lý 500 record, mỗi record gọi 1 hàm tính toán đơn giản (tổng vài phép nhân). Performance tệ hơn 40%. Giải thích nguyên nhân và quyết định dùng sequential hay parallel.▸
Q2Đoạn code sau có bug không? Nếu có, bug gì, và sửa thế nào? List<String> results = new ArrayList<>(); orders.parallelStream().map(Order::id).forEach(results::add);▸
List<String> results = new ArrayList<>(); orders.parallelStream().map(Order::id).forEach(results::add);List<String> results = orders.parallelStream().map(Order::id).collect(Collectors.toList()); Collector đảm bảo mỗi thread có container riêng, combiner gộp thread-safe cuối cùng.Q3`ForkJoinPool.commonPool()` có bao nhiêu thread? Vì sao đây là nguy hiểm với I/O-bound parallel stream?▸
Q4`parallelStream().forEachOrdered(consumer)` có đảm bảo thứ tự không? Tại sao performance tệ hơn sequential?▸
Q5Vì sao `LinkedList` là nguồn kém cho parallel stream, còn `ArrayList` lại tốt?▸
(left + right) / 2. `LinkedList` là doubly-linked list — để tìm phần tử ở giữa phải traverse từ head O(n/2). Với N=1 triệu element, split `LinkedList` tốn 500000 bước — bản thân split đã đắt hơn nhiều công việc thực. Thêm nữa, split không đều vì overhead — một số chunk có nhiều element hơn, thread không cân bằng. Hậu quả: parallel stream trên `LinkedList` thường chậm hơn sequential. Nếu đang dùng `LinkedList`, convert sang `ArrayList` trước khi parallel: new ArrayList<>(linkedList).parallelStream().Q6Cho đoạn code: orders.parallelStream().filter(o -> o.amount() > 100).sorted().toList() — `sorted()` trong pipeline parallel ảnh hưởng performance thế nào?▸
orders.parallelStream().filter(o -> o.amount() > 100).sorted().toList() — `sorted()` trong pipeline parallel ảnh hưởng performance thế nào?Bài này có giúp bạn hiểu bản chất không?
Hỏi đáp về bài này
Chưa có câu hỏi
Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).
Đặt câu hỏi đầu tiên