Stream basics — pipeline lazy và terminal operation
Stream không phải collection. Cơ chế lazy evaluation, intermediate vs terminal operation, element flow qua pipeline, short-circuit. Vì sao code không có terminal op không chạy gì cả.
Đoạn code sau in gì?
Stream.of(1, 2, 3)
.filter(x -> { System.out.println("check " + x); return x > 1; });
Câu trả lời: không in gì. Không phải lỗi, không phải bug — đây là đặc tính cốt lõi của Stream API.
Với dev đến từ JavaScript/Python/Ruby, phản xạ đầu tiên là: "filter chắc chắn phải chạy, nó nhận 3 element, in ra 3 dòng check". Trong Java Stream, filter không chạy gì cho đến khi có terminal operation — chủ đề chính của bài này.
Đây không phải quirk kỳ lạ. Lazy evaluation là thiết kế có chủ đích, cho phép 3 điều:
- Pipeline dài vẫn chạy trong 1 lần duyệt —
filter → map → filter → map → collectkhông tạo 4 collection trung gian. - Stream vô hạn khả thi —
Stream.iterate(0, i -> i + 1)không thực sự chạy cho đến khi terminal op yêu cầu. - Short-circuit —
findFirst,anyMatch,limitdừng ngay khi đủ, tiết kiệm công.
Bài này giải thích Stream là gì (và không phải gì), phân biệt intermediate vs terminal operation, element flow element-by-element qua pipeline, và khi nào short-circuit kích hoạt.
1. Analogy — Dây chuyền lắp ráp
Tưởng tượng nhà máy sản xuất ô tô. Băng chuyền có nhiều trạm: gắn bánh, sơn, kiểm tra, đóng gói. Công nhân đứng tại mỗi trạm.
Trạng thái ban đầu: băng chuyền đứng yên. Kể cả có linh kiện đang xếp đầu băng chuyền, không có gì xảy ra. Công nhân đứng chờ. Nguyên liệu ngồi đó.
Nhấn nút "bắt đầu" (công tắc): linh kiện đầu tiên chạy qua tất cả trạm — gắn bánh, sơn, kiểm tra, đóng gói — thành 1 xe hoàn chỉnh. Rồi linh kiện thứ 2 chạy. Rồi linh kiện thứ 3.
Không phải: "gắn bánh cho cả 1000 linh kiện, rồi sơn cả 1000, rồi kiểm tra cả 1000". Đó là cách build theo batch — tốn không gian (phải chứa 1000 xe sơn xong chờ kiểm tra), tốn thời gian (muốn xe đầu tiên phải đợi cả batch).
Stream hoạt động giống băng chuyền. Intermediate operation là các trạm — filter, map, sorted. Terminal operation (collect, count, forEach) là công tắc — không nhấn, băng chuyền đứng yên.
| Đời thường | Stream |
|---|---|
| Linh kiện nguyên liệu | Source (collection, array, generator) |
| Sản phẩm trên băng chuyền | Element (T) trong pipeline |
| Trạm lắp ráp | Intermediate operation (filter, map) |
| Công tắc bật băng chuyền | Terminal operation (collect, count) |
| Sản phẩm đi từng cái qua toàn chuyền | Element-by-element pipeline |
Intermediate op không làm gì — chỉ ghi lại ý định. Terminal op bật công tắc — khi đó element mới thực sự chảy qua pipeline.
2. Stream là gì — và không phải gì
Đây là điểm hay nhầm của người đến từ ngôn ngữ khác. Trong Python, Ruby, JavaScript, bạn gọi list.filter(...).map(...) — mỗi bước tạo list mới, giữ trong memory, xử lý eager.
Java Stream không phải cấu trúc lưu trữ. Nó là abstraction cho chuỗi xử lý. Khác biệt rõ ràng với List:
| List | Stream |
|---|---|
| Lưu trữ element trong memory | Không lưu — chỉ mô tả operation |
Có size() xác định | Có thể vô hạn |
| Duyệt nhiều lần | Chỉ duyệt 1 lần — consume rồi là hết |
| Thay đổi được (mutable) | Immutable pipeline |
Random access (get(i)) | Sequential, không index |
Code minh hoạ:
List<Integer> list = List.of(1, 2, 3, 4, 5);
// List duyet nhieu lan duoc
list.forEach(System.out::println);
list.forEach(System.out::println); // Chay dung lan thu 2
// Stream chi duyet 1 lan
Stream<Integer> s = list.stream();
s.forEach(System.out::println); // OK
s.forEach(System.out::println); // IllegalStateException: stream has already been operated upon or closed
Vì sao chỉ 1 lần? Vì stream không lưu element — nó giữ con trỏ tới source (list, file, generator). Khi terminal op chạy, nó consume source qua spliterator. Consume xong, stream mark closed.
Lý do thiết kế: stream có thể từ nguồn không rewind được — file I/O, network socket, generator. Nếu cho phép duyệt lại, mỗi source cần implement "reset" — vừa phức tạp vừa không khả thi cho một số source. Java chọn nhất quán: mọi stream one-shot.
Muốn duyệt lại? Tạo stream mới từ source:
list.stream().forEach(System.out::println);
list.stream().forEach(System.out::println); // OK - stream moi
Tạo stream rẻ (chỉ là con trỏ), không copy data.
3. Tạo stream — nhiều nguồn
Stream có thể đến từ đâu?
// Tu collection - pho bien nhat
Stream<String> fromList = List.of("a", "b", "c").stream();
// Tu mang
Stream<Integer> fromArray = Arrays.stream(new Integer[]{1, 2, 3});
IntStream fromIntArray = Arrays.stream(new int[]{1, 2, 3}); // Primitive stream
// Tu factory method
Stream<String> fromOf = Stream.of("a", "b", "c");
Stream<String> empty = Stream.empty();
// Tu generator (co the vo han)
Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1); // 0, 1, 2, 3, ...
Stream<Double> randoms = Stream.generate(Math::random); // random infinite
IntStream range = IntStream.range(0, 10); // 0..9 (exclusive)
IntStream rangeClosed = IntStream.rangeClosed(1, 10); // 1..10 (inclusive)
// Tu file (phai close)
try (Stream<String> lines = Files.lines(Path.of("log.txt"))) {
lines.forEach(System.out::println);
}
Quan trọng: stream vô hạn (iterate, generate, Files.lines với file streaming) bắt buộc phải kết hợp với limit hoặc short-circuit terminal op — nếu không sẽ chạy mãi.
Stream.iterate(0, i -> i + 1)
.limit(5) // BAT BUOC - khong co limit se vo han
.forEach(System.out::println);
Stream.iterate(seed, UnaryOperator) còn có dạng 3-arg (Java 9+) với predicate dừng — giống for:
// Java 9+: iterate co predicate
Stream.iterate(0, i -> i < 100, i -> i + 1)
.forEach(System.out::println);
// Tuong duong: for (int i = 0; i < 100; i++)
4. Intermediate vs terminal operation
Stream API có 2 nhóm operation, phân biệt bởi return type:
Intermediate operation trả về Stream<T> (hoặc IntStream, LongStream, ...) — vẫn là stream, chain tiếp được. Lazy — không chạy logic, chỉ ghi lại ý định vào pipeline.
Terminal operation trả về kết quả khác stream (List, long, boolean, void, ...) — đóng stream, kích hoạt pipeline.
List<Integer> nums = List.of(1, 2, 3, 4, 5);
long count = nums.stream()
.filter(n -> n % 2 == 0) // intermediate -> Stream<Integer>
.map(n -> n * n) // intermediate -> Stream<Integer>
.count(); // terminal -> long
// count = 2 (so 2 va 4 qua filter)
Bảng đầy đủ các op thường dùng:
| Loại | Op | Semantic |
|---|---|---|
| Intermediate | filter(Predicate) | Lọc theo điều kiện |
| Intermediate | map(Function) | Biến đổi element |
| Intermediate | flatMap(Function) | Flatten stream lồng |
| Intermediate | sorted() / sorted(Comparator) | Sắp xếp (stateful — buffer hết) |
| Intermediate | distinct() | Loại trùng (stateful — giữ set) |
| Intermediate | limit(n) | Giữ n đầu |
| Intermediate | skip(n) | Bỏ n đầu |
| Intermediate | peek(Consumer) | Quan sát, không đổi |
| Intermediate | takeWhile(Predicate) | Giữ prefix khi predicate true |
| Intermediate | dropWhile(Predicate) | Bỏ prefix khi predicate true |
| Terminal | collect(Collector) | Gom thành collection/map |
| Terminal | forEach(Consumer) | Chạy action trên mỗi element |
| Terminal | count() | Đếm |
| Terminal | reduce(identity, fn) | Gộp về 1 giá trị |
| Terminal | findFirst() / findAny() | Lấy 1 element (Optional) |
| Terminal | anyMatch / allMatch / noneMatch | Kiểm tra điều kiện (boolean) |
| Terminal | min(Comparator) / max(Comparator) | Lấy min/max |
| Terminal | toArray() / toList() | Ra array/list |
Cách phân biệt nhanh: trả Stream<X> → intermediate. Trả khác → terminal.
5. Lazy evaluation — chi tiết
Quay lại code mở đầu bài:
Stream.of(1, 2, 3)
.filter(x -> { System.out.println("check " + x); return x > 1; });
// Khong in gi
Không có terminal op → pipeline không chạy. Thêm toList():
var result = Stream.of(1, 2, 3)
.filter(x -> { System.out.println("check " + x); return x > 1; })
.toList();
Output:
check 1
check 2
check 3
result = [2, 3]. Filter chạy 3 lần — 1 cho mỗi element.
Cơ chế bên dưới
Khi bạn gọi stream.filter(predicate), Java không chạy predicate ngay. Nó tạo 1 object wrapping stream gốc + predicate — gọi là "pipeline stage". Mỗi intermediate op thêm 1 stage.
Khi terminal op gọi, stage cuối yêu cầu stage trước cung cấp element — stage trước yêu cầu stage trước nữa — lan tới source. Source push element, element chảy qua stage ngược lên terminal.
Model này gọi là "pull-based pipeline" — giống lazy iterator. Nếu chỉ có intermediate op mà không có terminal, chuỗi stage tồn tại nhưng không ai pull → không chạy.
Ưu thế 1: không intermediate collection
So sánh với viết thủ công bằng loop:
// Cach tu nhien (imperative)
List<Integer> filtered = new ArrayList<>();
for (int n : nums) {
if (n % 2 == 0) filtered.add(n);
}
List<Integer> squared = new ArrayList<>();
for (int n : filtered) {
squared.add(n * n);
}
// 2 list trung gian tao ra
Stream:
List<Integer> result = nums.stream()
.filter(n -> n % 2 == 0)
.map(n -> n * n)
.toList();
// Khong co list trung gian
Với pipeline 10 stage trên 1 triệu element, imperative tạo 10 list trung gian — tốn 10 triệu object heap + GC pressure. Stream tạo đúng 1 list kết quả.
Ưu thế 2: stream vô hạn khả thi
// Tim 5 so chan dau tien
List<Integer> first5Even = Stream.iterate(0, i -> i + 1)
.filter(i -> i % 2 == 0)
.limit(5)
.toList();
// [0, 2, 4, 6, 8]
Nếu eager, filter(Predicate) phải tạo list tất cả số chẵn trong infinite stream — chết máy. Lazy: filter chỉ chạy khi có yêu cầu element, limit dừng sau 5 element.
6. Element-by-element, không batch
Điểm phản trực giác tiếp theo: pipeline không chạy "filter cho tất cả, rồi map cho tất cả". Mỗi element đi từ đầu đến cuối pipeline trước khi element kế tiếp bắt đầu.
Stream.of(1, 2, 3)
.filter(x -> { System.out.println("filter " + x); return x > 1; })
.map(x -> { System.out.println("map " + x); return x * 10; })
.toList();
Output:
filter 1
filter 2
map 2
filter 3
map 3
Phân tích:
- Element 1 vào
filter: log "filter 1", predicate false → bị loại. Không gọimapcho 1. - Element 2 vào
filter: log "filter 2", predicate true → chuyển sangmap: log "map 2". - Element 3 tương tự 2: log "filter 3", log "map 3".
flowchart LR
A[1] -- filter: false --> X[bi loai]
B[2] -- filter: true --> M2[map: 2 * 10 = 20]
C[3] -- filter: true --> M3[map: 3 * 10 = 30]Đây là tính chất quan trọng cho hiệu năng:
- Không phí công: element bị
filterloại không tốnmap. - Short-circuit khả thi: terminal op có thể dừng giữa chừng (xem mục 7).
- Stream vô hạn chạy được: element-by-element → không buffer hết.
Ngoại lệ: stateful intermediate
Một số intermediate op phải buffer element trước khi emit:
sorted()— phải thấy toàn bộ element để sort. Nếu stream vô hạn + sorted → hang forever.distinct()— phải giữ set element đã thấy để loại trùng. Memory tăng với số distinct.
Stream.of(3, 1, 4, 1, 5, 9, 2, 6)
.sorted() // Tai day phai buffer het
.limit(3) // Roi moi limit
.forEach(System.out::println);
// 1, 1, 2
limit(3) trước hay sau sorted()? Khác nhau hoàn toàn. Thứ tự này: sort hết, lấy 3 đầu. Đảo limit(3).sorted(): lấy 3 element đầu từ source (3, 1, 4), sort → (1, 3, 4). Tư duy theo pipeline thứ tự.
7. Short-circuit terminal operation
Terminal op short-circuit dừng ngay khi đủ kết quả, không duyệt hết stream:
findFirst(),findAny()— dừng khi tìm được 1 element.anyMatch(predicate)— dừng khi thấy element true đầu tiên.allMatch(predicate)— dừng khi thấy element false đầu tiên.noneMatch(predicate)— tương tự.limit(n)— dừng upstream khi đã emit n element.
Ví dụ dùng short-circuit trên stream vô hạn:
Optional<Integer> firstBig = Stream.iterate(0, i -> i + 1)
.filter(i -> i > 100)
.findFirst();
// Optional[101] - dung ngay khi tim duoc 101
Không có short-circuit → stream vô hạn chạy forever. Short-circuit là điều làm stream API "safe" với infinite source.
Tương tự:
boolean hasNegative = nums.stream().anyMatch(n -> n < 0);
// Stop ngay khi thay so am dau tien
Nếu nums có 1 triệu element và element thứ 3 đã âm, chỉ duyệt 3 element. Imperative loop tương đương:
boolean hasNegative = false;
for (int n : nums) {
if (n < 0) { hasNegative = true; break; }
}
Stream version ngắn hơn, intent rõ hơn.
8. Stream bất biến — không dùng lại được
Đây là bẫy khác của người mới:
Stream<Integer> s = list.stream();
long count = s.count(); // OK, terminal op
s.forEach(System.out::println); // IllegalStateException!
// stream has already been operated upon or closed
Hoặc:
Stream<Integer> s = list.stream().filter(n -> n > 0);
List<Integer> a = s.toList();
List<Integer> b = s.toList(); // IllegalStateException
Mỗi stream dùng 1 lần. Có terminal op → consume → closed.
Muốn tái dùng logic, tạo function trả stream:
Supplier<Stream<Integer>> filtered = () -> list.stream().filter(n -> n > 0);
List<Integer> a = filtered.get().toList();
List<Integer> b = filtered.get().toList(); // OK - stream moi
Hoặc collect thành List rồi dùng lại list đó.
9. Pitfall tổng hợp
❌ Nhầm 1: Reuse stream.
Stream<Integer> s = list.stream();
s.count();
s.forEach(System.out::println); // IllegalStateException
✅ Tạo stream mới mỗi lần hoặc collect thành List:
list.stream().count();
list.stream().forEach(System.out::println);
❌ Nhầm 2: Quên terminal op.
list.stream().filter(x -> x > 10); // Khong chay gi
✅ Thêm terminal: .toList(), .forEach(...), .count().
❌ Nhầm 3: Dùng peek cho logic chính.
list.stream().peek(x -> save(x)).count(); // peek co the bi skip o Java 9+
✅ peek là intermediate, chỉ dùng debug/log. Dùng forEach cho side-effect chính:
list.stream().forEach(this::save);
❌ Nhầm 4: Modify collection gốc trong lambda stream.
list.stream().forEach(x -> list.add(x * 2)); // ConcurrentModificationException
✅ Collect thành list mới:
List<Integer> doubled = list.stream().map(x -> x * 2).toList();
❌ Nhầm 5: sorted() trên stream vô hạn.
Stream.iterate(0, i -> i + 1).sorted().limit(5); // Hang forever
✅ sorted() cần thấy hết element. Dùng limit TRƯỚC sorted nếu phù hợp, hoặc đổi source.
10. 📚 Deep Dive Oracle
Spec / reference chính thức:
- java.util.stream package-summary — overview Stream API, intermediate/terminal taxonomy, operation characteristics.
- Stream interface — tất cả method có sẵn với ví dụ.
- Spliterator interface — cơ chế bên dưới stream, cho phép parallel.
- JEP 107: Bulk Data Operations for Collections — thiết kế Stream API cho Java 8, motivation.
- JEP 269: Convenience Factory Methods for Collections — Java 9 thêm
List.of,Set.of,Map.of.
Ghi chú: Package-summary giải thích khái niệm operation characteristics — mỗi op có tag SIZED, ORDERED, DISTINCT, SORTED. JVM dùng các tag này để optimize. Ví dụ count() trên stream SIZED + không có filter → JVM trả thẳng size() không duyệt. Đó là lý do một số bài benchmark thấy count() nhanh bất thường.
11. Tóm tắt
- Stream là pipeline mô tả xử lý, không phải cấu trúc lưu trữ. Không giữ element, không
size()(với stream vô hạn), chỉ duyệt 1 lần. - Intermediate op trả
Stream<T>, lazy — chỉ ghi lại ý định vào pipeline. - Terminal op trả kết quả khác stream, kích hoạt pipeline chạy từ source lên.
- Lazy evaluation cho phép: pipeline dài không tạo collection trung gian, stream vô hạn khả thi, short-circuit tiết kiệm công.
- Element đi từng cái qua toàn pipeline (
filter → map → ...) trước khi element kế bắt đầu. Không batch. - Stateful intermediate (
sorted,distinct) phải buffer — không dùng với stream vô hạn. - Short-circuit terminal (
findFirst,anyMatch,limit) dừng khi đủ, làm stream vô hạn an toàn. - Stream one-shot — consume xong là closed. Cần reuse → tạo stream mới từ source hoặc collect thành list.
- Tạo stream từ: collection, array, factory, generator (
iterate,generate), file. Primitive stream (IntStream, ...) tránh boxing.
12. Tự kiểm tra
Q1Đoạn sau in gì? Stream.of(1, 2, 3).filter(n -> n > 1);▸
Stream.of(1, 2, 3).filter(n -> n > 1);Không in gì, không làm gì.
Không có terminal operation → pipeline lazy không chạy. Filter chỉ được "ghi lại ý định" vào pipeline stage, nhưng không ai yêu cầu element → predicate không chạy.
Thêm terminal op (.toList(), .forEach(...), .count()) mới kích hoạt. Đây là lỗi thường gặp nhất của người mới Stream — nghĩ filter luôn chạy như Python/JavaScript.
Q2Vì sao Stream chỉ duyệt được 1 lần, khác với List?▸
Stream không lưu element — chỉ giữ con trỏ tới source (collection, file, network, generator). Khi terminal op chạy, nó consume source qua Spliterator. Consume xong, stream mark closed.
Lý do thiết kế: stream có thể từ nguồn không rewind được — file I/O đọc forward, socket read một lần, generator sinh lazy. Không thể design API "duyệt lại" cho tất cả source. Java chọn đơn giản và nhất quán: mọi stream one-shot.
Muốn duyệt lại → tạo stream mới từ source (list.stream() rẻ) hoặc collect thành List rồi dùng.
Q3Pipeline list.stream().filter(...).map(...).toList() có duyệt list 2 lần không?▸
list.stream().filter(...).map(...).toList() có duyệt list 2 lần không?Không — chỉ 1 lần. Mỗi element đi qua filter → (nếu qua) map → được add vào result list. Rồi element kế tiếp bắt đầu.
Đây là lợi thế lớn so với viết imperative thủ công: imperative 2 pass phải tạo list trung gian sau filter, rồi duyệt lại list đó để map. Stream không tạo trung gian.
Với pipeline 5 stage trên 1 triệu element: imperative tạo 5 list (5 triệu object), stream tạo 1 list kết quả. Khác biệt memory rất lớn, đặc biệt với GC pressure.
Q4Đoạn sau in gì theo thứ tự nào? Stream.of(1, 2, 3).filter(n -> { System.out.println("f" + n); return n > 1; }).map(n -> { System.out.println("m" + n); return n * 10; }).toList();▸
Stream.of(1, 2, 3).filter(n -> { System.out.println("f" + n); return n > 1; }).map(n -> { System.out.println("m" + n); return n * 10; }).toList();Element đi từng cái qua toàn pipeline:
f1 // element 1 vao filter
f2 // element 2 vao filter
m2 // 2 qua filter, chay map
f3 // element 3 vao filter
m3 // 3 qua filter, chay mapElement 1 bị filter loại → không gọi map cho 1. Nếu pipeline chạy batch ("filter hết trước, map hết sau") sẽ là f1 f2 f3 m2 m3 — khác hẳn.
Trật tự element-by-element là lý do stream vô hạn và short-circuit hoạt động được.
Q5Khi nào nên dùng Stream.iterate vs IntStream.range?▸
Stream.iterate vs IntStream.range?- IntStream.range(0, n): khi cần int sequential từ a đến b. Nhanh nhất, không boxing, không overhead
UnaryOperator. - Stream.iterate(seed, fn): khi cần sinh theo công thức, vd
iterate(1, n -> n * 2)→ 1, 2, 4, 8, 16... Cầnlimit()để dừng (hoặc dùng dạng 3-arg Java 9+ có predicate). - Stream.iterate 3-arg (Java 9+):
iterate(0, i -> i < 100, i -> i + 1)— giốngforloop, không cầnlimit.
Rule nhanh: int range đơn giản → IntStream.range. Công thức phức tạp → Stream.iterate. Stream random/fake data → Stream.generate.
Q6Khác biệt giữa peek và forEach là gì? Khi nào dùng peek?▸
- peek(Consumer): intermediate, trả
Stream<T>. Không kích hoạt pipeline. Dùng để quan sát element mid-pipeline. - forEach(Consumer): terminal, trả
void. Kích hoạt pipeline. Side-effect trên mỗi element.
Dùng peek chỉ để debug/logging:
stream
.peek(x -> System.out.println("before filter: " + x))
.filter(x -> x > 0)
.peek(x -> System.out.println("after filter: " + x))
.toList();Không dùng peek cho logic chính. Java 9+ có thể skip peek khi optimizer nhận thấy terminal op không cần duyệt (vd count() trên stream SIZED). Logic side-effect phải ở terminal forEach.
Bài tiếp theo: map / filter / reduce — ba operation cốt lõi
Bài này có giúp bạn hiểu bản chất không?