Java OO & Functional/Stream nâng cao — flatMap, takeWhile và peek
31/36
Bài 31 / 36~14 phútStream API & LambdaMiễn phí lượt xem

Stream nâng cao — flatMap, takeWhile và peek

flatMap cho stream lồng (và lựa chọn mapMulti từ Java 16), takeWhile/dropWhile (Java 9+), peek debug, cùng các op stateful sorted/distinct/limit/skip — bộ công cụ cho 20% task còn lại sau map/filter/reduce.

TL;DR: Ngoài bộ ba filter/map/reduce, bốn nhóm công cụ chuyên dụng giải quyết 20% task còn lại: flatMap phẳng hoá stream lồng (1 phần tử cha sinh 0..N phần tử con — chú ý chi phí tạo stream con per-element, có mapMulti Java 16+ làm alternative); takeWhile/dropWhile (Java 9+) cắt prefix theo predicate — chỉ deterministic trên ordered stream; peek quan sát mid-pipeline cho debug, không dùng cho logic chính vì Java 9+ có thể skip nó; và nhóm stateful sorted/distinct phải buffer element — cấm dùng trên stream vô hạn nếu không có limit đứng trước. Parallel stream — bẫy lớn nhất của Stream API — có bài riêng.

filter, map, reduce giải quyết 80% task aggregate. 20% còn lại cần công cụ chuyên biệt:

  • List<User> mà mỗi user có List<Order> — muốn lấy tất cả order flatten → flatMap.
  • Log file sorted theo thời gian, muốn dừng khi gặp dòng SHUTDOWN → takeWhile.
  • Pipeline dài debug không biết element nào qua được stage nào → peek.

Bài này đi sâu 3 nhóm công cụ trên cùng các op stateful (sorted, distinct, limit, skip). Còn parallel() — nơi nhiều bẫy nhất của Stream API — có bài riêng.

1. flatMap — phẳng hoá stream lồng

Vấn đề

List<User>, mỗi user có List<String> emails. Muốn lấy danh sách tất cả email, không lồng:

record User(String name, List<String> emails) { }

List<User> users = List.of(
    new User("Alice", List.of("[email protected]", "[email protected]")),
    new User("Bob", List.of("[email protected]"))
);

Dùng map:

List<List<String>> nested = users.stream()
    .map(User::emails)
    .toList();
// [[[email protected], [email protected]], [[email protected]]]

Sai rồi. Ta được List lồng — muốn flat list [[email protected], [email protected], [email protected]].

Giải pháp: flatMap

List<String> allEmails = users.stream()
    .flatMap(u -> u.emails().stream())
    .toList();
// [[email protected], [email protected], [email protected]]

flatMap(Function<T, Stream<R>>): function trả stream (không phải value), và flatMap flatten các stream đó thành 1 stream R.

So sánh map vs flatMap

OpCardinalityFunction signature
map1 → 1T → R
flatMap1 → 0, 1, hay NT → Stream<R>

map bảo toàn số element. flatMap có thể mở rộng hoặc thu nhỏ stream — 1 user có thể sinh 0, 1, hay 5 email đều được.

Ví dụ thực tế

Parse CSV nhiều dòng:

List<String> lines = Files.readAllLines(Path.of("data.csv"));

List<String> allCells = lines.stream()
    .flatMap(line -> Arrays.stream(line.split(",")))
    .map(String::trim)
    .toList();
// ["name", "age", "Alice", "30", "Bob", "25"]

Mỗi line split ra nhiều cell → flatMap gộp tất cả cell từ mọi line vào 1 stream.

Flatten nested option:

Stream<Optional<String>> s = Stream.of(
    Optional.of("a"),
    Optional.empty(),
    Optional.of("b")
);

List<String> present = s.flatMap(Optional::stream).toList();
// ["a", "b"]

Optional::stream trả Stream<T> với 0 element (empty) hoặc 1 element (present). flatMap flatten — tự động bỏ empty.

Nested loop imperative → flatMap:

// Imperative
List<Pair<User, Order>> pairs = new ArrayList<>();
for (User u : users) {
    for (Order o : u.orders()) {
        pairs.add(new Pair<>(u, o));
    }
}

// Stream
List<Pair<User, Order>> pairs = users.stream()
    .flatMap(u -> u.orders().stream().map(o -> new Pair<>(u, o)))
    .toList();

flatMap thay thế nested loop rất tự nhiên.

Chi phí của flatMap — và lựa chọn mapMulti (Java 16+)

flatMap nhận Function<T, Stream<R>> — nghĩa là mỗi phần tử cha tạo một object Stream con mới trên heap (kèm spliterator đi cùng). Với collection lớn (hàng trăm nghìn phần tử cha), lượng allocation per-element này cộng dồn thành áp lực GC đo được trong hot-path.

Java 16 thêm Stream.mapMulti(BiConsumer<T, Consumer<R>>) cho đúng tình huống này: thay vì tạo stream con, bạn push từng phần tử kết quả vào consumer — không có stream trung gian nào được cấp phát.

// flatMap: tao 1 Stream con moi cho moi user
List<String> a = users.stream()
    .flatMap(u -> u.emails().stream())
    .toList();

// mapMulti (Java 16+): khong tao stream trung gian
List<String> b = users.stream()
    .<String>mapMulti((u, consumer) -> u.emails().forEach(consumer))
    .toList();

Quy tắc thực dụng: mặc định cứ dùng flatMap — dễ đọc, đúng ngữ nghĩa, và với đa số workload chi phí allocation không đáng kể. Chỉ khi profiler chỉ ra flatMap là điểm nóng (hot-path xử lý liên tục, mỗi phần tử cha chỉ sinh 0-1 phần tử con) mới cân nhắc mapMulti hoặc loop thường.

💡 Quy tắc chọn map vs flatMap

Nếu function biến đổi sinh ra Stream<X>, List<X>, hay Optional<X> → dùng flatMap để flatten 1 tầng. Nếu function sinh giá trị đơn thuần → map.

2. takeWhiledropWhile (Java 9+)

filter vs takeWhile

filter(predicate): duyệt hết stream, giữ element true. Mỗi element check độc lập.

takeWhile(predicate): dừng lần đầu predicate false. Giữ prefix match.

List<Integer> nums = List.of(1, 2, 3, 4, 5, 1, 2);

nums.stream().filter(n -> n < 4).toList();
// [1, 2, 3, 1, 2] - lay moi element < 4

nums.stream().takeWhile(n -> n < 4).toList();
// [1, 2, 3] - dung lan dau n = 4, bo cac element sau

Chú ý: element 5, 1, 2 cuối list có phần tử pass n < 4 (là 1 và 2), nhưng takeWhile đã dừng từ element 4 → không quay lại xét.

dropWhile

Skip prefix match predicate, lấy phần còn lại:

nums.stream().dropWhile(n -> n < 4).toList();
// [4, 5, 1, 2]

Skip 1, 2, 3 (match). Element 4 không match → lấy nó và mọi element sau, không filter.

Khi nào dùng?

takeWhile/dropWhile phù hợp khi data có cấu trúc prefix — sorted hoặc có ranh giới logic.

Ví dụ:

  • Log file sorted theo thời gian: lấy event trước thời điểm X.
    events.stream()
        .takeWhile(e -> e.timestamp().isBefore(cutoff))
        .toList();
    
  • Parse header đến dòng rỗng: header block kết thúc bằng empty line.
    lines.stream()
        .takeWhile(l -> !l.isEmpty())
        .toList();
    
  • Bỏ comment đầu file: skip các dòng bắt đầu bằng #, lấy content thật.
    lines.stream()
        .dropWhile(l -> l.startsWith("#"))
        .toList();
    
⚠️ Độ chính xác trên Unordered Stream

takeWhile/dropWhile chỉ đảm bảo tính chính xác và nhất quán (deterministic) khi làm việc với ordered stream (stream đã được sắp xếp hoặc giữ nguyên thứ tự nguồn).

Nếu stream không được sắp xếp (unordered stream), runtime có thể dừng duyệt sớm một cách ngẫu nhiên vì thứ tự các phần tử không được định nghĩa rõ ràng. Điều này dẫn đến kết quả trả về không chính xác, mất dữ liệu không mong muốn và hành vi không nhất quán giữa các lần chạy. Hãy kiểm tra spliterator().hasCharacteristics(Spliterator.ORDERED) nếu không chắc chắn về cấu trúc nguồn dữ liệu của bạn trước khi dùng các thao tác ngắn mạch này.

3. peek — debug mid-pipeline

Intermediate op cho phép "nhìn vào" element mà không thay đổi — dùng debug pipeline dài:

List<Integer> result = Stream.of(1, 2, 3, 4)
    .peek(n -> System.out.println("before filter: " + n))
    .filter(n -> n > 2)
    .peek(n -> System.out.println("after filter: " + n))
    .map(n -> n * 10)
    .peek(n -> System.out.println("after map: " + n))
    .toList();

Output:

before filter: 1
before filter: 2
before filter: 3
after filter: 3
after map: 30
before filter: 4
after filter: 4
after map: 40

Thấy rõ:

  • Element 1, 2 bị filter loại → không gọi map.
  • Element đi element-by-element, không batch.

Đừng dùng peek cho logic chính

⚠️ peek không dành cho logic chính

Chỉ dùng peek cho mục đích debug (ghi nhận thông tin/in log). Không dùng peek để thay đổi trạng thái đối tượng hoặc tương tác với cơ sở dữ liệu.

Lý do: từ Java 9+, runtime có thể tối ưu hóa và bỏ qua peek hoàn toàn nếu terminal operation không yêu cầu duyệt qua các phần tử của stream.

Hãy xem ví dụ phản mẫu (anti-pattern) kinh điển dưới đây:

// ANTI-PATTERN: peek bi skip hoan toan
List<Integer> list = new ArrayList<>();
long count = Stream.of(1, 2, 3, 4)
    .peek(list::add)   // Khong bao gio chay!
    .count();

Lý do: Kể từ Java 9+, nếu terminal operation là một hàm như count() chạy trên một stream thuộc dạng SIZED (đã biết trước kích thước từ nguồn) và các bước trung gian (như peek hay map) không làm thay đổi kích thước của stream, JVM sẽ tối ưu bằng cách trả về trực tiếp kích thước của stream mà không cần duyệt qua bất kỳ phần tử nào. Kết quả là list của bạn sẽ rỗng!

Pattern đúng khi cần side-effect hoặc thay đổi trạng thái là thực hiện trong terminal operation như forEach hoặc thu gom phần tử trước rồi xử lý:

// Cach 1: thuc hien side-effect o terminal operation
stream.forEach(list::add);

// Cach 2: thu gom ket qua roi xu ly
list.addAll(stream.toList());

Hãy nhớ: peek chỉ là công cụ để quan sát tạm thời trong quá trình debug, không phải nơi thực thi business logic.

4. sorted, distinct, limit, skip

sorted

Stream.of(3, 1, 4, 1, 5, 9, 2, 6).sorted().toList();
// [1, 1, 2, 3, 4, 5, 6, 9]

// Voi comparator custom
Stream.of("banana", "apple", "cherry")
    .sorted(Comparator.comparingInt(String::length))
    .toList();
// ["apple", "banana", "cherry"]

sortedstateful — phải buffer toàn bộ element để sort. Hệ quả:

  • Không dùng với stream vô hạn (iterate, generate không limit).
  • Memory tăng với size (tệ với stream lớn).
  • Thường xuất hiện cuối pipeline (sort result cuối, không phải mỗi bước).

distinct

Stream.of(1, 2, 2, 3, 3, 3).distinct().toList();
// [1, 2, 3]

Dùng equals + hashCode để loại trùng. Giữ HashSet bên dưới → memory tăng với số distinct element.

Stream vô hạn với ít distinct value (vd random dice 1-6) → distinct hoạt động nhưng cần short-circuit (limit) để terminate.

limitskip

Stream.iterate(1, i -> i * 2)   // infinite: 1, 2, 4, 8, ...
    .skip(3)                    // bo 3 dau: bo 1, 2, 4
    .limit(3)                   // lay 3 element ke
    .toList();
// [8, 16, 32]

limit cũng short-circuit — dừng upstream khi đủ. Essential cho stream vô hạn.

skip không short-circuit — duyệt và bỏ n element đầu.

5. Parallel stream — có bài riêng

Stream còn một "công tắc" nữa: .parallel() / parallelStream() — chia data thành chunk chạy song song trên ForkJoinPool.commonPool(). Nghe hấp dẫn, nhưng đây là nơi nhiều bẫy nhất của Stream API: common pool shared toàn JVM (block I/O ở một chỗ làm chậm cả app), overhead fork/join, yêu cầu hàm associative, source phải splittable hiệu quả. "Thêm .parallel() là nhanh hơn" là ngộ nhận — nhiều trường hợp chậm hơn sequential.

Toàn bộ cơ chế ForkJoin, công thức Doug Lea N×Q > 10000, và 4 pitfall production được mổ xẻ trong bài Parallel Stream.

6. Pattern thực tế — log analysis

// Dem so line ERROR trong 1 trieu dong log
try (Stream<String> lines = Files.lines(Path.of("app.log"))) {
    long errorCount = lines
        .filter(l -> l.contains("ERROR"))
        .count();
    System.out.println("Errors: " + errorCount);
}

Sequential đủFiles.lines là I/O-bound và không chia chunk hiệu quả; vì sao parallel không giúp gì ở đây, xem bài Parallel Stream.

// Lay tat ca tag cua tat ca post (nested)
record Post(List<String> tags) { }

Set<String> allTags = posts.stream()
    .flatMap(p -> p.tags().stream())
    .collect(Collectors.toSet());

flatMap + toSet thay nested loop.

// Lay cac event truoc khi gap SHUTDOWN
List<Event> beforeShutdown = events.stream()
    .takeWhile(e -> !e.type().equals("SHUTDOWN"))
    .toList();

7. Pitfall tổng hợp

Nhầm 1: map khi cần flatMap.

List<List<String>> roles = users.stream()
    .map(User::roles)
    .toList();   // List long, khong flatten

flatMap(u -> u.roles().stream()).

Nhầm 2: takeWhile trên data không có prefix structure.

nums.stream().takeWhile(n -> n > 0)
// Nums = [1, -1, 2, 3] -> chi lay [1], bo [2, 3]

✅ Dùng filter cho lọc unconditional, takeWhile cho prefix có structure.

Nhầm 3: sorted() đứng trước limit trên stream vô hạn.

Stream.iterate(0, i -> i + 1).sorted().limit(10);   // Hang forever

sorted phải thấy hết element mới emit được — với nguồn vô hạn thì "hết" không bao giờ đến. Đảo thứ tự: .limit(10).sorted() chạy được, vì limit đã biến stream thành hữu hạn 10 phần tử trước khi sort.

Nhầm 4: peek cho logic side-effect.

stream.peek(this::save).count();   // save co the khong chay (Java 9+ optimize)

stream.forEach(this::save).

8. 📚 Deep Dive Oracle

📚 Deep Dive Oracle

Spec / reference chính thức:

Ghi chú: Javadoc của mapMulti nêu đúng 2 tình huống nên thay flatMap: (1) mỗi phần tử cha sinh ra rất ít (0 hoặc 1) phần tử con — chi phí tạo stream con trở nên đáng kể so với công việc thật; (2) logic sinh phần tử con dạng imperative, khó biểu diễn thành stream. Ngoài 2 case này, flatMap vẫn là lựa chọn mặc định.

9. Tóm tắt

  • flatMap(Function<T, Stream<R>>) — flatten stream lồng, hoặc 1-to-N transform. Thay thế nested loop imperative.
  • flatMap tạo 1 stream con per phần tử cha — overhead allocation chỉ đáng quan tâm trong hot-path; khi đó cân nhắc mapMulti (Java 16+).
  • takeWhile/dropWhile (Java 9+) — stop/skip prefix theo predicate. Chỉ deterministic với ordered stream. Dùng cho data sorted hoặc có ranh giới logic.
  • peek — intermediate op cho debug/observe. Không cho logic side-effect — Java 9+ có thể skip peek khi optimizer bỏ qua duyệt.
  • sorted, distinct stateful — buffer element. Không dùng với stream vô hạn trừ khi limit đứng trước.
  • limit short-circuit (dừng upstream khi đủ); skip thì không.
  • Parallel stream: nhiều điều kiện phải hội đủ mới đáng dùng — toàn bộ ở bài 08.

10. Tự kiểm tra

Tự kiểm tra
Q1
Khi nào dùng map vs flatMap?
  • map: 1 input → 1 output. Stream<String>Stream<Integer>. Cardinality không đổi.
  • flatMap: 1 input → 0, 1, hay N output. Stream<User>Stream<Email> (mỗi user N email). Stream kết quả flatten 1 tầng.

Quy tắc: function bạn truyền sinh ra Stream<X>, List<X>, hay Optional<X> và bạn muốn "ra" là X không phải container → flatMap. Function sinh giá trị đơn lẻ → map.

Dấu hiệu rõ: nếu dùng map mà kết quả là Stream<Stream<X>> hay Stream<List<X>>, chắc chắn cần đổi sang flatMap.

Q2
Đoạn sau cho kết quả gì? Stream.of(1, 5, 2, 8, 3).takeWhile(n -> n < 5).toList()

takeWhile lấy prefix thoả predicate, dừng lần đầu predicate false.

Element 1: 1 < 5 true → giữ. Element 5: 5 < 5 false → dừng. Các element 2, 8, 3 bị bỏ dù 2 và 3 pass predicate.

[1]

Phân biệt với filter(n -> n < 5): cái đó duyệt hết, cho [1, 2, 3].

takeWhile khác filter ở chỗ: check từng element theo thứ tự, gặp false thì dừng — không xét element sau. Phù hợp với data có prefix structure (sorted, grouped).

Q3
Vì sao flatMap có chi phí allocation cao hơn map, và khi nào cân nhắc mapMulti?

map chỉ gọi function biến đổi và đẩy kết quả đi tiếp — không tạo object phụ trợ. flatMap nhận Function<T, Stream<R>>: với mỗi phần tử cha, một object Stream con (kèm spliterator) được khởi tạo trên heap chỉ để duyệt rồi vứt bỏ. Hàng trăm nghìn phần tử cha → hàng trăm nghìn stream con ngắn hạn → áp lực GC.

mapMulti (Java 16+) loại bỏ stream trung gian: bạn nhận một Consumer<R> và push trực tiếp từng phần tử kết quả vào nó.

Cân nhắc mapMulti khi: (1) profiler chỉ ra flatMap là điểm nóng; (2) mỗi phần tử cha sinh rất ít (0-1) phần tử con — lúc đó chi phí tạo stream con chiếm tỷ trọng lớn nhất. Với code thường, flatMap dễ đọc hơn và đủ nhanh — đừng tối ưu sớm.

Q4
Khi nào peek có thể bị JVM skip, và vì sao đừng dùng peek cho logic chính?

Java 9+ có optimization: nếu terminal op không cần duyệt từng element để tính kết quả (vd count() trên stream SIZED, nơi size biết từ source), JVM có thể bỏ qua duyệt — trả thẳng size. Intermediate op `peek`, `map` không thay đổi size cũng bị skip.

Ví dụ:

list.stream()
  .peek(item -> save(item))   // Co the KHONG chay
  .count();

Size của list JVM biết. Map/peek không đổi size. count() trả list.size() mà không duyệt. Hàm save không chạy.

Đây là bug silent — không có warning, test pass trên Java 8, fail trên Java 9+.

Rule: peek chỉ cho debug/log. Logic side-effect phải ở terminal op forEach:

list.stream().forEach(item -> save(item));
Q5
Vì sao sorted không dùng được với stream vô hạn?

sorted là stateful intermediate op — phải thấy toàn bộ element để sort đúng. Nó buffer element vào array tạm, sort, rồi emit theo thứ tự đã sort.

Stream vô hạn: "toàn bộ" không tồn tại. Buffer grows unbounded → OOM hoặc hang forever.

Thứ tự với limit quyết định sống chết:

  • Stream.iterate(0, i -> i + 1).limit(10).sorted() — OK. Limit TRƯỚC biến vô hạn thành hữu hạn 10 element, sorted OK.
  • Stream.iterate(0, i -> i + 1).sorted().limit(10) — HANG. Sorted cần thấy hết vô hạn → không bao giờ complete.

Tương tự distinct — buffer set element đã thấy. Stream với nhiều distinct value + vô hạn → memory grows unbounded.

Rule: stateful op (sorted, distinct) + stream vô hạn là bug. Đặt limit trước chúng.

Bài tiếp theo: Optional — xử lý null không vỡ

Bài này có giúp bạn hiểu bản chất không?

Hỏi đáp về bài này

Chưa có câu hỏi

Đặt câu hỏi

Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).

Đặt câu hỏi đầu tiên