Parallel Stream — khi nào thực sự nhanh hơn, khi nào nguy hiểm
ForkJoinPool bên dưới parallelStream, 4 pitfall stateful/ordering/blocking/race condition, formula Doug Lea NxQ, và khi nào nên dùng CompletableFuture thay thế.
TL;DR: parallelStream() dùng ForkJoinPool.commonPool() — pool shared toàn JVM — để chia dữ liệu thành chunk và xử lý song song. Nhưng "thêm .parallel() là nhanh gấp N lần CPU" là ngộ nhận. 4 điều kiện phải hội đủ: dataset lớn (công thức Doug Lea N×Q > 10000), CPU-bound không phải I/O-bound, không shared state, source splittable hiệu quả. Vi phạm bất kỳ điều kiện nào đều cho kết quả tệ hơn sequential hoặc gây race condition im lặng. Bài này phân tích từng pitfall đến tận cơ chế ForkJoin, và khi nào dùng CompletableFuture với executor riêng thay vì parallel stream.
1. Scenario — thêm .parallel() mà chậm hơn
Một developer thấy đoạn xử lý 1000 record mất 200ms. Đọc được "parallel stream dùng nhiều thread", anh ấy thêm vào:
// TRUOC: 200ms
List<Report> reports = orders.stream()
.map(this::generateReport)
.toList();
// SAU khi them parallel(): mong doi nhanh hon, thuc te 350ms
List<Report> reports = orders.parallelStream()
.map(this::generateReport)
.toList();
Performance tệ hơn 75%. Trước khi hiểu vì sao, cần biết bên dưới parallelStream() làm gì.
2. parallelStream() và ForkJoin — cơ chế bên dưới
parallelStream() (và stream().parallel()) chuyển stream thành parallel stream — Java chia dữ liệu nguồn thành nhiều chunk qua cơ chế Spliterator, giao mỗi chunk cho một worker thread trong ForkJoinPool.commonPool(), rồi gộp kết quả qua combiner.
ForkJoinPool (java.util.concurrent, Java 7+) là thread pool đặc biệt dùng thuật toán work-stealing — thread nào rảnh có thể "ăn cắp" task từ hàng chờ của thread khác. ForkJoinPool.commonPool() là singleton shared toàn JVM: mặc định có Runtime.getRuntime().availableProcessors() - 1 thread (trừ 1 vì main thread cũng tính).
// Kiem tra kich thuoc commonPool
System.out.println(ForkJoinPool.commonPool().getParallelism());
// 7 tren may 8 core (8 - 1 = 7)
Luồng xử lý parallel stream:
flowchart TD
A[parallelStream] --> B[Spliterator chia data\nthành N chunk]
B --> C1[Chunk 1\nThread-1]
B --> C2[Chunk 2\nThread-2]
B --> C3[Chunk 3\nThread-3]
B --> C4[Chunk N\nThread-N]
C1 --> D[Combiner gộp kết quả]
C2 --> D
C3 --> D
C4 --> D
D --> E[Result]
F[ForkJoinPool.commonPool\nSingleton JVM-wide] -.-> C1
F -.-> C2
F -.-> C3
F -.-> C4Chi phí overhead của mỗi parallel pipeline:
- Chia data thành chunk (Spliterator).
- Fork task vào pool queue.
- Context switch giữa các thread.
- Work-stealing coordination.
- Combine kết quả từ các thread.
Tổng overhead này khoảng 100-200 microsecond theo benchmark Aleksey Shipilev (JIT engineer Oracle). Với dataset nhỏ hoặc operation nhẹ, overhead lớn hơn "công sức" thực → parallel chậm hơn sequential.
3. Công thức Doug Lea — N×Q > 10000
Doug Lea (tác giả ForkJoinPool, Professor Emeritus SUNY Oswego) đề xuất quy tắc thực nghiệm:
Parallel stream đáng khi
N × Q > 10000
Trong đó:
- N — số phần tử trong stream.
- Q — cost xử lý mỗi phần tử, đơn vị "đơn vị tính toán" (operation count, không phải nanosecond). Phép cộng, so sánh, array lookup ~ Q=1. Regex match ~ Q=100. Hash computation ~ Q=50. Network call ~ Q cao nhưng I/O-bound nên không nên parallel.
Ví dụ:
- N=1000, Q=5 → N×Q=5000 → dưới ngưỡng → sequential tốt hơn.
- N=100000, Q=10 → N×Q=1.000.000 → parallel có lợi rõ.
- N=1000, Q=20 → N×Q=20000 → ranh giới, đo thực tế bằng JMH.
Quay lại scenario: generateReport với 1000 record — nếu generateReport là lightweight operation (Q nhỏ), N×Q dưới ngưỡng → overhead parallel thắng → 350ms tệ hơn 200ms.
Trong code production, ngưỡng 10000 là "số liệu tham khảo, không phải guarantee". Data structure, cache behavior, GC pressure đều ảnh hưởng. Luôn đo bằng JMH (Java Microbenchmark Harness) với data production-like trước khi quyết định. Trực giác sai thường xuyên.
4. Pitfall 1 — stateful lambda và race condition
Stateful lambda là lambda đọc hoặc ghi vào biến ngoài scope của nó. Trong parallel stream, nhiều thread chạy lambda đồng thời — nếu biến ngoài không thread-safe, race condition xảy ra.
// SAI -- race condition tren shared counter
int[] count = {0}; // effectively final array trick
orders.parallelStream()
.filter(o -> o.amount() > 100)
.forEach(o -> {
count[0]++; // RACE CONDITION -- nhieu thread ghi dong thoi
});
System.out.println(count[0]); // ket qua khong deterministic moi lan chay
Vì sao nguy hiểm: count[0]++ là read-modify-write — 3 bước không atomic. Thread A đọc count=5, Thread B đọc count=5, Thread A ghi 6, Thread B ghi 6 (lẽ ra phải 7). Kết quả nhỏ hơn thực tế và thay đổi mỗi lần chạy.
Workaround 1 — AtomicInteger cho counter đơn giản:
AtomicInteger count = new AtomicInteger(0);
orders.parallelStream()
.filter(o -> o.amount() > 100)
.forEach(o -> count.incrementAndGet()); // atomic, thread-safe
System.out.println(count.get()); // chinh xac
Workaround 2 — Redesign thành reduce (idiom tốt hơn):
// Dung count() terminal op -- khong can shared state gi ca
long count = orders.parallelStream()
.filter(o -> o.amount() > 100)
.count();
Hoặc với custom aggregation:
// Dung reduce de tinh tong -- pure function, khong shared state
int totalAmount = orders.parallelStream()
.filter(o -> o.amount() > 100)
.mapToInt(Order::amount)
.sum(); // sum() la reduce associative, parallel-safe
ArrayList không thread-safe. Gọi list.add(element) từ nhiều thread đồng thời có thể gây: kết quả thiếu phần tử, ArrayIndexOutOfBoundsException khi resize, hoặc null xuất hiện ở giữa list. Không có exception rõ ràng — bug im lặng. Pattern đúng: dùng collect(Collectors.toList()) hoặc collect(Collectors.toConcurrentMap(...)) — collector đảm bảo thread-safety trong accumulate.
5. Pitfall 2 — forEach không guarantee order
Với sequential stream, forEach emit phần tử theo thứ tự source. Với parallel stream, thứ tự phụ thuộc thread scheduling — không deterministic.
List<Integer> nums = List.of(1, 2, 3, 4, 5);
// Sequential: in theo thu tu 1 2 3 4 5
nums.stream().forEach(System.out::println);
// Parallel: thu tu khong bao gio biet truoc
nums.parallelStream().forEach(System.out::println);
// Co the in: 3 1 4 2 5 hoac 2 4 1 5 3 hoac bat ky thu tu nao
Nếu cần guarantee thứ tự với parallel stream, dùng forEachOrdered:
nums.parallelStream().forEachOrdered(System.out::println);
// In dung thu tu: 1 2 3 4 5
Nhưng forEachOrdered phải serialize kết quả trước khi emit — nó hủy phần lớn lợi ích parallel. Parallel với forEachOrdered thường chậm hơn sequential vì phải sync và đợi thứ tự.
Rule thực tế: nếu cần thứ tự kết quả, dùng collect() thay vì forEachOrdered. collect(Collectors.toList()) với parallel stream gom theo thứ tự encounter order của stream source.
6. Pitfall 3 — ForkJoinPool.commonPool() shared và blocking I/O
Đây là pitfall nguy hiểm nhất ở production. ForkJoinPool.commonPool() là singleton — tất cả parallel stream trong toàn JVM dùng chung. Nếu một parallel stream thực hiện I/O blocking (HTTP call, JDBC query, file read), thread bị block và không thể xử lý task khác.
6.1 Cơ chế Starvation (Đói tài nguyên) toàn JVM
Vì commonPool được chia sẻ cấp độ JVM, khi một đoạn code chạy parallel stream bị nghẽn I/O (ví dụ: đợi API bên thứ ba mất 2 giây hoặc thực thi câu query database nặng), các worker thread của pool chung sẽ bị chiếm dụng hết để chờ đợi.
Hậu quả gián tiếp nghiêm trọng:
- Toàn bộ các luồng xử lý song song khác trong ứng dụng sẽ bị nghẽn hoàn toàn (Starvation).
- Các tác vụ bất đồng bộ như
@Asynccủa Spring Boot (nếu cấu hình default dùng TaskExecutor mặc định hoặc trỏ về common pool) sẽ bị trì hoãn không thể khởi chạy. - Các request của client đi vào các endpoint có sử dụng parallel stream hoặc
CompletableFuture(không truyền executor riêng) sẽ chịu độ trễ lớn hoặc bị timeout.
// BAD -- parallel stream goi HTTP cho tung URL
List<String> responses = urls.parallelStream()
.map(url -> httpGet(url)) // block 200-500ms moi call
.toList();
// Hau qua: Tat ca thread cua commonPool bi block vao HTTP
// Cac parallel stream khac trong app hoac cac tac vu bat dong bo trong JVM khong con thread de chay!
6.2 Kỹ thuật nâng cao: Cách ly tài nguyên bằng Custom ForkJoinPool
Để giải quyết triệt để vấn đề nghẽn JVM do dùng chung commonPool, Java cho phép một kỹ thuật nâng cao: Bọc parallel stream bên trong một instance ForkJoinPool riêng biệt và submit tác vụ qua pool đó.
Khi một parallel stream được kích hoạt bên trong một task đang chạy trên một ForkJoinPool cụ thể, cơ chế ForkJoin của Java đủ thông minh để tự động điều hướng tất cả các tác vụ con (ForkJoinTask) của stream đó sử dụng chính pool riêng này thay vì rơi về commonPool.
// Boc parallel stream trong ForkJoinPool tuy chinh de cach ly tai nguyen
ForkJoinPool customPool = null;
try {
// Khoi tao pool rieng biet voi 4 thread danh rieng cho tac vu I/O nay
customPool = new ForkJoinPool(4);
List<String> responses = customPool.submit(() ->
urls.parallelStream()
.map(url -> httpGet(url)) // Co blocking I/O nhung chi block thread cua customPool
.toList()
).get(); // Block thread hien tai cho den khi hoan thanh
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Task interrupted", e);
} catch (ExecutionException e) {
throw new RuntimeException("Execution failed in custom pool", e.getCause());
} finally {
if (customPool != null) {
customPool.shutdown(); // Giai phong tai nguyen khi dung xong
}
}
Cách bọc parallel stream trong customPool chỉ nên dùng khi bạn bắt buộc phải tái sử dụng một thư viện hoặc một đoạn code parallel stream có sẵn mà không thể refactor cấu trúc. Đối với các tác vụ I/O-bound viết mới, CompletableFuture kết hợp với một thread pool riêng biệt (ExecutorService) luôn là lựa chọn hàng đầu và rõ ràng hơn về mặt ngữ nghĩa.
Project Loom (Java 21) giới thiệu Virtual Threads — lightweight thread chạy được hàng nghìn đồng thời mà không block OS thread khi I/O. Tuy nhiên parallel stream chưa tích hợp Virtual Threads tính đến Java 21. parallelStream() vẫn dùng ForkJoinPool với platform threads. Để tận dụng Virtual Threads cho I/O concurrency, dùng Executors.newVirtualThreadPerTaskExecutor() làm executor cho CompletableFuture — không dùng parallel stream.
7. Pitfall 4 — source splittable kém và chi phí phân rã dữ liệu
Parallel stream chia nhỏ dữ liệu nguồn thông qua cơ chế Spliterator.trySplit(). Tốc độ và hiệu quả của việc xử lý song song phụ thuộc trực tiếp vào cấu trúc dữ liệu nguồn có dễ dàng phân chia hay không.
7.1 ArrayList: Phân rã hoàn hảo O(1)
ArrayList hoặc các mảng nguyên thủy (int[], double[]) có cấu trúc lưu trữ liên tục trong bộ nhớ và biết trước kích thước chính xác (SIZED).
- Chi phí phân rã:
O(1). Khi split, Spliterator chỉ cần tính toán chỉ số trung vị toán học đơn giản:mid = (low + high) / 2. Phép chia đôi diễn ra ngay lập tức mà không cần duyệt qua các phần tử. - Cache Locality: Cực kỳ tốt. Dữ liệu nằm sát nhau giúp CPU tối ưu hóa bộ đệm L1/L2/L3 khi các luồng truy cập đồng thời.
7.2 LinkedList: Chi phí phân rã tồi tệ O(N)
LinkedList là cấu trúc danh sách liên kết.
- Chi phí phân rã:
O(N). Để chia đôi danh sách, Spliterator không thể nhảy trực tiếp tới phần tử ở giữa. Nó buộc phải duyệt tuần tự từ phần tử đầu tiên (head) quaN / 2nút để tìm ra điểm chia. - Hậu quả: Thời gian phân rã dữ liệu có độ phức tạp thuật toán
O(N). Đối với một tập dữ liệu lớn, việc phân tách này tiêu tốn nhiều tài nguyên CPU hơn cả việc chạy tuần tự đơn luồng. - Cache Locality: Tệ hại. Các nút của
LinkedListnằm rải rác khắp nơi trên Heap, gây ra hiện tượng CPU Cache Miss liên tục khi nhiều thread truy cập đồng thời.
7.3 Files.lines: Phân rã tuần tự O(N)
Files.lines(path) đại diện cho một luồng đọc file theo dòng.
- Chi phí phân rã:
O(N). Hệ thống không thể biết trước một dòng dài bao nhiêu byte hoặc có tổng cộng bao nhiêu dòng trong file mà không đọc qua chúng. - Hậu quả: Spliterator buộc phải đọc tuần tự từng dòng để xác định ranh giới trước khi có thể cắt nhỏ và đưa sang thread khác. Việc phân rã song song trên luồng I/O này thực chất bị biến thành một chuỗi các thao tác tuần tự đắt đỏ.
7.4 Stream.iterate / Stream.generate: bản chất tuần tự
Stream.iterate(seed, fn) sinh phần tử tuần tự — phần tử thứ N phụ thuộc phần tử N-1, nên Spliterator không thể "nhảy" tới giữa dãy để chia đôi. Kết quả split rất lệch (một chunk lớn, các chunk nhỏ), thread làm việc không đều. Nếu cần parallel trên dãy số, dùng IntStream.range/LongStream.range — biết trước kích thước, split O(1) như array.
// NOT OPTIMAL -- LinkedList split kem O(N)
List<Order> orders = new LinkedList<>(rawOrders);
long count = orders.parallelStream()
.filter(o -> o.amount() > 100)
.count();
// BETTER -- ArrayList split O(1)
List<Order> orders = new ArrayList<>(rawOrders);
long count = orders.parallelStream()
.filter(o -> o.amount() > 100)
.count();
8. Khi nào parallel thực sự có lợi — checklist
Parallel stream có lợi chỉ khi hội đủ tất cả điều kiện sau:
| Điều kiện | Vì sao cần |
|---|---|
| N×Q vượt 10000 | Overhead fork/join < thời gian tính toán thực |
| CPU-bound, không I/O | I/O-bound block thread pool, hại toàn JVM |
| Không shared mutable state | Race condition, lock contention |
| Operation associative | Reduce/combine cho kết quả đúng bất kể thứ tự |
| Source splittable hiệu quả (array-backed, range) | Chia đều cho các thread, không lệch — tránh LinkedList, Files.lines, Stream.iterate |
| commonPool không bận | Không tranh tài nguyên với task khác trong JVM |
Thực tế enterprise, workload thoả hết 6 điều này khá hiếm. Đa số business logic liên quan DB, HTTP, hoặc state shared. Trong những trường hợp đó, sequential stream hoặc CompletableFuture với executor riêng là lựa chọn tốt hơn.
9. Ví dụ: khi parallel thực sự thắng
Bài toán tính số nguyên tố trong 10 triệu số — CPU-bound, stateless, array-backed source:
// Sequential
long seqCount = LongStream.range(2, 10_000_000)
.filter(PrimeChecker::isPrime)
.count();
// Parallel -- hop le: N=10M, Q cao (kiem tra so nguyen to),
// source LongStream.range split O(1), khong shared state, CPU-bound
long parCount = LongStream.range(2, 10_000_000)
.parallel()
.filter(PrimeChecker::isPrime)
.count();
Trên máy 8 core, parallel cho speedup xấp xỉ 5-7 lần (không đến 8 lần vì overhead và main thread cũng tính).
// Benchmark mental model:
// isPrime(n): kiem tra chia het cho 2 den sqrt(n) -- O(sqrt(n)) phep tinh
// Voi n ~ 10^7, sqrt(n) ~ 3162 phep tinh -> Q ~ 3000
// N = 10.000.000, Q ~ 3000 -> N*Q ~ 3*10^10 >> 10000 -> parallel rat co loi
10. Tư duy thiết kế hệ thống concurrent: Parallel Stream vs CompletableFuture
Để giúp bạn đưa ra các quyết định thiết kế kiến trúc chuẩn xác khi lập trình đa luồng trong các hệ thống lớn, hãy tham khảo ma trận so sánh trực quan dưới đây:
| Tiêu chí So sánh | Parallel Stream (parallelStream()) | CompletableFuture + Custom Executor |
|---|---|---|
| Bản chất Kiến trúc | Xử lý dữ liệu song song dạng pipeline (Data-Parallelism). Chia nhỏ một tập dữ liệu lớn để tính toán. | Lập trình bất đồng bộ hướng tác vụ (Task-Based Asynchrony). Phối hợp và xâu chuỗi nhiều luồng xử lý độc lập. |
| Phân bổ Thread Pool | Mặc định sử dụng ForkJoinPool.commonPool() - là singleton dùng chung toàn bộ JVM. | Sử dụng các pool độc lập và tường minh (ThreadPoolExecutor, VirtualThreadPerTaskExecutor). |
| Loại Workload Phù hợp | CPU-bound (Tính toán số học, xử lý ảnh, hash code, parsing lượng lớn dữ liệu offline). | I/O-bound (Gọi Web API, truy vấn Database, đọc/ghi file từ đĩa, giao tiếp Network). |
| Tính Cách ly Tài nguyên | Cực kém. Một tác vụ lỗi/block có thể làm nghẽn toàn bộ ứng dụng Java (kể cả Spring Boot @Async). | Cực tốt. Mỗi nhóm chức năng có thể chạy trên một Executor riêng biệt, cô lập lỗi hoàn toàn. |
| Khả năng Điều phối | Tuyến tính, đơn giản. Chỉ hỗ trợ chia nhỏ dữ liệu rồi gộp lại thông qua combiners. | Cực mạnh. Hỗ trợ xâu chuỗi phức tạp (thenCompose), gom kết quả (allOf, anyOf), fallback lồng nhau. |
| Xử lý Timeout | Không hỗ trợ trực tiếp trên từng phần tử. | Hỗ trợ cấu hình timeout linh hoạt (orTimeout, completeOnTimeout). |
| Độ phức tạp Mã nguồn | Cực kỳ đơn giản. Chỉ cần thêm .parallel() vào stream pipeline sẵn có. | Trung bình - Cao. Đòi hỏi thiết kế vòng đời thread pool, handle exception bất đồng bộ. |
Quy tắc chọn:
- CPU-bound, stateless, array-backed, không cần composition →
parallelStream(). - I/O-bound, cần timeout, cần error handling per-task, cần Virtual Threads →
CompletableFuture+ executor.
11. Deep Dive
- "When to Use Parallel Streams" — Doug Lea — web.archive.org/web/20160331125154/http://gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html — bài gốc của tác giả
ForkJoinPool, giải thích công thức N×Q, phân tích chi tiết khi nào parallel thắng. Bắt buộc đọc trước khi dùng parallel stream ở production. ForkJoinPoolJavadoc SE 21 — docs.oracle.com/.../ForkJoinPool.html — giải thích work-stealing,commonPool(), cấu hìnhparallelism, và hành vi khi task blocking.SpliteratorJavadoc SE 21 — docs.oracle.com/.../Spliterator.html — cơ chế chia data cho parallel, cácCharacteristics(SIZED, ORDERED, SUBSIZED...) ảnh hưởng đến hiệu quả split.- JEP 446 — Scoped Values (Java 21) — openjdk.org/jeps/446 — cơ chế chia sẻ dữ liệu read-only an toàn giữa thread (thay thế
ThreadLocaltrong concurrent context), liên quan khi parallel stream cần chia config/context qua thread. - Project Loom — Virtual Threads (JEP 444, Java 21) — openjdk.org/jeps/444 — giải thích vì sao parallel stream chưa tích hợp Virtual Threads, và hướng đúng cho I/O concurrency trong Java 21+.
12. Self-check
Q1Dev thêm `.parallel()` vào stream xử lý 500 record, mỗi record gọi 1 hàm tính toán đơn giản (tổng vài phép nhân). Performance tệ hơn 40%. Giải thích nguyên nhân và quyết định dùng sequential hay parallel.▸
Q2Đoạn code sau có bug không? Nếu có, bug gì, và sửa thế nào? List<String> results = new ArrayList<>(); orders.parallelStream().map(Order::id).forEach(results::add);▸
List<String> results = new ArrayList<>(); orders.parallelStream().map(Order::id).forEach(results::add);List<String> results = orders.parallelStream().map(Order::id).collect(Collectors.toList()); Collector đảm bảo mỗi thread có container riêng, combiner gộp thread-safe cuối cùng.Q3`ForkJoinPool.commonPool()` có bao nhiêu thread? Vì sao đây là nguy hiểm với I/O-bound parallel stream?▸
Q4`parallelStream().forEachOrdered(consumer)` có đảm bảo thứ tự không? Tại sao performance tệ hơn sequential?▸
Q5Vì sao `LinkedList` là nguồn kém cho parallel stream, còn `ArrayList` lại tốt?▸
(left + right) / 2. `LinkedList` là doubly-linked list — để tìm phần tử ở giữa phải traverse từ head O(n/2). Với N=1 triệu element, split `LinkedList` tốn 500000 bước — bản thân split đã đắt hơn nhiều công việc thực. Thêm nữa, split không đều vì overhead — một số chunk có nhiều element hơn, thread không cân bằng. Hậu quả: parallel stream trên `LinkedList` thường chậm hơn sequential. Nếu đang dùng `LinkedList`, convert sang `ArrayList` trước khi parallel: new ArrayList<>(linkedList).parallelStream().Q6Cho đoạn code: orders.parallelStream().filter(o -> o.amount() > 100).sorted().toList() — `sorted()` trong pipeline parallel ảnh hưởng performance thế nào?▸
orders.parallelStream().filter(o -> o.amount() > 100).sorted().toList() — `sorted()` trong pipeline parallel ảnh hưởng performance thế nào?Bài tiếp theo: Immutability và Functional Style — thiết kế Java không có surprise
Bài này có giúp bạn hiểu bản chất không?
Hỏi đáp về bài này
Chưa có câu hỏi
Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).
Đặt câu hỏi đầu tiên