Stream nâng cao — flatMap, takeWhile và peek
flatMap cho stream lồng (và lựa chọn mapMulti từ Java 16), takeWhile/dropWhile (Java 9+), peek debug, cùng các op stateful sorted/distinct/limit/skip — bộ công cụ cho 20% task còn lại sau map/filter/reduce.
TL;DR: Ngoài bộ ba filter/map/reduce, bốn nhóm công cụ chuyên dụng giải quyết 20% task còn lại: flatMap phẳng hoá stream lồng (1 phần tử cha sinh 0..N phần tử con — chú ý chi phí tạo stream con per-element, có mapMulti Java 16+ làm alternative); takeWhile/dropWhile (Java 9+) cắt prefix theo predicate — chỉ deterministic trên ordered stream; peek quan sát mid-pipeline cho debug, không dùng cho logic chính vì Java 9+ có thể skip nó; và nhóm stateful sorted/distinct phải buffer element — cấm dùng trên stream vô hạn nếu không có limit đứng trước. Parallel stream — bẫy lớn nhất của Stream API — có bài riêng.
filter, map, reduce giải quyết 80% task aggregate. 20% còn lại cần công cụ chuyên biệt:
List<User>mà mỗi user cóList<Order>— muốn lấy tất cả order flatten →flatMap.- Log file sorted theo thời gian, muốn dừng khi gặp dòng SHUTDOWN →
takeWhile. - Pipeline dài debug không biết element nào qua được stage nào →
peek.
Bài này đi sâu 3 nhóm công cụ trên cùng các op stateful (sorted, distinct, limit, skip). Còn parallel() — nơi nhiều bẫy nhất của Stream API — có bài riêng.
1. flatMap — phẳng hoá stream lồng
Vấn đề
Có List<User>, mỗi user có List<String> emails. Muốn lấy danh sách tất cả email, không lồng:
record User(String name, List<String> emails) { }
List<User> users = List.of(
new User("Alice", List.of("[email protected]", "[email protected]")),
new User("Bob", List.of("[email protected]"))
);
Dùng map:
List<List<String>> nested = users.stream()
.map(User::emails)
.toList();
// [[[email protected], [email protected]], [[email protected]]]
Sai rồi. Ta được List lồng — muốn flat list [[email protected], [email protected], [email protected]].
Giải pháp: flatMap
List<String> allEmails = users.stream()
.flatMap(u -> u.emails().stream())
.toList();
// [[email protected], [email protected], [email protected]]
flatMap(Function<T, Stream<R>>): function trả stream (không phải value), và flatMap flatten các stream đó thành 1 stream R.
So sánh map vs flatMap
| Op | Cardinality | Function signature |
|---|---|---|
map | 1 → 1 | T → R |
flatMap | 1 → 0, 1, hay N | T → Stream<R> |
map bảo toàn số element. flatMap có thể mở rộng hoặc thu nhỏ stream — 1 user có thể sinh 0, 1, hay 5 email đều được.
Ví dụ thực tế
Parse CSV nhiều dòng:
List<String> lines = Files.readAllLines(Path.of("data.csv"));
List<String> allCells = lines.stream()
.flatMap(line -> Arrays.stream(line.split(",")))
.map(String::trim)
.toList();
// ["name", "age", "Alice", "30", "Bob", "25"]
Mỗi line split ra nhiều cell → flatMap gộp tất cả cell từ mọi line vào 1 stream.
Flatten nested option:
Stream<Optional<String>> s = Stream.of(
Optional.of("a"),
Optional.empty(),
Optional.of("b")
);
List<String> present = s.flatMap(Optional::stream).toList();
// ["a", "b"]
Optional::stream trả Stream<T> với 0 element (empty) hoặc 1 element (present). flatMap flatten — tự động bỏ empty.
Nested loop imperative → flatMap:
// Imperative
List<Pair<User, Order>> pairs = new ArrayList<>();
for (User u : users) {
for (Order o : u.orders()) {
pairs.add(new Pair<>(u, o));
}
}
// Stream
List<Pair<User, Order>> pairs = users.stream()
.flatMap(u -> u.orders().stream().map(o -> new Pair<>(u, o)))
.toList();
flatMap thay thế nested loop rất tự nhiên.
Chi phí của flatMap — và lựa chọn mapMulti (Java 16+)
flatMap nhận Function<T, Stream<R>> — nghĩa là mỗi phần tử cha tạo một object Stream con mới trên heap (kèm spliterator đi cùng). Với collection lớn (hàng trăm nghìn phần tử cha), lượng allocation per-element này cộng dồn thành áp lực GC đo được trong hot-path.
Java 16 thêm Stream.mapMulti(BiConsumer<T, Consumer<R>>) cho đúng tình huống này: thay vì tạo stream con, bạn push từng phần tử kết quả vào consumer — không có stream trung gian nào được cấp phát.
// flatMap: tao 1 Stream con moi cho moi user
List<String> a = users.stream()
.flatMap(u -> u.emails().stream())
.toList();
// mapMulti (Java 16+): khong tao stream trung gian
List<String> b = users.stream()
.<String>mapMulti((u, consumer) -> u.emails().forEach(consumer))
.toList();
Quy tắc thực dụng: mặc định cứ dùng flatMap — dễ đọc, đúng ngữ nghĩa, và với đa số workload chi phí allocation không đáng kể. Chỉ khi profiler chỉ ra flatMap là điểm nóng (hot-path xử lý liên tục, mỗi phần tử cha chỉ sinh 0-1 phần tử con) mới cân nhắc mapMulti hoặc loop thường.
Nếu function biến đổi sinh ra Stream<X>, List<X>, hay Optional<X> → dùng flatMap để flatten 1 tầng. Nếu function sinh giá trị đơn thuần → map.
2. takeWhile và dropWhile (Java 9+)
filter vs takeWhile
filter(predicate): duyệt hết stream, giữ element true. Mỗi element check độc lập.
takeWhile(predicate): dừng lần đầu predicate false. Giữ prefix match.
List<Integer> nums = List.of(1, 2, 3, 4, 5, 1, 2);
nums.stream().filter(n -> n < 4).toList();
// [1, 2, 3, 1, 2] - lay moi element < 4
nums.stream().takeWhile(n -> n < 4).toList();
// [1, 2, 3] - dung lan dau n = 4, bo cac element sau
Chú ý: element 5, 1, 2 cuối list có phần tử pass n < 4 (là 1 và 2), nhưng takeWhile đã dừng từ element 4 → không quay lại xét.
dropWhile
Skip prefix match predicate, lấy phần còn lại:
nums.stream().dropWhile(n -> n < 4).toList();
// [4, 5, 1, 2]
Skip 1, 2, 3 (match). Element 4 không match → lấy nó và mọi element sau, không filter.
Khi nào dùng?
takeWhile/dropWhile phù hợp khi data có cấu trúc prefix — sorted hoặc có ranh giới logic.
Ví dụ:
- Log file sorted theo thời gian: lấy event trước thời điểm X.
events.stream() .takeWhile(e -> e.timestamp().isBefore(cutoff)) .toList(); - Parse header đến dòng rỗng: header block kết thúc bằng empty line.
lines.stream() .takeWhile(l -> !l.isEmpty()) .toList(); - Bỏ comment đầu file: skip các dòng bắt đầu bằng
#, lấy content thật.lines.stream() .dropWhile(l -> l.startsWith("#")) .toList();
takeWhile/dropWhile chỉ đảm bảo tính chính xác và nhất quán (deterministic) khi làm việc với ordered stream (stream đã được sắp xếp hoặc giữ nguyên thứ tự nguồn).
Nếu stream không được sắp xếp (unordered stream), runtime có thể dừng duyệt sớm một cách ngẫu nhiên vì thứ tự các phần tử không được định nghĩa rõ ràng. Điều này dẫn đến kết quả trả về không chính xác, mất dữ liệu không mong muốn và hành vi không nhất quán giữa các lần chạy. Hãy kiểm tra spliterator().hasCharacteristics(Spliterator.ORDERED) nếu không chắc chắn về cấu trúc nguồn dữ liệu của bạn trước khi dùng các thao tác ngắn mạch này.
3. peek — debug mid-pipeline
Intermediate op cho phép "nhìn vào" element mà không thay đổi — dùng debug pipeline dài:
List<Integer> result = Stream.of(1, 2, 3, 4)
.peek(n -> System.out.println("before filter: " + n))
.filter(n -> n > 2)
.peek(n -> System.out.println("after filter: " + n))
.map(n -> n * 10)
.peek(n -> System.out.println("after map: " + n))
.toList();
Output:
before filter: 1
before filter: 2
before filter: 3
after filter: 3
after map: 30
before filter: 4
after filter: 4
after map: 40
Thấy rõ:
- Element 1, 2 bị filter loại → không gọi map.
- Element đi element-by-element, không batch.
Đừng dùng peek cho logic chính
Chỉ dùng peek cho mục đích debug (ghi nhận thông tin/in log). Không dùng peek để thay đổi trạng thái đối tượng hoặc tương tác với cơ sở dữ liệu.
Lý do: từ Java 9+, runtime có thể tối ưu hóa và bỏ qua peek hoàn toàn nếu terminal operation không yêu cầu duyệt qua các phần tử của stream.
Hãy xem ví dụ phản mẫu (anti-pattern) kinh điển dưới đây:
// ANTI-PATTERN: peek bi skip hoan toan
List<Integer> list = new ArrayList<>();
long count = Stream.of(1, 2, 3, 4)
.peek(list::add) // Khong bao gio chay!
.count();
Lý do: Kể từ Java 9+, nếu terminal operation là một hàm như count() chạy trên một stream thuộc dạng SIZED (đã biết trước kích thước từ nguồn) và các bước trung gian (như peek hay map) không làm thay đổi kích thước của stream, JVM sẽ tối ưu bằng cách trả về trực tiếp kích thước của stream mà không cần duyệt qua bất kỳ phần tử nào. Kết quả là list của bạn sẽ rỗng!
Pattern đúng khi cần side-effect hoặc thay đổi trạng thái là thực hiện trong terminal operation như forEach hoặc thu gom phần tử trước rồi xử lý:
// Cach 1: thuc hien side-effect o terminal operation
stream.forEach(list::add);
// Cach 2: thu gom ket qua roi xu ly
list.addAll(stream.toList());
Hãy nhớ: peek chỉ là công cụ để quan sát tạm thời trong quá trình debug, không phải nơi thực thi business logic.
4. sorted, distinct, limit, skip
sorted
Stream.of(3, 1, 4, 1, 5, 9, 2, 6).sorted().toList();
// [1, 1, 2, 3, 4, 5, 6, 9]
// Voi comparator custom
Stream.of("banana", "apple", "cherry")
.sorted(Comparator.comparingInt(String::length))
.toList();
// ["apple", "banana", "cherry"]
sorted là stateful — phải buffer toàn bộ element để sort. Hệ quả:
- Không dùng với stream vô hạn (
iterate,generatekhông limit). - Memory tăng với size (tệ với stream lớn).
- Thường xuất hiện cuối pipeline (sort result cuối, không phải mỗi bước).
distinct
Stream.of(1, 2, 2, 3, 3, 3).distinct().toList();
// [1, 2, 3]
Dùng equals + hashCode để loại trùng. Giữ HashSet bên dưới → memory tăng với số distinct element.
Stream vô hạn với ít distinct value (vd random dice 1-6) → distinct hoạt động nhưng cần short-circuit (limit) để terminate.
limit và skip
Stream.iterate(1, i -> i * 2) // infinite: 1, 2, 4, 8, ...
.skip(3) // bo 3 dau: bo 1, 2, 4
.limit(3) // lay 3 element ke
.toList();
// [8, 16, 32]
limit cũng short-circuit — dừng upstream khi đủ. Essential cho stream vô hạn.
skip không short-circuit — duyệt và bỏ n element đầu.
5. Parallel stream — có bài riêng
Stream còn một "công tắc" nữa: .parallel() / parallelStream() — chia data thành chunk chạy song song trên ForkJoinPool.commonPool(). Nghe hấp dẫn, nhưng đây là nơi nhiều bẫy nhất của Stream API: common pool shared toàn JVM (block I/O ở một chỗ làm chậm cả app), overhead fork/join, yêu cầu hàm associative, source phải splittable hiệu quả. "Thêm .parallel() là nhanh hơn" là ngộ nhận — nhiều trường hợp chậm hơn sequential.
Toàn bộ cơ chế ForkJoin, công thức Doug Lea N×Q > 10000, và 4 pitfall production được mổ xẻ trong bài Parallel Stream.
6. Pattern thực tế — log analysis
// Dem so line ERROR trong 1 trieu dong log
try (Stream<String> lines = Files.lines(Path.of("app.log"))) {
long errorCount = lines
.filter(l -> l.contains("ERROR"))
.count();
System.out.println("Errors: " + errorCount);
}
Sequential đủ — Files.lines là I/O-bound và không chia chunk hiệu quả; vì sao parallel không giúp gì ở đây, xem bài Parallel Stream.
// Lay tat ca tag cua tat ca post (nested)
record Post(List<String> tags) { }
Set<String> allTags = posts.stream()
.flatMap(p -> p.tags().stream())
.collect(Collectors.toSet());
flatMap + toSet thay nested loop.
// Lay cac event truoc khi gap SHUTDOWN
List<Event> beforeShutdown = events.stream()
.takeWhile(e -> !e.type().equals("SHUTDOWN"))
.toList();
7. Pitfall tổng hợp
❌ Nhầm 1: map khi cần flatMap.
List<List<String>> roles = users.stream()
.map(User::roles)
.toList(); // List long, khong flatten
✅ flatMap(u -> u.roles().stream()).
❌ Nhầm 2: takeWhile trên data không có prefix structure.
nums.stream().takeWhile(n -> n > 0)
// Nums = [1, -1, 2, 3] -> chi lay [1], bo [2, 3]
✅ Dùng filter cho lọc unconditional, takeWhile cho prefix có structure.
❌ Nhầm 3: sorted() đứng trước limit trên stream vô hạn.
Stream.iterate(0, i -> i + 1).sorted().limit(10); // Hang forever
✅ sorted phải thấy hết element mới emit được — với nguồn vô hạn thì "hết" không bao giờ đến. Đảo thứ tự: .limit(10).sorted() chạy được, vì limit đã biến stream thành hữu hạn 10 phần tử trước khi sort.
❌ Nhầm 4: peek cho logic side-effect.
stream.peek(this::save).count(); // save co the khong chay (Java 9+ optimize)
✅ stream.forEach(this::save).
8. 📚 Deep Dive Oracle
Spec / reference chính thức:
- Stream.flatMap — semantic, note về ordering và close của inner stream.
- Stream.mapMulti — Java 16+, alternative cho flatMap khi muốn tránh tạo stream con per-element; javadoc nêu rõ 2 case nên dùng.
- Stream operations characteristics — ORDERED, SORTED, SIZED, DISTINCT; ảnh hưởng đến lazy optimization (lý do peek bị skip).
- JEP 266: More Concurrency Updates — Java 9 thêm
takeWhile/dropWhile.
Ghi chú: Javadoc của mapMulti nêu đúng 2 tình huống nên thay flatMap: (1) mỗi phần tử cha sinh ra rất ít (0 hoặc 1) phần tử con — chi phí tạo stream con trở nên đáng kể so với công việc thật; (2) logic sinh phần tử con dạng imperative, khó biểu diễn thành stream. Ngoài 2 case này, flatMap vẫn là lựa chọn mặc định.
9. Tóm tắt
flatMap(Function<T, Stream<R>>)— flatten stream lồng, hoặc 1-to-N transform. Thay thế nested loop imperative.flatMaptạo 1 stream con per phần tử cha — overhead allocation chỉ đáng quan tâm trong hot-path; khi đó cân nhắcmapMulti(Java 16+).takeWhile/dropWhile(Java 9+) — stop/skip prefix theo predicate. Chỉ deterministic với ordered stream. Dùng cho data sorted hoặc có ranh giới logic.peek— intermediate op cho debug/observe. Không cho logic side-effect — Java 9+ có thể skip peek khi optimizer bỏ qua duyệt.sorted,distinctstateful — buffer element. Không dùng với stream vô hạn trừ khilimitđứng trước.limitshort-circuit (dừng upstream khi đủ);skipthì không.- Parallel stream: nhiều điều kiện phải hội đủ mới đáng dùng — toàn bộ ở bài 08.
10. Tự kiểm tra
Q1Khi nào dùng map vs flatMap?▸
map vs flatMap?- map: 1 input → 1 output.
Stream<String>→Stream<Integer>. Cardinality không đổi. - flatMap: 1 input → 0, 1, hay N output.
Stream<User>→Stream<Email>(mỗi user N email). Stream kết quả flatten 1 tầng.
Quy tắc: function bạn truyền sinh ra Stream<X>, List<X>, hay Optional<X> và bạn muốn "ra" là X không phải container → flatMap. Function sinh giá trị đơn lẻ → map.
Dấu hiệu rõ: nếu dùng map mà kết quả là Stream<Stream<X>> hay Stream<List<X>>, chắc chắn cần đổi sang flatMap.
Q2Đoạn sau cho kết quả gì? Stream.of(1, 5, 2, 8, 3).takeWhile(n -> n < 5).toList()▸
Stream.of(1, 5, 2, 8, 3).takeWhile(n -> n < 5).toList()takeWhile lấy prefix thoả predicate, dừng lần đầu predicate false.
Element 1: 1 < 5 true → giữ. Element 5: 5 < 5 false → dừng. Các element 2, 8, 3 bị bỏ dù 2 và 3 pass predicate.
[1]Phân biệt với filter(n -> n < 5): cái đó duyệt hết, cho [1, 2, 3].
takeWhile khác filter ở chỗ: check từng element theo thứ tự, gặp false thì dừng — không xét element sau. Phù hợp với data có prefix structure (sorted, grouped).
Q3Vì sao flatMap có chi phí allocation cao hơn map, và khi nào cân nhắc mapMulti?▸
flatMap có chi phí allocation cao hơn map, và khi nào cân nhắc mapMulti?map chỉ gọi function biến đổi và đẩy kết quả đi tiếp — không tạo object phụ trợ. flatMap nhận Function<T, Stream<R>>: với mỗi phần tử cha, một object Stream con (kèm spliterator) được khởi tạo trên heap chỉ để duyệt rồi vứt bỏ. Hàng trăm nghìn phần tử cha → hàng trăm nghìn stream con ngắn hạn → áp lực GC.
mapMulti (Java 16+) loại bỏ stream trung gian: bạn nhận một Consumer<R> và push trực tiếp từng phần tử kết quả vào nó.
Cân nhắc mapMulti khi: (1) profiler chỉ ra flatMap là điểm nóng; (2) mỗi phần tử cha sinh rất ít (0-1) phần tử con — lúc đó chi phí tạo stream con chiếm tỷ trọng lớn nhất. Với code thường, flatMap dễ đọc hơn và đủ nhanh — đừng tối ưu sớm.
Q4Khi nào peek có thể bị JVM skip, và vì sao đừng dùng peek cho logic chính?▸
Java 9+ có optimization: nếu terminal op không cần duyệt từng element để tính kết quả (vd count() trên stream SIZED, nơi size biết từ source), JVM có thể bỏ qua duyệt — trả thẳng size. Intermediate op `peek`, `map` không thay đổi size cũng bị skip.
Ví dụ:
list.stream()
.peek(item -> save(item)) // Co the KHONG chay
.count();Size của list JVM biết. Map/peek không đổi size. count() trả list.size() mà không duyệt. Hàm save không chạy.
Đây là bug silent — không có warning, test pass trên Java 8, fail trên Java 9+.
Rule: peek chỉ cho debug/log. Logic side-effect phải ở terminal op forEach:
list.stream().forEach(item -> save(item));Q5Vì sao sorted không dùng được với stream vô hạn?▸
sorted không dùng được với stream vô hạn?sorted là stateful intermediate op — phải thấy toàn bộ element để sort đúng. Nó buffer element vào array tạm, sort, rồi emit theo thứ tự đã sort.
Stream vô hạn: "toàn bộ" không tồn tại. Buffer grows unbounded → OOM hoặc hang forever.
Thứ tự với limit quyết định sống chết:
Stream.iterate(0, i -> i + 1).limit(10).sorted()— OK. Limit TRƯỚC biến vô hạn thành hữu hạn 10 element, sorted OK.Stream.iterate(0, i -> i + 1).sorted().limit(10)— HANG. Sorted cần thấy hết vô hạn → không bao giờ complete.
Tương tự distinct — buffer set element đã thấy. Stream với nhiều distinct value + vô hạn → memory grows unbounded.
Rule: stateful op (sorted, distinct) + stream vô hạn là bug. Đặt limit trước chúng.
Bài tiếp theo: Optional — xử lý null không vỡ
Bài này có giúp bạn hiểu bản chất không?
Hỏi đáp về bài này
Chưa có câu hỏi
Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).
Đặt câu hỏi đầu tiên