Java OO & Functional/Parallel Stream — khi nào thực sự nhanh hơn, khi nào nguy hiểm
34/36
Bài 34 / 36~20 phútStream API & LambdaMiễn phí lượt xem

Parallel Stream — khi nào thực sự nhanh hơn, khi nào nguy hiểm

ForkJoinPool bên dưới parallelStream, 4 pitfall stateful/ordering/blocking/race condition, formula Doug Lea NxQ, và khi nào nên dùng CompletableFuture thay thế.

TL;DR: parallelStream() dùng ForkJoinPool.commonPool() — pool shared toàn JVM — để chia dữ liệu thành chunk và xử lý song song. Nhưng "thêm .parallel() là nhanh gấp N lần CPU" là ngộ nhận. 4 điều kiện phải hội đủ: dataset lớn (công thức Doug Lea N×Q > 10000), CPU-bound không phải I/O-bound, không shared state, source splittable hiệu quả. Vi phạm bất kỳ điều kiện nào đều cho kết quả tệ hơn sequential hoặc gây race condition im lặng. Bài này phân tích từng pitfall đến tận cơ chế ForkJoin, và khi nào dùng CompletableFuture với executor riêng thay vì parallel stream.

1. Scenario — thêm .parallel() mà chậm hơn

Một developer thấy đoạn xử lý 1000 record mất 200ms. Đọc được "parallel stream dùng nhiều thread", anh ấy thêm vào:

// TRUOC: 200ms
List<Report> reports = orders.stream()
    .map(this::generateReport)
    .toList();

// SAU khi them parallel(): mong doi nhanh hon, thuc te 350ms
List<Report> reports = orders.parallelStream()
    .map(this::generateReport)
    .toList();

Performance tệ hơn 75%. Trước khi hiểu vì sao, cần biết bên dưới parallelStream() làm gì.

2. parallelStream() và ForkJoin — cơ chế bên dưới

parallelStream() (và stream().parallel()) chuyển stream thành parallel stream — Java chia dữ liệu nguồn thành nhiều chunk qua cơ chế Spliterator, giao mỗi chunk cho một worker thread trong ForkJoinPool.commonPool(), rồi gộp kết quả qua combiner.

ForkJoinPool (java.util.concurrent, Java 7+) là thread pool đặc biệt dùng thuật toán work-stealing — thread nào rảnh có thể "ăn cắp" task từ hàng chờ của thread khác. ForkJoinPool.commonPool()singleton shared toàn JVM: mặc định có Runtime.getRuntime().availableProcessors() - 1 thread (trừ 1 vì main thread cũng tính).

// Kiem tra kich thuoc commonPool
System.out.println(ForkJoinPool.commonPool().getParallelism());
// 7 tren may 8 core (8 - 1 = 7)

Luồng xử lý parallel stream:

flowchart TD
    A[parallelStream] --> B[Spliterator chia data\nthành N chunk]
    B --> C1[Chunk 1\nThread-1]
    B --> C2[Chunk 2\nThread-2]
    B --> C3[Chunk 3\nThread-3]
    B --> C4[Chunk N\nThread-N]
    C1 --> D[Combiner gộp kết quả]
    C2 --> D
    C3 --> D
    C4 --> D
    D --> E[Result]
    F[ForkJoinPool.commonPool\nSingleton JVM-wide] -.-> C1
    F -.-> C2
    F -.-> C3
    F -.-> C4

Chi phí overhead của mỗi parallel pipeline:

  • Chia data thành chunk (Spliterator).
  • Fork task vào pool queue.
  • Context switch giữa các thread.
  • Work-stealing coordination.
  • Combine kết quả từ các thread.

Tổng overhead này khoảng 100-200 microsecond theo benchmark Aleksey Shipilev (JIT engineer Oracle). Với dataset nhỏ hoặc operation nhẹ, overhead lớn hơn "công sức" thực → parallel chậm hơn sequential.

3. Công thức Doug Lea — N×Q > 10000

Doug Lea (tác giả ForkJoinPool, Professor Emeritus SUNY Oswego) đề xuất quy tắc thực nghiệm:

Parallel stream đáng khi N × Q > 10000

Trong đó:

  • N — số phần tử trong stream.
  • Q — cost xử lý mỗi phần tử, đơn vị "đơn vị tính toán" (operation count, không phải nanosecond). Phép cộng, so sánh, array lookup ~ Q=1. Regex match ~ Q=100. Hash computation ~ Q=50. Network call ~ Q cao nhưng I/O-bound nên không nên parallel.

Ví dụ:

  • N=1000, Q=5 → N×Q=5000 → dưới ngưỡng → sequential tốt hơn.
  • N=100000, Q=10 → N×Q=1.000.000 → parallel có lợi rõ.
  • N=1000, Q=20 → N×Q=20000 → ranh giới, đo thực tế bằng JMH.

Quay lại scenario: generateReport với 1000 record — nếu generateReport là lightweight operation (Q nhỏ), N×Q dưới ngưỡng → overhead parallel thắng → 350ms tệ hơn 200ms.

Rule of thumb thực tế

Trong code production, ngưỡng 10000 là "số liệu tham khảo, không phải guarantee". Data structure, cache behavior, GC pressure đều ảnh hưởng. Luôn đo bằng JMH (Java Microbenchmark Harness) với data production-like trước khi quyết định. Trực giác sai thường xuyên.

4. Pitfall 1 — stateful lambda và race condition

Stateful lambda là lambda đọc hoặc ghi vào biến ngoài scope của nó. Trong parallel stream, nhiều thread chạy lambda đồng thời — nếu biến ngoài không thread-safe, race condition xảy ra.

// SAI -- race condition tren shared counter
int[] count = {0};  // effectively final array trick

orders.parallelStream()
    .filter(o -> o.amount() > 100)
    .forEach(o -> {
        count[0]++;  // RACE CONDITION -- nhieu thread ghi dong thoi
    });

System.out.println(count[0]);  // ket qua khong deterministic moi lan chay

Vì sao nguy hiểm: count[0]++read-modify-write — 3 bước không atomic. Thread A đọc count=5, Thread B đọc count=5, Thread A ghi 6, Thread B ghi 6 (lẽ ra phải 7). Kết quả nhỏ hơn thực tế và thay đổi mỗi lần chạy.

Workaround 1 — AtomicInteger cho counter đơn giản:

AtomicInteger count = new AtomicInteger(0);

orders.parallelStream()
    .filter(o -> o.amount() > 100)
    .forEach(o -> count.incrementAndGet());  // atomic, thread-safe

System.out.println(count.get());  // chinh xac

Workaround 2 — Redesign thành reduce (idiom tốt hơn):

// Dung count() terminal op -- khong can shared state gi ca
long count = orders.parallelStream()
    .filter(o -> o.amount() > 100)
    .count();

Hoặc với custom aggregation:

// Dung reduce de tinh tong -- pure function, khong shared state
int totalAmount = orders.parallelStream()
    .filter(o -> o.amount() > 100)
    .mapToInt(Order::amount)
    .sum();  // sum() la reduce associative, parallel-safe
ArrayList.add() trong forEach parallel — không phải chỉ race condition

ArrayList không thread-safe. Gọi list.add(element) từ nhiều thread đồng thời có thể gây: kết quả thiếu phần tử, ArrayIndexOutOfBoundsException khi resize, hoặc null xuất hiện ở giữa list. Không có exception rõ ràng — bug im lặng. Pattern đúng: dùng collect(Collectors.toList()) hoặc collect(Collectors.toConcurrentMap(...)) — collector đảm bảo thread-safety trong accumulate.

5. Pitfall 2 — forEach không guarantee order

Với sequential stream, forEach emit phần tử theo thứ tự source. Với parallel stream, thứ tự phụ thuộc thread scheduling — không deterministic.

List<Integer> nums = List.of(1, 2, 3, 4, 5);

// Sequential: in theo thu tu 1 2 3 4 5
nums.stream().forEach(System.out::println);

// Parallel: thu tu khong bao gio biet truoc
nums.parallelStream().forEach(System.out::println);
// Co the in: 3 1 4 2 5 hoac 2 4 1 5 3 hoac bat ky thu tu nao

Nếu cần guarantee thứ tự với parallel stream, dùng forEachOrdered:

nums.parallelStream().forEachOrdered(System.out::println);
// In dung thu tu: 1 2 3 4 5

Nhưng forEachOrdered phải serialize kết quả trước khi emit — nó hủy phần lớn lợi ích parallel. Parallel với forEachOrdered thường chậm hơn sequential vì phải sync và đợi thứ tự.

Rule thực tế: nếu cần thứ tự kết quả, dùng collect() thay vì forEachOrdered. collect(Collectors.toList()) với parallel stream gom theo thứ tự encounter order của stream source.

6. Pitfall 3 — ForkJoinPool.commonPool() shared và blocking I/O

Đây là pitfall nguy hiểm nhất ở production. ForkJoinPool.commonPool() là singleton — tất cả parallel stream trong toàn JVM dùng chung. Nếu một parallel stream thực hiện I/O blocking (HTTP call, JDBC query, file read), thread bị block và không thể xử lý task khác.

6.1 Cơ chế Starvation (Đói tài nguyên) toàn JVM

commonPool được chia sẻ cấp độ JVM, khi một đoạn code chạy parallel stream bị nghẽn I/O (ví dụ: đợi API bên thứ ba mất 2 giây hoặc thực thi câu query database nặng), các worker thread của pool chung sẽ bị chiếm dụng hết để chờ đợi.

Hậu quả gián tiếp nghiêm trọng:

  • Toàn bộ các luồng xử lý song song khác trong ứng dụng sẽ bị nghẽn hoàn toàn (Starvation).
  • Các tác vụ bất đồng bộ như @Async của Spring Boot (nếu cấu hình default dùng TaskExecutor mặc định hoặc trỏ về common pool) sẽ bị trì hoãn không thể khởi chạy.
  • Các request của client đi vào các endpoint có sử dụng parallel stream hoặc CompletableFuture (không truyền executor riêng) sẽ chịu độ trễ lớn hoặc bị timeout.
// BAD -- parallel stream goi HTTP cho tung URL
List<String> responses = urls.parallelStream()
    .map(url -> httpGet(url))   // block 200-500ms moi call
    .toList();

// Hau qua: Tat ca thread cua commonPool bi block vao HTTP
// Cac parallel stream khac trong app hoac cac tac vu bat dong bo trong JVM khong con thread de chay!

6.2 Kỹ thuật nâng cao: Cách ly tài nguyên bằng Custom ForkJoinPool

Để giải quyết triệt để vấn đề nghẽn JVM do dùng chung commonPool, Java cho phép một kỹ thuật nâng cao: Bọc parallel stream bên trong một instance ForkJoinPool riêng biệt và submit tác vụ qua pool đó.

Khi một parallel stream được kích hoạt bên trong một task đang chạy trên một ForkJoinPool cụ thể, cơ chế ForkJoin của Java đủ thông minh để tự động điều hướng tất cả các tác vụ con (ForkJoinTask) của stream đó sử dụng chính pool riêng này thay vì rơi về commonPool.

// Boc parallel stream trong ForkJoinPool tuy chinh de cach ly tai nguyen
ForkJoinPool customPool = null;
try {
    // Khoi tao pool rieng biet voi 4 thread danh rieng cho tac vu I/O nay
    customPool = new ForkJoinPool(4);

    List<String> responses = customPool.submit(() ->
        urls.parallelStream()
            .map(url -> httpGet(url)) // Co blocking I/O nhung chi block thread cua customPool
            .toList()
    ).get(); // Block thread hien tai cho den khi hoan thanh

} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("Task interrupted", e);
} catch (ExecutionException e) {
    throw new RuntimeException("Execution failed in custom pool", e.getCause());
} finally {
    if (customPool != null) {
        customPool.shutdown(); // Giai phong tai nguyen khi dung xong
    }
}
Lưu ý về thiết kế hệ thống

Cách bọc parallel stream trong customPool chỉ nên dùng khi bạn bắt buộc phải tái sử dụng một thư viện hoặc một đoạn code parallel stream có sẵn mà không thể refactor cấu trúc. Đối với các tác vụ I/O-bound viết mới, CompletableFuture kết hợp với một thread pool riêng biệt (ExecutorService) luôn là lựa chọn hàng đầu và rõ ràng hơn về mặt ngữ nghĩa.

Virtual Threads và parallel stream (Java 21+)

Project Loom (Java 21) giới thiệu Virtual Threads — lightweight thread chạy được hàng nghìn đồng thời mà không block OS thread khi I/O. Tuy nhiên parallel stream chưa tích hợp Virtual Threads tính đến Java 21. parallelStream() vẫn dùng ForkJoinPool với platform threads. Để tận dụng Virtual Threads cho I/O concurrency, dùng Executors.newVirtualThreadPerTaskExecutor() làm executor cho CompletableFuture — không dùng parallel stream.

7. Pitfall 4 — source splittable kém và chi phí phân rã dữ liệu

Parallel stream chia nhỏ dữ liệu nguồn thông qua cơ chế Spliterator.trySplit(). Tốc độ và hiệu quả của việc xử lý song song phụ thuộc trực tiếp vào cấu trúc dữ liệu nguồn có dễ dàng phân chia hay không.

7.1 ArrayList: Phân rã hoàn hảo O(1)

ArrayList hoặc các mảng nguyên thủy (int[], double[]) có cấu trúc lưu trữ liên tục trong bộ nhớ và biết trước kích thước chính xác (SIZED).

  • Chi phí phân rã: O(1). Khi split, Spliterator chỉ cần tính toán chỉ số trung vị toán học đơn giản: mid = (low + high) / 2. Phép chia đôi diễn ra ngay lập tức mà không cần duyệt qua các phần tử.
  • Cache Locality: Cực kỳ tốt. Dữ liệu nằm sát nhau giúp CPU tối ưu hóa bộ đệm L1/L2/L3 khi các luồng truy cập đồng thời.

7.2 LinkedList: Chi phí phân rã tồi tệ O(N)

LinkedList là cấu trúc danh sách liên kết.

  • Chi phí phân rã: O(N). Để chia đôi danh sách, Spliterator không thể nhảy trực tiếp tới phần tử ở giữa. Nó buộc phải duyệt tuần tự từ phần tử đầu tiên (head) qua N / 2 nút để tìm ra điểm chia.
  • Hậu quả: Thời gian phân rã dữ liệu có độ phức tạp thuật toán O(N). Đối với một tập dữ liệu lớn, việc phân tách này tiêu tốn nhiều tài nguyên CPU hơn cả việc chạy tuần tự đơn luồng.
  • Cache Locality: Tệ hại. Các nút của LinkedList nằm rải rác khắp nơi trên Heap, gây ra hiện tượng CPU Cache Miss liên tục khi nhiều thread truy cập đồng thời.

7.3 Files.lines: Phân rã tuần tự O(N)

Files.lines(path) đại diện cho một luồng đọc file theo dòng.

  • Chi phí phân rã: O(N). Hệ thống không thể biết trước một dòng dài bao nhiêu byte hoặc có tổng cộng bao nhiêu dòng trong file mà không đọc qua chúng.
  • Hậu quả: Spliterator buộc phải đọc tuần tự từng dòng để xác định ranh giới trước khi có thể cắt nhỏ và đưa sang thread khác. Việc phân rã song song trên luồng I/O này thực chất bị biến thành một chuỗi các thao tác tuần tự đắt đỏ.

7.4 Stream.iterate / Stream.generate: bản chất tuần tự

Stream.iterate(seed, fn) sinh phần tử tuần tự — phần tử thứ N phụ thuộc phần tử N-1, nên Spliterator không thể "nhảy" tới giữa dãy để chia đôi. Kết quả split rất lệch (một chunk lớn, các chunk nhỏ), thread làm việc không đều. Nếu cần parallel trên dãy số, dùng IntStream.range/LongStream.range — biết trước kích thước, split O(1) như array.

// NOT OPTIMAL -- LinkedList split kem O(N)
List<Order> orders = new LinkedList<>(rawOrders);
long count = orders.parallelStream()
    .filter(o -> o.amount() > 100)
    .count();

// BETTER -- ArrayList split O(1)
List<Order> orders = new ArrayList<>(rawOrders);
long count = orders.parallelStream()
    .filter(o -> o.amount() > 100)
    .count();

8. Khi nào parallel thực sự có lợi — checklist

Parallel stream có lợi chỉ khi hội đủ tất cả điều kiện sau:

Điều kiệnVì sao cần
N×Q vượt 10000Overhead fork/join < thời gian tính toán thực
CPU-bound, không I/OI/O-bound block thread pool, hại toàn JVM
Không shared mutable stateRace condition, lock contention
Operation associativeReduce/combine cho kết quả đúng bất kể thứ tự
Source splittable hiệu quả (array-backed, range)Chia đều cho các thread, không lệch — tránh LinkedList, Files.lines, Stream.iterate
commonPool không bậnKhông tranh tài nguyên với task khác trong JVM

Thực tế enterprise, workload thoả hết 6 điều này khá hiếm. Đa số business logic liên quan DB, HTTP, hoặc state shared. Trong những trường hợp đó, sequential stream hoặc CompletableFuture với executor riêng là lựa chọn tốt hơn.

9. Ví dụ: khi parallel thực sự thắng

Bài toán tính số nguyên tố trong 10 triệu số — CPU-bound, stateless, array-backed source:

// Sequential
long seqCount = LongStream.range(2, 10_000_000)
    .filter(PrimeChecker::isPrime)
    .count();

// Parallel -- hop le: N=10M, Q cao (kiem tra so nguyen to),
// source LongStream.range split O(1), khong shared state, CPU-bound
long parCount = LongStream.range(2, 10_000_000)
    .parallel()
    .filter(PrimeChecker::isPrime)
    .count();

Trên máy 8 core, parallel cho speedup xấp xỉ 5-7 lần (không đến 8 lần vì overhead và main thread cũng tính).

// Benchmark mental model:
// isPrime(n): kiem tra chia het cho 2 den sqrt(n) -- O(sqrt(n)) phep tinh
// Voi n ~ 10^7, sqrt(n) ~ 3162 phep tinh -> Q ~ 3000
// N = 10.000.000, Q ~ 3000 -> N*Q ~ 3*10^10 >> 10000 -> parallel rat co loi

10. Tư duy thiết kế hệ thống concurrent: Parallel Stream vs CompletableFuture

Để giúp bạn đưa ra các quyết định thiết kế kiến trúc chuẩn xác khi lập trình đa luồng trong các hệ thống lớn, hãy tham khảo ma trận so sánh trực quan dưới đây:

Tiêu chí So sánhParallel Stream (parallelStream())CompletableFuture + Custom Executor
Bản chất Kiến trúcXử lý dữ liệu song song dạng pipeline (Data-Parallelism). Chia nhỏ một tập dữ liệu lớn để tính toán.Lập trình bất đồng bộ hướng tác vụ (Task-Based Asynchrony). Phối hợp và xâu chuỗi nhiều luồng xử lý độc lập.
Phân bổ Thread PoolMặc định sử dụng ForkJoinPool.commonPool() - là singleton dùng chung toàn bộ JVM.Sử dụng các pool độc lập và tường minh (ThreadPoolExecutor, VirtualThreadPerTaskExecutor).
Loại Workload Phù hợpCPU-bound (Tính toán số học, xử lý ảnh, hash code, parsing lượng lớn dữ liệu offline).I/O-bound (Gọi Web API, truy vấn Database, đọc/ghi file từ đĩa, giao tiếp Network).
Tính Cách ly Tài nguyênCực kém. Một tác vụ lỗi/block có thể làm nghẽn toàn bộ ứng dụng Java (kể cả Spring Boot @Async).Cực tốt. Mỗi nhóm chức năng có thể chạy trên một Executor riêng biệt, cô lập lỗi hoàn toàn.
Khả năng Điều phốiTuyến tính, đơn giản. Chỉ hỗ trợ chia nhỏ dữ liệu rồi gộp lại thông qua combiners.Cực mạnh. Hỗ trợ xâu chuỗi phức tạp (thenCompose), gom kết quả (allOf, anyOf), fallback lồng nhau.
Xử lý TimeoutKhông hỗ trợ trực tiếp trên từng phần tử.Hỗ trợ cấu hình timeout linh hoạt (orTimeout, completeOnTimeout).
Độ phức tạp Mã nguồnCực kỳ đơn giản. Chỉ cần thêm .parallel() vào stream pipeline sẵn có.Trung bình - Cao. Đòi hỏi thiết kế vòng đời thread pool, handle exception bất đồng bộ.

Quy tắc chọn:

  • CPU-bound, stateless, array-backed, không cần composition → parallelStream().
  • I/O-bound, cần timeout, cần error handling per-task, cần Virtual Threads → CompletableFuture + executor.

11. Deep Dive

  • "When to Use Parallel Streams" — Doug Leaweb.archive.org/web/20160331125154/http://gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html — bài gốc của tác giả ForkJoinPool, giải thích công thức N×Q, phân tích chi tiết khi nào parallel thắng. Bắt buộc đọc trước khi dùng parallel stream ở production.
  • ForkJoinPool Javadoc SE 21docs.oracle.com/.../ForkJoinPool.html — giải thích work-stealing, commonPool(), cấu hình parallelism, và hành vi khi task blocking.
  • Spliterator Javadoc SE 21docs.oracle.com/.../Spliterator.html — cơ chế chia data cho parallel, các Characteristics (SIZED, ORDERED, SUBSIZED...) ảnh hưởng đến hiệu quả split.
  • JEP 446 — Scoped Values (Java 21)openjdk.org/jeps/446 — cơ chế chia sẻ dữ liệu read-only an toàn giữa thread (thay thế ThreadLocal trong concurrent context), liên quan khi parallel stream cần chia config/context qua thread.
  • Project Loom — Virtual Threads (JEP 444, Java 21)openjdk.org/jeps/444 — giải thích vì sao parallel stream chưa tích hợp Virtual Threads, và hướng đúng cho I/O concurrency trong Java 21+.

12. Self-check

Tự kiểm tra
Q1
Dev thêm `.parallel()` vào stream xử lý 500 record, mỗi record gọi 1 hàm tính toán đơn giản (tổng vài phép nhân). Performance tệ hơn 40%. Giải thích nguyên nhân và quyết định dùng sequential hay parallel.
Nguyên nhân: overhead fork/join lớn hơn "công sức" thực tế. Với N=500 record và Q thấp (vài phép nhân, Q ~ 5-10), N×Q ~ 2500-5000 — dưới ngưỡng 10000 của Doug Lea. Khi overhead split data, fork task vào ForkJoinPool, context switch, và combine kết quả tốn 100-200 microsecond, mà tổng công việc thực chỉ vài microsecond → parallel tốn thêm nhiều hơn tiết kiệm. Quyết định: dùng sequential stream. Chỉ cân nhắc parallel khi tăng N (vượt 100000) hoặc khi phép tính mỗi record nặng hơn (parsing phức tạp, hash computation, regex). Đo bằng JMH với data thực tế trước khi quyết định cuối cùng.
Q2
Đoạn code sau có bug không? Nếu có, bug gì, và sửa thế nào? List<String> results = new ArrayList<>(); orders.parallelStream().map(Order::id).forEach(results::add);
Có bug nghiêm trọng. `ArrayList` không thread-safe. `forEach` với parallel stream chạy `results.add()` từ nhiều thread đồng thời. `ArrayList` dùng `elementData` array — khi nhiều thread gọi `add` cùng lúc, có thể: (1) ghi đè nhau cùng index, (2) bị `ArrayIndexOutOfBoundsException` khi một thread đang resize mà thread khác ghi, (3) kết quả list có phần tử `null` ở giữa. Không có exception rõ ràng — bug im lặng với kết quả sai. Sửa đúng: List<String> results = orders.parallelStream().map(Order::id).collect(Collectors.toList()); Collector đảm bảo mỗi thread có container riêng, combiner gộp thread-safe cuối cùng.
Q3
`ForkJoinPool.commonPool()` có bao nhiêu thread? Vì sao đây là nguy hiểm với I/O-bound parallel stream?
Mặc định `commonPool()` có `Runtime.getRuntime().availableProcessors() - 1` thread — máy 8 core có 7 thread. Nguy hiểm với I/O-bound vì: commonPool là singleton shared toàn JVM. Khi parallel stream thực hiện HTTP call blocking (200-500ms mỗi call), các thread trong pool bị block và không thể xử lý task khác. Nếu 7 thread đều block vào HTTP, mọi parallel stream khác, mọi `CompletableFuture.supplyAsync()` không có executor riêng đều phải chờ. Toàn bộ JVM bị throttle chỉ vì 1 parallel stream I/O-bound. Giải pháp: I/O concurrency phải dùng executor riêng — `CompletableFuture.supplyAsync(task, customExecutor)` hoặc Virtual Thread executor trên Java 21+.
Q4
`parallelStream().forEachOrdered(consumer)` có đảm bảo thứ tự không? Tại sao performance tệ hơn sequential?
`forEachOrdered` có đảm bảo thứ tự encounter — phần tử được emit theo thứ tự nguồn, không phải thứ tự thread hoàn thành. Nhưng để làm được điều này với parallel stream, JVM phải: (1) xử lý song song để có kết quả từ nhiều chunk, (2) buffer kết quả đã xong nhưng chưa đến lượt, (3) serialize emit theo thứ tự. Bước serialize này tạo ra bottleneck — các thread xong sớm phải chờ thread phía trước emit xong mới được tiếp tục. Overhead coordination cao hơn nhiều so với sequential. Kết quả: thường chậm hơn sequential 20-50%. Nếu cần thứ tự và parallel, dùng `collect(Collectors.toList())` thay — collector gom song song nhưng giữ thứ tự trong list result mà không serialize từng emit.
Q5
Vì sao `LinkedList` là nguồn kém cho parallel stream, còn `ArrayList` lại tốt?
`Spliterator.trySplit()` cần tìm midpoint để chia đôi data cho 2 thread. `ArrayList` (và array) biết size và có random access O(1) — tìm midpoint tức thì bằng (left + right) / 2. `LinkedList` là doubly-linked list — để tìm phần tử ở giữa phải traverse từ head O(n/2). Với N=1 triệu element, split `LinkedList` tốn 500000 bước — bản thân split đã đắt hơn nhiều công việc thực. Thêm nữa, split không đều vì overhead — một số chunk có nhiều element hơn, thread không cân bằng. Hậu quả: parallel stream trên `LinkedList` thường chậm hơn sequential. Nếu đang dùng `LinkedList`, convert sang `ArrayList` trước khi parallel: new ArrayList<>(linkedList).parallelStream().
Q6
Cho đoạn code: orders.parallelStream().filter(o -> o.amount() > 100).sorted().toList() — `sorted()` trong pipeline parallel ảnh hưởng performance thế nào?
`sorted()` là stateful intermediate operation — phải buffer toàn bộ phần tử đã filter rồi mới sort. Trong parallel stream, bản thân bước sort **vẫn chạy song song**: JDK sort mảng buffer bằng parallel merge sort (cùng cơ chế với `Arrays.parallelSort`). Bottleneck thực sự là **barrier đồng bộ hoá**: mọi thread phải hoàn tất `filter` và đổ kết quả về một mảng buffer chung trước khi sort bắt đầu — pipeline mất tính streaming tại điểm này, cộng thêm chi phí copy dữ liệu vào/ra buffer và merge các đoạn đã sort. Nếu mục tiêu là sort kết quả cuối, một lựa chọn thường gọn hơn: (1) filter song song, (2) `collect` vào list, (3) sort list bằng `list.sort(comparator)` — tách rõ giai đoạn và dễ đo lường. Hoặc chấp nhận `sorted()` trong pipeline nhưng hiểu rằng nó là điểm chặn buffer toàn bộ dữ liệu.

Bài tiếp theo: Immutability và Functional Style — thiết kế Java không có surprise

Bài này có giúp bạn hiểu bản chất không?

Hỏi đáp về bài này

Chưa có câu hỏi

Đặt câu hỏi

Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).

Đặt câu hỏi đầu tiên