Java OO & Functional/Parallel Stream — khi nào thực sự nhanh hơn, khi nào nguy hiểm
31/33
Bài 31 / 33~20 phútStream API & LambdaMiễn phí lượt xem

Parallel Stream — khi nào thực sự nhanh hơn, khi nào nguy hiểm

ForkJoinPool bên dưới parallelStream, 4 pitfall stateful/ordering/blocking/race condition, formula Doug Lea NxQ, và khi nào nên dùng CompletableFuture thay thế.

TL;DR: parallelStream() dùng ForkJoinPool.commonPool() — pool shared toàn JVM — để chia dữ liệu thành chunk và xử lý song song. Nhưng "thêm .parallel() là nhanh gấp N lần CPU" là ngộ nhận. 4 điều kiện phải hội đủ: dataset lớn (công thức Doug Lea N×Q > 10000), CPU-bound không phải I/O-bound, không shared state, source splittable hiệu quả. Vi phạm bất kỳ điều kiện nào đều cho kết quả tệ hơn sequential hoặc gây race condition im lặng. Bài này phân tích từng pitfall đến tận cơ chế ForkJoin, và khi nào dùng CompletableFuture với executor riêng thay vì parallel stream.

1. Scenario — thêm .parallel() mà chậm hơn

Một developer thấy đoạn xử lý 1000 record mất 200ms. Đọc được "parallel stream dùng nhiều thread", anh ấy thêm vào:

// TRUOC: 200ms
List<Report> reports = orders.stream()
    .map(this::generateReport)
    .toList();

// SAU khi them parallel(): mong doi nhanh hon, thuc te 350ms
List<Report> reports = orders.parallelStream()
    .map(this::generateReport)
    .toList();

Performance tệ hơn 75%. Trước khi hiểu vì sao, cần biết bên dưới parallelStream() làm gì.

2. parallelStream() và ForkJoin — cơ chế bên dưới

parallelStream() (và stream().parallel()) chuyển stream thành parallel stream — Java chia dữ liệu nguồn thành nhiều chunk qua cơ chế Spliterator, giao mỗi chunk cho một worker thread trong ForkJoinPool.commonPool(), rồi gộp kết quả qua combiner.

ForkJoinPool (java.util.concurrent, Java 7+) là thread pool đặc biệt dùng thuật toán work-stealing — thread nào rảnh có thể "ăn cắp" task từ hàng chờ của thread khác. ForkJoinPool.commonPool()singleton shared toàn JVM: mặc định có Runtime.getRuntime().availableProcessors() - 1 thread (trừ 1 vì main thread cũng tính).

// Kiem tra kich thuoc commonPool
System.out.println(ForkJoinPool.commonPool().getParallelism());
// 7 tren may 8 core (8 - 1 = 7)

Luồng xử lý parallel stream:

flowchart TD
    A[parallelStream] --> B[Spliterator chia data\nthành N chunk]
    B --> C1[Chunk 1\nThread-1]
    B --> C2[Chunk 2\nThread-2]
    B --> C3[Chunk 3\nThread-3]
    B --> C4[Chunk N\nThread-N]
    C1 --> D[Combiner gộp kết quả]
    C2 --> D
    C3 --> D
    C4 --> D
    D --> E[Result]
    F[ForkJoinPool.commonPool\nSingleton JVM-wide] -.-> C1
    F -.-> C2
    F -.-> C3
    F -.-> C4

Chi phí overhead của mỗi parallel pipeline:

  • Chia data thành chunk (Spliterator).
  • Fork task vào pool queue.
  • Context switch giữa các thread.
  • Work-stealing coordination.
  • Combine kết quả từ các thread.

Tổng overhead này khoảng 100-200 microsecond theo benchmark Aleksey Shipilev (JIT engineer Oracle). Với dataset nhỏ hoặc operation nhẹ, overhead lớn hơn "công sức" thực → parallel chậm hơn sequential.

3. Công thức Doug Lea — N×Q > 10000

Doug Lea (tác giả ForkJoinPool, Professor Emeritus SUNY Oswego) đề xuất quy tắc thực nghiệm:

Parallel stream đáng khi N × Q > 10000

Trong đó:

  • N — số phần tử trong stream.
  • Q — cost xử lý mỗi phần tử, đơn vị "đơn vị tính toán" (operation count, không phải nanosecond). Phép cộng, so sánh, array lookup ~ Q=1. Regex match ~ Q=100. Hash computation ~ Q=50. Network call ~ Q cao nhưng I/O-bound nên không nên parallel.

Ví dụ:

  • N=1000, Q=5 → N×Q=5000 → dưới ngưỡng → sequential tốt hơn.
  • N=100000, Q=10 → N×Q=1.000.000 → parallel có lợi rõ.
  • N=1000, Q=20 → N×Q=20000 → ranh giới, đo thực tế bằng JMH.

Quay lại scenario: generateReport với 1000 record — nếu generateReport là lightweight operation (Q nhỏ), N×Q dưới ngưỡng → overhead parallel thắng → 350ms tệ hơn 200ms.

Rule of thumb thực tế

Trong code production, ngưỡng 10000 là "số liệu tham khảo, không phải guarantee". Data structure, cache behavior, GC pressure đều ảnh hưởng. Luôn đo bằng JMH (Java Microbenchmark Harness) với data production-like trước khi quyết định. Trực giác sai thường xuyên.

4. Pitfall 1 — stateful lambda và race condition

Stateful lambda là lambda đọc hoặc ghi vào biến ngoài scope của nó. Trong parallel stream, nhiều thread chạy lambda đồng thời — nếu biến ngoài không thread-safe, race condition xảy ra.

// SAI -- race condition tren shared counter
int[] count = {0};  // effectively final array trick

orders.parallelStream()
    .filter(o -> o.amount() > 100)
    .forEach(o -> {
        count[0]++;  // RACE CONDITION -- nhieu thread ghi dong thoi
    });

System.out.println(count[0]);  // ket qua khong deterministic moi lan chay

Vì sao nguy hiểm: count[0]++read-modify-write — 3 bước không atomic. Thread A đọc count=5, Thread B đọc count=5, Thread A ghi 6, Thread B ghi 6 (lẽ ra phải 7). Kết quả nhỏ hơn thực tế và thay đổi mỗi lần chạy.

Workaround 1 — AtomicInteger cho counter đơn giản:

AtomicInteger count = new AtomicInteger(0);

orders.parallelStream()
    .filter(o -> o.amount() > 100)
    .forEach(o -> count.incrementAndGet());  // atomic, thread-safe

System.out.println(count.get());  // chinh xac

Workaround 2 — Redesign thành reduce (idiom tốt hơn):

// Dung count() terminal op -- khong can shared state gi ca
long count = orders.parallelStream()
    .filter(o -> o.amount() > 100)
    .count();

Hoặc với custom aggregation:

// Dung reduce de tinh tong -- pure function, khong shared state
int totalAmount = orders.parallelStream()
    .filter(o -> o.amount() > 100)
    .mapToInt(Order::amount)
    .sum();  // sum() la reduce associative, parallel-safe
ArrayList.add() trong forEach parallel — không phải chỉ race condition

ArrayList không thread-safe. Gọi list.add(element) từ nhiều thread đồng thời có thể gây: kết quả thiếu phần tử, ArrayIndexOutOfBoundsException khi resize, hoặc null xuất hiện ở giữa list. Không có exception rõ ràng — bug im lặng. Pattern đúng: dùng collect(Collectors.toList()) hoặc collect(Collectors.toConcurrentMap(...)) — collector đảm bảo thread-safety trong accumulate.

5. Pitfall 2 — forEach không guarantee order

Với sequential stream, forEach emit phần tử theo thứ tự source. Với parallel stream, thứ tự phụ thuộc thread scheduling — không deterministic.

List<Integer> nums = List.of(1, 2, 3, 4, 5);

// Sequential: in theo thu tu 1 2 3 4 5
nums.stream().forEach(System.out::println);

// Parallel: thu tu khong bao gio biet truoc
nums.parallelStream().forEach(System.out::println);
// Co the in: 3 1 4 2 5 hoac 2 4 1 5 3 hoac bat ky thu tu nao

Nếu cần guarantee thứ tự với parallel stream, dùng forEachOrdered:

nums.parallelStream().forEachOrdered(System.out::println);
// In dung thu tu: 1 2 3 4 5

Nhưng forEachOrdered phải serialize kết quả trước khi emit — nó hủy phần lớn lợi ích parallel. Parallel với forEachOrdered thường chậm hơn sequential vì phải sync và đợi thứ tự.

Rule thực tế: nếu cần thứ tự kết quả, dùng collect() thay vì forEachOrdered. collect(Collectors.toList()) với parallel stream gom theo thứ tự encounter order của stream source.

6. Pitfall 3 — ForkJoinPool.commonPool() shared và blocking I/O

Đây là pitfall nguy hiểm nhất ở production. ForkJoinPool.commonPool() là singleton — tất cả parallel stream trong toàn JVM dùng chung. Nếu một parallel stream thực hiện I/O blocking (HTTP call, JDBC query, file read), thread bị block và không thể xử lý task khác.

// BAD -- parallel stream goi HTTP cho tung URL
List<String> responses = urls.parallelStream()
    .map(url -> httpGet(url))   // block 200-500ms moi call
    .toList();

// Hau qua: 7 thread cua commonPool bi block vao HTTP
// Cac parallel stream khac trong app (va CompletableFuture.supplyAsync khong co executor)
// khong con thread de chay -- toan bo JVM bi cham

Scenario tệ hơn: ứng dụng có 2 endpoint. Endpoint A xử lý 50 URL song song dùng parallelStream(). Endpoint B dùng CompletableFuture.supplyAsync() (không truyền executor, default dùng commonPool). Khi A đang chạy, B đợi hoàn toàn vì pool hết thread.

Giải pháp: dùng ForkJoinPool riêng hoặc CompletableFuture với executor tường minh:

// Option 1: ForkJoinPool tuy chinh
ForkJoinPool customPool = new ForkJoinPool(4);  // pool rieng 4 thread
try {
    List<String> responses = customPool.submit(() ->
        urls.parallelStream()
            .map(url -> httpGet(url))
            .toList()
    ).get();
} finally {
    customPool.shutdown();
}

// Option 2 (idiomatic hon): CompletableFuture voi executor rieng
ExecutorService executor = Executors.newFixedThreadPool(10);
List<CompletableFuture<String>> futures = urls.stream()
    .map(url -> CompletableFuture.supplyAsync(() -> httpGet(url), executor))
    .toList();

List<String> responses = futures.stream()
    .map(CompletableFuture::join)
    .toList();

executor.shutdown();

Option 2 dùng CompletableFuture với executor riêng là pattern idiomatic cho I/O concurrency trong Java — không chiếm commonPool.

Virtual Threads và parallel stream (Java 21+)

Project Loom (Java 21) giới thiệu Virtual Threads — lightweight thread chạy được hàng nghìn đồng thời mà không block OS thread khi I/O. Tuy nhiên parallel stream chưa tích hợp Virtual Threads tính đến Java 21. parallelStream() vẫn dùng ForkJoinPool với platform threads. Để tận dụng Virtual Threads cho I/O concurrency, dùng Executors.newVirtualThreadPerTaskExecutor() làm executor cho CompletableFuture — không dùng parallel stream.

7. Pitfall 4 — source splittable kém

Parallel stream chia data qua Spliterator.trySplit(). Hiệu quả chia phụ thuộc vào data source:

SourceSplittableLý do
ArrayList, int[], double[]Tốt — O(1)Array: biết size, split ở index giữa tức thì
IntStream.range(0, N)Tốt — O(1)Biết range, split bằng toán học
HashMap.values()TốtBucket-based split
LinkedListKém — O(n)Phải traverse từ head để tìm midpoint
Files.lines(path)KémKhông biết tổng line count trước
Stream.iterate(seed, fn)Kém — không split đượcGenerator sequential, không có midpoint
Stream.generate(supplier)KémTương tự iterate

Khi source split kém, parallel stream chia không đều — một thread làm phần lớn công việc, các thread khác xong sớm rồi ngồi chờ. Overhead điều phối lớn hơn lợi ích song song.

// KHONG TOI UU -- LinkedList split kem
List<Order> orders = new LinkedList<>(rawOrders);
long count = orders.parallelStream()
    .filter(o -> o.amount() > 100)
    .count();

// TOT HON -- ArrayList split hieu qua
List<Order> orders = new ArrayList<>(rawOrders);
long count = orders.parallelStream()
    .filter(o -> o.amount() > 100)
    .count();

8. Khi nào parallel thực sự có lợi — checklist

Parallel stream có lợi chỉ khi hội đủ tất cả điều kiện sau:

Điều kiệnVì sao cần
N×Q vượt 10000Overhead fork/join < thời gian tính toán thực
CPU-bound, không I/OI/O-bound block thread pool, hại toàn JVM
Không shared mutable stateRace condition, lock contention
Operation associativeReduce/combine cho kết quả đúng bất kể thứ tự
Source splittable hiệu quảChia đều cho các thread, không lệch
commonPool không bậnKhông tranh tài nguyên với task khác trong JVM

Thực tế enterprise, workload thoả hết 6 điều này khá hiếm. Đa số business logic liên quan DB, HTTP, hoặc state shared. Trong những trường hợp đó, sequential stream hoặc CompletableFuture với executor riêng là lựa chọn tốt hơn.

9. Ví dụ: khi parallel thực sự thắng

Bài toán tính số nguyên tố trong 10 triệu số — CPU-bound, stateless, array-backed source:

// Sequential
long seqCount = LongStream.range(2, 10_000_000)
    .filter(PrimeChecker::isPrime)
    .count();

// Parallel -- hop le: N=10M, Q cao (kiem tra so nguyen to),
// source LongStream.range split O(1), khong shared state, CPU-bound
long parCount = LongStream.range(2, 10_000_000)
    .parallel()
    .filter(PrimeChecker::isPrime)
    .count();

Trên máy 8 core, parallel cho speedup xấp xỉ 5-7 lần (không đến 8 lần vì overhead và main thread cũng tính).

// Benchmark mental model:
// isPrime(n): kiem tra chia het cho 2 den sqrt(n) -- O(sqrt(n)) phep tinh
// Voi n ~ 10^7, sqrt(n) ~ 3162 phep tinh -> Q ~ 3000
// N = 10.000.000, Q ~ 3000 -> N*Q ~ 3*10^10 >> 10000 -> parallel rat co loi

10. So sánh parallel stream vs CompletableFuture

Tiêu chíparallelStream()CompletableFuture + executor
Thread poolcommonPool (shared, fixed size)Tự chọn executor (riêng biệt)
I/O-boundNguy hiểm — block commonPoolAn toàn — pool riêng không ảnh hưởng JVM
CPU-boundTốt khi N×Q > 10000Overhead cao hơn cho CPU-bound đơn giản
CompositionKhông — chỉ pipeline tuyến tínhCó — thenCompose, allOf, anyOf
Timeout per taskKhông cóCó — orTimeout, completeOnTimeout
Error handlingException propagate ra terminal opexceptionally, handle per-future
API complexityĐơn giản — thêm .parallel()Phức tạp hơn — cần explicit future chain
Java virtual threadsKhông tích hợp (Java 21)Có — dùng newVirtualThreadPerTaskExecutor

Quy tắc chọn:

  • CPU-bound, stateless, array-backed, không cần composition → parallelStream().
  • I/O-bound, cần timeout, cần error handling per-task, cần Virtual Threads → CompletableFuture + executor.

11. Deep Dive

  • "When to Use Parallel Streams" — Doug Leaweb.archive.org/web/20160331125154/http://gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html — bài gốc của tác giả ForkJoinPool, giải thích công thức N×Q, phân tích chi tiết khi nào parallel thắng. Bắt buộc đọc trước khi dùng parallel stream ở production.
  • ForkJoinPool Javadoc SE 21docs.oracle.com/.../ForkJoinPool.html — giải thích work-stealing, commonPool(), cấu hình parallelism, và hành vi khi task blocking.
  • Spliterator Javadoc SE 21docs.oracle.com/.../Spliterator.html — cơ chế chia data cho parallel, các Characteristics (SIZED, ORDERED, SUBSIZED...) ảnh hưởng đến hiệu quả split.
  • JEP 446 — Scoped Values (Java 21)openjdk.org/jeps/446 — cơ chế chia sẻ dữ liệu read-only an toàn giữa thread (thay thế ThreadLocal trong concurrent context), liên quan khi parallel stream cần chia config/context qua thread.
  • Project Loom — Virtual Threads (JEP 444, Java 21)openjdk.org/jeps/444 — giải thích vì sao parallel stream chưa tích hợp Virtual Threads, và hướng đúng cho I/O concurrency trong Java 21+.

12. Self-check

Tự kiểm tra
Q1
Dev thêm `.parallel()` vào stream xử lý 500 record, mỗi record gọi 1 hàm tính toán đơn giản (tổng vài phép nhân). Performance tệ hơn 40%. Giải thích nguyên nhân và quyết định dùng sequential hay parallel.
Nguyên nhân: overhead fork/join lớn hơn "công sức" thực tế. Với N=500 record và Q thấp (vài phép nhân, Q ~ 5-10), N×Q ~ 2500-5000 — dưới ngưỡng 10000 của Doug Lea. Khi overhead split data, fork task vào ForkJoinPool, context switch, và combine kết quả tốn 100-200 microsecond, mà tổng công việc thực chỉ vài microsecond → parallel tốn thêm nhiều hơn tiết kiệm. Quyết định: dùng sequential stream. Chỉ cân nhắc parallel khi tăng N (vượt 100000) hoặc khi phép tính mỗi record nặng hơn (parsing phức tạp, hash computation, regex). Đo bằng JMH với data thực tế trước khi quyết định cuối cùng.
Q2
Đoạn code sau có bug không? Nếu có, bug gì, và sửa thế nào? List<String> results = new ArrayList<>(); orders.parallelStream().map(Order::id).forEach(results::add);
Có bug nghiêm trọng. `ArrayList` không thread-safe. `forEach` với parallel stream chạy `results.add()` từ nhiều thread đồng thời. `ArrayList` dùng `elementData` array — khi nhiều thread gọi `add` cùng lúc, có thể: (1) ghi đè nhau cùng index, (2) bị `ArrayIndexOutOfBoundsException` khi một thread đang resize mà thread khác ghi, (3) kết quả list có phần tử `null` ở giữa. Không có exception rõ ràng — bug im lặng với kết quả sai. Sửa đúng: List<String> results = orders.parallelStream().map(Order::id).collect(Collectors.toList()); Collector đảm bảo mỗi thread có container riêng, combiner gộp thread-safe cuối cùng.
Q3
`ForkJoinPool.commonPool()` có bao nhiêu thread? Vì sao đây là nguy hiểm với I/O-bound parallel stream?
Mặc định `commonPool()` có `Runtime.getRuntime().availableProcessors() - 1` thread — máy 8 core có 7 thread. Nguy hiểm với I/O-bound vì: commonPool là singleton shared toàn JVM. Khi parallel stream thực hiện HTTP call blocking (200-500ms mỗi call), các thread trong pool bị block và không thể xử lý task khác. Nếu 7 thread đều block vào HTTP, mọi parallel stream khác, mọi `CompletableFuture.supplyAsync()` không có executor riêng đều phải chờ. Toàn bộ JVM bị throttle chỉ vì 1 parallel stream I/O-bound. Giải pháp: I/O concurrency phải dùng executor riêng — `CompletableFuture.supplyAsync(task, customExecutor)` hoặc Virtual Thread executor trên Java 21+.
Q4
`parallelStream().forEachOrdered(consumer)` có đảm bảo thứ tự không? Tại sao performance tệ hơn sequential?
`forEachOrdered` có đảm bảo thứ tự encounter — phần tử được emit theo thứ tự nguồn, không phải thứ tự thread hoàn thành. Nhưng để làm được điều này với parallel stream, JVM phải: (1) xử lý song song để có kết quả từ nhiều chunk, (2) buffer kết quả đã xong nhưng chưa đến lượt, (3) serialize emit theo thứ tự. Bước serialize này tạo ra bottleneck — các thread xong sớm phải chờ thread phía trước emit xong mới được tiếp tục. Overhead coordination cao hơn nhiều so với sequential. Kết quả: thường chậm hơn sequential 20-50%. Nếu cần thứ tự và parallel, dùng `collect(Collectors.toList())` thay — collector gom song song nhưng giữ thứ tự trong list result mà không serialize từng emit.
Q5
Vì sao `LinkedList` là nguồn kém cho parallel stream, còn `ArrayList` lại tốt?
`Spliterator.trySplit()` cần tìm midpoint để chia đôi data cho 2 thread. `ArrayList` (và array) biết size và có random access O(1) — tìm midpoint tức thì bằng (left + right) / 2. `LinkedList` là doubly-linked list — để tìm phần tử ở giữa phải traverse từ head O(n/2). Với N=1 triệu element, split `LinkedList` tốn 500000 bước — bản thân split đã đắt hơn nhiều công việc thực. Thêm nữa, split không đều vì overhead — một số chunk có nhiều element hơn, thread không cân bằng. Hậu quả: parallel stream trên `LinkedList` thường chậm hơn sequential. Nếu đang dùng `LinkedList`, convert sang `ArrayList` trước khi parallel: new ArrayList<>(linkedList).parallelStream().
Q6
Cho đoạn code: orders.parallelStream().filter(o -> o.amount() > 100).sorted().toList() — `sorted()` trong pipeline parallel ảnh hưởng performance thế nào?
`sorted()` là stateful intermediate operation — phải buffer toàn bộ phần tử đã filter rồi mới sort. Trong parallel stream, các thread xử lý `filter` song song (tốt), nhưng sau đó toàn bộ kết quả phải được gom về 1 chỗ để sort (sort là single-threaded by default trong Java Arrays.sort, dù có `Arrays.parallelSort` riêng). Hậu quả: pipeline dừng parallel ở `sorted`, tất cả thread chờ sort xong, rồi mới tiếp tục. Nếu mục tiêu là sort kết quả cuối, thường tốt hơn khi: (1) filter song song, (2) collect vào list, (3) sort list sequential bằng `list.sort(comparator)`. Hoặc chấp nhận `sorted()` trong pipeline nhưng biết đây là bottleneck. `Arrays.parallelSort` (Java 8+) có thể sort song song nhưng không tích hợp trực tiếp vào Stream API.

Bài này có giúp bạn hiểu bản chất không?

Hỏi đáp về bài này

Chưa có câu hỏi

Đặt câu hỏi

Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).

Đặt câu hỏi đầu tiên