Java — Từ Zero đến Senior/Stream nâng cao — flatMap, takeWhile, parallel caveat
~22 phútStream API & LambdaMiễn phí

Stream nâng cao — flatMap, takeWhile, parallel caveat

flatMap cho stream lồng, takeWhile/dropWhile (Java 9+), peek debug, và parallel stream — khi nào nhanh hơn, khi nào chậm hơn, và vì sao common pool là bẫy.

filter, map, reduce giải quyết 80% task aggregate. 20% còn lại cần công cụ chuyên biệt:

  • List<User> mà mỗi user có List<Order> — muốn lấy tất cả order flatten → flatMap.
  • Log file sorted theo thời gian, muốn dừng khi gặp dòng SHUTDOWN → takeWhile.
  • Pipeline dài debug không biết element nào qua được stage nào → peek.
  • Data 10 triệu element, muốn tính song song → parallel() — và đây là nơi bẫy lớn nhất của Stream API.

Bài này đi sâu 4 chủ đề trên. Đặc biệt parallel — tưởng "thêm .parallel() là x8 lần nhanh" là ngộ nhận lớn. Phần cuối bài sẽ giải thích tại sao parallel stream thường chậm hơn sequential, và quy tắc Doug Lea (tác giả ForkJoinPool) để quyết định có nên parallel hay không.

1. flatMap — phẳng hoá stream lồng

Vấn đề

List<User>, mỗi user có List<String> emails. Muốn lấy danh sách tất cả email, không lồng:

record User(String name, List<String> emails) { }

List<User> users = List.of(
    new User("Alice", List.of("[email protected]", "[email protected]")),
    new User("Bob", List.of("[email protected]"))
);

Dùng map:

List<List<String>> nested = users.stream()
    .map(User::emails)
    .toList();
// [[[email protected], [email protected]], [[email protected]]]

Sai rồi. Ta được List lồng — muốn flat list [[email protected], [email protected], [email protected]].

Giải pháp: flatMap

List<String> allEmails = users.stream()
    .flatMap(u -> u.emails().stream())
    .toList();
// [[email protected], [email protected], [email protected]]

flatMap(Function<T, Stream<R>>): function trả stream (không phải value), và flatMap flatten các stream đó thành 1 stream R.

So sánh map vs flatMap

OpCardinalityFunction signature
map1 → 1T → R
flatMap1 → 0, 1, hay NT → Stream<R>

map bảo toàn số element. flatMap có thể mở rộng hoặc thu nhỏ stream — 1 user có thể sinh 0, 1, hay 5 email đều được.

Ví dụ thực tế

Parse CSV nhiều dòng:

List<String> lines = Files.readAllLines(Path.of("data.csv"));

List<String> allCells = lines.stream()
    .flatMap(line -> Arrays.stream(line.split(",")))
    .map(String::trim)
    .toList();
// ["name", "age", "Alice", "30", "Bob", "25"]

Mỗi line split ra nhiều cell → flatMap gộp tất cả cell từ mọi line vào 1 stream.

Flatten nested option:

Stream<Optional<String>> s = Stream.of(
    Optional.of("a"),
    Optional.empty(),
    Optional.of("b")
);

List<String> present = s.flatMap(Optional::stream).toList();
// ["a", "b"]

Optional::stream trả Stream<T> với 0 element (empty) hoặc 1 element (present). flatMap flatten — tự động bỏ empty.

Nested loop imperative → flatMap:

// Imperative
List<Pair<User, Order>> pairs = new ArrayList<>();
for (User u : users) {
    for (Order o : u.orders()) {
        pairs.add(new Pair<>(u, o));
    }
}

// Stream
List<Pair<User, Order>> pairs = users.stream()
    .flatMap(u -> u.orders().stream().map(o -> new Pair<>(u, o)))
    .toList();

flatMap thay thế nested loop rất tự nhiên.

💡 Quy tắc chọn map vs flatMap

Nếu function biến đổi sinh ra Stream<X>, List<X>, hay Optional<X> → dùng flatMap để flatten 1 tầng. Nếu function sinh giá trị đơn thuần → map.

2. takeWhiledropWhile (Java 9+)

filter vs takeWhile

filter(predicate): duyệt hết stream, giữ element true. Mỗi element check độc lập.

takeWhile(predicate): dừng lần đầu predicate false. Giữ prefix match.

List<Integer> nums = List.of(1, 2, 3, 4, 5, 1, 2);

nums.stream().filter(n -> n < 4).toList();
// [1, 2, 3, 1, 2] - lay moi element < 4

nums.stream().takeWhile(n -> n < 4).toList();
// [1, 2, 3] - dung lan dau n = 4, bo cac element sau

Chú ý: element 5, 1, 2 cuối list có phần tử pass n < 4 (là 1 và 2), nhưng takeWhile đã dừng từ element 4 → không quay lại xét.

dropWhile

Skip prefix match predicate, lấy phần còn lại:

nums.stream().dropWhile(n -> n < 4).toList();
// [4, 5, 1, 2]

Skip 1, 2, 3 (match). Element 4 không match → lấy nó và mọi element sau, không filter.

Khi nào dùng?

takeWhile/dropWhile phù hợp khi data có cấu trúc prefix — sorted hoặc có ranh giới logic.

Ví dụ:

  • Log file sorted theo thời gian: lấy event trước thời điểm X.
    events.stream()
        .takeWhile(e -> e.timestamp().isBefore(cutoff))
        .toList();
    
  • Parse header đến dòng rỗng: header block kết thúc bằng empty line.
    lines.stream()
        .takeWhile(l -> !l.isEmpty())
        .toList();
    
  • Bỏ comment đầu file: skip các dòng bắt đầu bằng #, lấy content thật.
    lines.stream()
        .dropWhile(l -> l.startsWith("#"))
        .toList();
    
⚠️ Ordered stream

takeWhile/dropWhile chỉ deterministic với ordered stream. Với parallel unordered, "prefix" không có ý nghĩa cố định — kết quả có thể thay đổi giữa các lần chạy. Check spliterator().hasCharacteristics(Spliterator.ORDERED) nếu nghi.

3. peek — debug mid-pipeline

Intermediate op cho phép "nhìn vào" element mà không thay đổi — dùng debug pipeline dài:

List<Integer> result = Stream.of(1, 2, 3, 4)
    .peek(n -> System.out.println("before filter: " + n))
    .filter(n -> n > 2)
    .peek(n -> System.out.println("after filter: " + n))
    .map(n -> n * 10)
    .peek(n -> System.out.println("after map: " + n))
    .toList();

Output:

before filter: 1
before filter: 2
before filter: 3
after filter: 3
after map: 30
before filter: 4
after filter: 4
after map: 40

Thấy rõ:

  • Element 1, 2 bị filter loại → không gọi map.
  • Element đi element-by-element, không batch.

Đừng dùng peek cho logic chính

// BAD
List<Integer> list = ...;
stream.peek(list::add).count();

Lý do: peek không được spec đảm bảo khi nào gọi. Java 9+ tối ưu: nếu terminal op là count() trên stream SIZED (biết size từ source) và pipeline không thay đổi size (peek, map), JVM bỏ qua duyệt — trả thẳng size. peek không chạy.

Pattern đúng: side-effect ở terminal op forEach:

stream.forEach(list::add);
// hoac
list.addAll(stream.toList());

peek chỉ cho quan sát debug, không phải cho side-effect logic.

4. sorted, distinct, limit, skip

sorted

Stream.of(3, 1, 4, 1, 5, 9, 2, 6).sorted().toList();
// [1, 1, 2, 3, 4, 5, 6, 9]

// Voi comparator custom
Stream.of("banana", "apple", "cherry")
    .sorted(Comparator.comparingInt(String::length))
    .toList();
// ["apple", "banana", "cherry"]

sortedstateful — phải buffer toàn bộ element để sort. Hệ quả:

  • Không dùng với stream vô hạn (iterate, generate không limit).
  • Memory tăng với size (tệ với stream lớn).
  • Thường xuất hiện cuối pipeline (sort result cuối, không phải mỗi bước).

distinct

Stream.of(1, 2, 2, 3, 3, 3).distinct().toList();
// [1, 2, 3]

Dùng equals + hashCode để loại trùng. Giữ HashSet bên dưới → memory tăng với số distinct element.

Stream vô hạn với ít distinct value (vd random dice 1-6) → distinct hoạt động nhưng cần short-circuit (limit) để terminate.

limitskip

Stream.iterate(1, i -> i * 2)   // infinite: 1, 2, 4, 8, ...
    .skip(3)                    // bo 3 dau: bo 1, 2, 4
    .limit(3)                   // lay 3 element ke
    .toList();
// [8, 16, 32]

limit cũng short-circuit terminal — dừng upstream khi đủ. Essential cho stream vô hạn.

skip không short-circuit — duyệt và bỏ n element đầu.

5. Parallel stream — sức hút và cạm bẫy

Cách bật parallel

list.parallelStream()
    .filter(...)
    .map(...)
    .toList();

// Hoac
list.stream().parallel()
    .filter(...)
    .toList();

Thêm parallel() hoặc parallelStream() — Java chia data thành chunk, chạy pipeline song song trên nhiều thread, combine kết quả.

Under the hood: dùng ForkJoinPool.commonPool() — pool shared toàn JVM với size mặc định = số CPU core.

Bẫy 1: Common pool shared toàn JVM

commonPoolsingleton — mọi parallel stream, CompletableFuture.supplyAsync không pass executor, ForkJoin task trực tiếp đều dùng chung.

Hệ quả: nếu parallel stream block I/O (HTTP call, DB query), tất cả task khác dùng commonPool bị block. App tưởng đang chạy nhiều việc thực tế kẹt cứng.

// BAD: parallel stream goi HTTP
urls.parallelStream()
    .map(url -> httpGet(url))   // Block 500ms moi cai
    .toList();
// Cac parallel stream khac trong app cung cham

Fix: dùng CompletableFuture với executor riêng (bài 10.3) — không phải parallel stream.

Bẫy 2: Overhead > công sức với data nhỏ

Parallel có chi phí:

  • Chia data thành chunk (split qua Spliterator).
  • Fork task vào pool.
  • Context switch thread.
  • Combine kết quả cuối.

Với data nhỏ hoặc op rẻ, overhead lớn hơn công sức làm thật.

Formula Doug Lea: parallel đáng khi N * Q > 10_000, trong đó:

  • N = số element.
  • Q = cost mỗi element (đơn vị: number of operation, không phải ns).

Ví dụ:

  • N = 1000, Q = 10 → N*Q = 10k → ở ranh giới, không chắc parallel thắng.
  • N = 1_000_000, Q = 1000 → N*Q = 10^9 → parallel thắng rõ.
  • N = 100, Q = 10 → 1000 → sequential chắc chắn nhanh hơn.

Bẫy 3: Không associative / có shared state

// BAD: shared list
List<Integer> result = new ArrayList<>();
data.parallelStream().forEach(result::add);
// Race condition, ArrayList corrupt, co the ConcurrentModificationException

Fix: dùng collect để gom — mỗi thread có container riêng, combine cuối:

List<Integer> result = data.parallelStream().collect(Collectors.toList());

Hàm reduce phải associative (bài 9.3). Nếu không, kết quả parallel không deterministic.

Bẫy 4: Source split không hiệu quả

Spliterator (bên dưới stream) phải chia data thành chunk bằng nhau. Các source chia tốt:

  • ArrayList, array → O(1) split.
  • IntStream.range → O(1).
  • HashMap.values() → OK.

Chia tệ:

  • LinkedList → O(n) đi từ head đến giữa.
  • Files.lines → không biết size trước.
  • Stream.iterate — sinh tuần tự, không split được hiệu quả.

Parallel với source split tệ → overhead cao, thường chậm hơn sequential.

Khi nào parallel thực sự thắng?

Hội đủ điều kiện:

  1. N*Q > 10_000 (data đủ lớn, op đủ nặng).
  2. CPU-bound workload (không I/O, không blocking).
  3. Không shared state / side-effect.
  4. Hàm associative + stateless.
  5. Source splittable hiệu quả (array-backed).
  6. Không dùng common pool cho task khác trong cùng app.

Thực tế, workload thoả hết 6 điều hiếm — đa số production code dùng sequential hoặc CompletableFuture với executor riêng.

⚠️ Đo trước khi parallel

Không tin vào trực giác "parallel chắc nhanh hơn". Dùng JMH (Java Microbenchmark Harness) đo thực tế với data production-like. Nhiều case, sequential nhanh hơn parallel 2-3 lần.

6. Pattern thực tế — log analysis

// Dem so line ERROR trong 1 trieu dong log
try (Stream<String> lines = Files.lines(Path.of("app.log"))) {
    long errorCount = lines
        .filter(l -> l.contains("ERROR"))
        .count();
    System.out.println("Errors: " + errorCount);
}

Sequential đủFiles.lines không split hiệu quả, I/O-bound. Parallel sẽ không nhanh hơn.

// Lay tat ca tag cua tat ca post (nested)
record Post(List<String> tags) { }

Set<String> allTags = posts.stream()
    .flatMap(p -> p.tags().stream())
    .collect(Collectors.toSet());

flatMap + toSet thay nested loop.

// Lay cac event truoc khi gap SHUTDOWN
List<Event> beforeShutdown = events.stream()
    .takeWhile(e -> !e.type().equals("SHUTDOWN"))
    .toList();

7. Pitfall tổng hợp

Nhầm 1: map khi cần flatMap.

List<List<String>> roles = users.stream()
    .map(User::roles)
    .toList();   // List long, khong flatten

flatMap(u -> u.roles().stream()).

Nhầm 2: takeWhile trên data không có prefix structure.

nums.stream().takeWhile(n -> n > 0)
// Nums = [1, -1, 2, 3] -> chi lay [1], bo [2, 3]

✅ Dùng filter cho lọc unconditional, takeWhile cho prefix có structure.

Nhầm 3: Parallel với shared state mutation.

list.parallelStream().forEach(myList::add);   // Race

list.parallelStream().collect(Collectors.toList()).

Nhầm 4: Parallel stream với blocking I/O.

urls.parallelStream().map(this::httpGet).toList();
// Block common pool - app ca system cham

CompletableFuture.supplyAsync với executor riêng (bài 10.3).

Nhầm 5: sorted trên stream vô hạn.

Stream.iterate(0, i -> i + 1).sorted().limit(10);   // Hang forever

sorted cần thấy hết element. Limit TRƯỚC sorted vẫn không cứu với infinite source — không có "hết" để sort.

Nhầm 6: peek cho logic side-effect.

stream.peek(this::save).count();   // save co the khong chay (Java 9+ optimize)

stream.forEach(this::save).

8. 📚 Deep Dive Oracle

📚 Deep Dive Oracle

Spec / reference chính thức:

Ghi chú: Doug Lea giải thích formula NQ và phân tích khi nào parallel thắng. Đọc kỹ để có tư duy đúng về performance — nhiều dev ngộ nhận "parallel chắc nhanh hơn". Benchmark Stream của Aleksey Shipilev (JIT engineer Oracle) còn chỉ ra parallel overhead cụ thể ~100μs cho mỗi pipeline setup — rất đáng kể với workload ngắn.

9. Tóm tắt

  • flatMap(Function<T, Stream<R>>) — flatten stream lồng, hoặc 1-to-N transform. Thay thế nested loop imperative.
  • takeWhile/dropWhile (Java 9+) — stop/skip prefix theo predicate. Chỉ deterministic với ordered stream. Dùng cho data sorted hoặc có ranh giới logic.
  • peek — intermediate op cho debug/observe. Không cho logic side-effect — Java 9+ có thể skip peek khi optimizer bỏ qua duyệt.
  • sorted, distinct stateful — buffer element. Không dùng với stream vô hạn.
  • Parallel stream dùng ForkJoinPool.commonPool shared toàn JVM — block I/O trong parallel stream block toàn app.
  • Formula Doug Lea: parallel đáng khi N*Q > 10_000. Với data nhỏ hoặc op rẻ, sequential nhanh hơn.
  • Parallel yêu cầu: không shared state, hàm associative, source splittable hiệu quả (array-backed), CPU-bound không I/O.
  • Không parallel stream cho I/O — dùng CompletableFuture + executor riêng.
  • Đo trước khi parallel. Trực giác sai thường xuyên.

10. Tự kiểm tra

Tự kiểm tra
Q1
Khi nào dùng map vs flatMap?
  • map: 1 input → 1 output. Stream<String>Stream<Integer>. Cardinality không đổi.
  • flatMap: 1 input → 0, 1, hay N output. Stream<User>Stream<Email> (mỗi user N email). Stream kết quả flatten 1 tầng.

Quy tắc: function bạn truyền sinh ra Stream<X>, List<X>, hay Optional<X> và bạn muốn "ra" là X không phải container → flatMap. Function sinh giá trị đơn lẻ → map.

Dấu hiệu rõ: nếu dùng map mà kết quả là Stream<Stream<X>> hay Stream<List<X>>, chắc chắn cần đổi sang flatMap.

Q2
Đoạn sau cho kết quả gì? Stream.of(1, 5, 2, 8, 3).takeWhile(n -> n < 5).toList()

takeWhile lấy prefix thoả predicate, dừng lần đầu predicate false.

Element 1: 1 < 5 true → giữ. Element 5: 5 < 5 false → dừng. Các element 2, 8, 3 bị bỏ dù 2 và 3 pass predicate.

[1]

Phân biệt với filter(n -> n < 5): cái đó duyệt hết, cho [1, 2, 3].

takeWhile khác filter ở chỗ: check từng element theo thứ tự, gặp false thì dừng — không xét element sau. Phù hợp với data có prefix structure (sorted, grouped).

Q3
Vì sao parallel stream thường KHÔNG nhanh hơn sequential?

4 lý do chính:

  1. Overhead fork/join: chia data, fork task, context switch, combine kết quả — tốn CPU. Với data nhỏ hoặc op rẻ, overhead lớn hơn công sức làm thật. Formula Doug Lea: N*Q > 10k mới đáng.
  2. Common pool shared: ForkJoinPool.commonPool size = #core, dùng chung toàn JVM. Nếu I/O-bound → block pool → các parallel task khác chờ.
  3. Source split kém: LinkedList, Files.lines, Stream.iterate không split O(1) → thread làm việc không đều, overhead cao.
  4. Shared state: code ghi vào ArrayList chung từ parallel → race condition, phải thêm sync → contention → chậm hơn sequential.

Kết luận: parallel chỉ đáng khi CPU-bound, data lớn, source array-backed, không shared state, không dùng common pool cho I/O ở nơi khác. Đo bằng JMH trước khi quyết định.

Q4
Khi nào peek có thể bị JVM skip, và vì sao đừng dùng peek cho logic chính?

Java 9+ có optimization: nếu terminal op không cần duyệt từng element để tính kết quả (vd count() trên stream SIZED, nơi size biết từ source), JVM có thể bỏ qua duyệt — trả thẳng size. Intermediate op `peek`, `map` không thay đổi size cũng bị skip.

Ví dụ:

list.stream()
  .peek(item -> save(item))   // Co the KHONG chay
  .count();

Size của list JVM biết. Map/peek không đổi size. count() trả list.size() mà không duyệt. Hàm save không chạy.

Đây là bug silent — không có warning, test pass trên Java 8, fail trên Java 9+.

Rule: peek chỉ cho debug/log. Logic side-effect phải ở terminal op forEach:

list.stream().forEach(item -> save(item));
Q5
Vì sao sorted không dùng được với stream vô hạn?

sorted là stateful intermediate op — phải thấy toàn bộ element để sort đúng. Nó buffer element vào array tạm, sort, rồi emit theo thứ tự đã sort.

Stream vô hạn: "toàn bộ" không tồn tại. Buffer grows unbounded → OOM hoặc hang forever.

Ngay cả pattern .limit(10).sorted() — đảo thứ tự:

  • Stream.iterate(0, i -> i + 1).limit(10).sorted() — OK. Limit TRƯỚC biến vô hạn thành hữu hạn 10 element, sorted OK.
  • Stream.iterate(0, i -> i + 1).sorted().limit(10) — HANG. Sorted cần thấy hết vô hạn → không bao giờ complete.

Tương tự distinct — buffer set element đã thấy. Stream với nhiều distinct value + vô hạn → memory grows unbounded.

Rule: stateful op (sorted, distinct) + stream vô hạn là bug. Limit trước chúng.

Bài tiếp theo: Optional — xử lý null không vỡ

Bài này có giúp bạn hiểu bản chất không?