Java — Từ Zero đến Senior/Stream basics — pipeline lazy và terminal operation
~20 phútStream API & LambdaMiễn phí

Stream basics — pipeline lazy và terminal operation

Stream không phải collection. Cơ chế lazy evaluation, intermediate vs terminal operation, element flow qua pipeline, short-circuit. Vì sao code không có terminal op không chạy gì cả.

Đoạn code sau in gì?

Stream.of(1, 2, 3)
    .filter(x -> { System.out.println("check " + x); return x > 1; });

Câu trả lời: không in gì. Không phải lỗi, không phải bug — đây là đặc tính cốt lõi của Stream API.

Với dev đến từ JavaScript/Python/Ruby, phản xạ đầu tiên là: "filter chắc chắn phải chạy, nó nhận 3 element, in ra 3 dòng check". Trong Java Stream, filter không chạy gì cho đến khi có terminal operation — chủ đề chính của bài này.

Đây không phải quirk kỳ lạ. Lazy evaluation là thiết kế có chủ đích, cho phép 3 điều:

  1. Pipeline dài vẫn chạy trong 1 lần duyệtfilter → map → filter → map → collect không tạo 4 collection trung gian.
  2. Stream vô hạn khả thiStream.iterate(0, i -> i + 1) không thực sự chạy cho đến khi terminal op yêu cầu.
  3. Short-circuitfindFirst, anyMatch, limit dừng ngay khi đủ, tiết kiệm công.

Bài này giải thích Stream là gì (và không phải gì), phân biệt intermediate vs terminal operation, element flow element-by-element qua pipeline, và khi nào short-circuit kích hoạt.

1. Analogy — Dây chuyền lắp ráp

Tưởng tượng nhà máy sản xuất ô tô. Băng chuyền có nhiều trạm: gắn bánh, sơn, kiểm tra, đóng gói. Công nhân đứng tại mỗi trạm.

Trạng thái ban đầu: băng chuyền đứng yên. Kể cả có linh kiện đang xếp đầu băng chuyền, không có gì xảy ra. Công nhân đứng chờ. Nguyên liệu ngồi đó.

Nhấn nút "bắt đầu" (công tắc): linh kiện đầu tiên chạy qua tất cả trạm — gắn bánh, sơn, kiểm tra, đóng gói — thành 1 xe hoàn chỉnh. Rồi linh kiện thứ 2 chạy. Rồi linh kiện thứ 3.

Không phải: "gắn bánh cho cả 1000 linh kiện, rồi sơn cả 1000, rồi kiểm tra cả 1000". Đó là cách build theo batch — tốn không gian (phải chứa 1000 xe sơn xong chờ kiểm tra), tốn thời gian (muốn xe đầu tiên phải đợi cả batch).

Stream hoạt động giống băng chuyền. Intermediate operation là các trạm — filter, map, sorted. Terminal operation (collect, count, forEach) là công tắc — không nhấn, băng chuyền đứng yên.

Đời thườngStream
Linh kiện nguyên liệuSource (collection, array, generator)
Sản phẩm trên băng chuyềnElement (T) trong pipeline
Trạm lắp rápIntermediate operation (filter, map)
Công tắc bật băng chuyềnTerminal operation (collect, count)
Sản phẩm đi từng cái qua toàn chuyềnElement-by-element pipeline
💡 Cách nhớ

Intermediate op không làm gì — chỉ ghi lại ý định. Terminal op bật công tắc — khi đó element mới thực sự chảy qua pipeline.

2. Stream là gì — và không phải gì

Đây là điểm hay nhầm của người đến từ ngôn ngữ khác. Trong Python, Ruby, JavaScript, bạn gọi list.filter(...).map(...) — mỗi bước tạo list mới, giữ trong memory, xử lý eager.

Java Stream không phải cấu trúc lưu trữ. Nó là abstraction cho chuỗi xử lý. Khác biệt rõ ràng với List:

ListStream
Lưu trữ element trong memoryKhông lưu — chỉ mô tả operation
size() xác địnhCó thể vô hạn
Duyệt nhiều lầnChỉ duyệt 1 lần — consume rồi là hết
Thay đổi được (mutable)Immutable pipeline
Random access (get(i))Sequential, không index

Code minh hoạ:

List<Integer> list = List.of(1, 2, 3, 4, 5);

// List duyet nhieu lan duoc
list.forEach(System.out::println);
list.forEach(System.out::println);   // Chay dung lan thu 2

// Stream chi duyet 1 lan
Stream<Integer> s = list.stream();
s.forEach(System.out::println);   // OK
s.forEach(System.out::println);   // IllegalStateException: stream has already been operated upon or closed

Vì sao chỉ 1 lần? Vì stream không lưu element — nó giữ con trỏ tới source (list, file, generator). Khi terminal op chạy, nó consume source qua spliterator. Consume xong, stream mark closed.

Lý do thiết kế: stream có thể từ nguồn không rewind được — file I/O, network socket, generator. Nếu cho phép duyệt lại, mỗi source cần implement "reset" — vừa phức tạp vừa không khả thi cho một số source. Java chọn nhất quán: mọi stream one-shot.

Muốn duyệt lại? Tạo stream mới từ source:

list.stream().forEach(System.out::println);
list.stream().forEach(System.out::println);   // OK - stream moi

Tạo stream rẻ (chỉ là con trỏ), không copy data.

3. Tạo stream — nhiều nguồn

Stream có thể đến từ đâu?

// Tu collection - pho bien nhat
Stream<String> fromList = List.of("a", "b", "c").stream();

// Tu mang
Stream<Integer> fromArray = Arrays.stream(new Integer[]{1, 2, 3});
IntStream fromIntArray = Arrays.stream(new int[]{1, 2, 3});   // Primitive stream

// Tu factory method
Stream<String> fromOf = Stream.of("a", "b", "c");
Stream<String> empty = Stream.empty();

// Tu generator (co the vo han)
Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1);         // 0, 1, 2, 3, ...
Stream<Double> randoms = Stream.generate(Math::random);                // random infinite
IntStream range = IntStream.range(0, 10);                              // 0..9 (exclusive)
IntStream rangeClosed = IntStream.rangeClosed(1, 10);                  // 1..10 (inclusive)

// Tu file (phai close)
try (Stream<String> lines = Files.lines(Path.of("log.txt"))) {
    lines.forEach(System.out::println);
}

Quan trọng: stream vô hạn (iterate, generate, Files.lines với file streaming) bắt buộc phải kết hợp với limit hoặc short-circuit terminal op — nếu không sẽ chạy mãi.

Stream.iterate(0, i -> i + 1)
    .limit(5)                 // BAT BUOC - khong co limit se vo han
    .forEach(System.out::println);

Stream.iterate(seed, UnaryOperator) còn có dạng 3-arg (Java 9+) với predicate dừng — giống for:

// Java 9+: iterate co predicate
Stream.iterate(0, i -> i < 100, i -> i + 1)
    .forEach(System.out::println);
// Tuong duong: for (int i = 0; i < 100; i++)

4. Intermediate vs terminal operation

Stream API có 2 nhóm operation, phân biệt bởi return type:

Intermediate operation trả về Stream<T> (hoặc IntStream, LongStream, ...) — vẫn là stream, chain tiếp được. Lazy — không chạy logic, chỉ ghi lại ý định vào pipeline.

Terminal operation trả về kết quả khác stream (List, long, boolean, void, ...) — đóng stream, kích hoạt pipeline.

List<Integer> nums = List.of(1, 2, 3, 4, 5);

long count = nums.stream()
    .filter(n -> n % 2 == 0)   // intermediate -> Stream<Integer>
    .map(n -> n * n)           // intermediate -> Stream<Integer>
    .count();                   // terminal -> long
// count = 2 (so 2 va 4 qua filter)

Bảng đầy đủ các op thường dùng:

LoạiOpSemantic
Intermediatefilter(Predicate)Lọc theo điều kiện
Intermediatemap(Function)Biến đổi element
IntermediateflatMap(Function)Flatten stream lồng
Intermediatesorted() / sorted(Comparator)Sắp xếp (stateful — buffer hết)
Intermediatedistinct()Loại trùng (stateful — giữ set)
Intermediatelimit(n)Giữ n đầu
Intermediateskip(n)Bỏ n đầu
Intermediatepeek(Consumer)Quan sát, không đổi
IntermediatetakeWhile(Predicate)Giữ prefix khi predicate true
IntermediatedropWhile(Predicate)Bỏ prefix khi predicate true
Terminalcollect(Collector)Gom thành collection/map
TerminalforEach(Consumer)Chạy action trên mỗi element
Terminalcount()Đếm
Terminalreduce(identity, fn)Gộp về 1 giá trị
TerminalfindFirst() / findAny()Lấy 1 element (Optional)
TerminalanyMatch / allMatch / noneMatchKiểm tra điều kiện (boolean)
Terminalmin(Comparator) / max(Comparator)Lấy min/max
TerminaltoArray() / toList()Ra array/list

Cách phân biệt nhanh: trả Stream<X> → intermediate. Trả khác → terminal.

5. Lazy evaluation — chi tiết

Quay lại code mở đầu bài:

Stream.of(1, 2, 3)
    .filter(x -> { System.out.println("check " + x); return x > 1; });
// Khong in gi

Không có terminal op → pipeline không chạy. Thêm toList():

var result = Stream.of(1, 2, 3)
    .filter(x -> { System.out.println("check " + x); return x > 1; })
    .toList();

Output:

check 1
check 2
check 3

result = [2, 3]. Filter chạy 3 lần — 1 cho mỗi element.

Cơ chế bên dưới

Khi bạn gọi stream.filter(predicate), Java không chạy predicate ngay. Nó tạo 1 object wrapping stream gốc + predicate — gọi là "pipeline stage". Mỗi intermediate op thêm 1 stage.

Khi terminal op gọi, stage cuối yêu cầu stage trước cung cấp element — stage trước yêu cầu stage trước nữa — lan tới source. Source push element, element chảy qua stage ngược lên terminal.

Model này gọi là "pull-based pipeline" — giống lazy iterator. Nếu chỉ có intermediate op mà không có terminal, chuỗi stage tồn tại nhưng không ai pull → không chạy.

Ưu thế 1: không intermediate collection

So sánh với viết thủ công bằng loop:

// Cach tu nhien (imperative)
List<Integer> filtered = new ArrayList<>();
for (int n : nums) {
    if (n % 2 == 0) filtered.add(n);
}
List<Integer> squared = new ArrayList<>();
for (int n : filtered) {
    squared.add(n * n);
}
// 2 list trung gian tao ra

Stream:

List<Integer> result = nums.stream()
    .filter(n -> n % 2 == 0)
    .map(n -> n * n)
    .toList();
// Khong co list trung gian

Với pipeline 10 stage trên 1 triệu element, imperative tạo 10 list trung gian — tốn 10 triệu object heap + GC pressure. Stream tạo đúng 1 list kết quả.

Ưu thế 2: stream vô hạn khả thi

// Tim 5 so chan dau tien
List<Integer> first5Even = Stream.iterate(0, i -> i + 1)
    .filter(i -> i % 2 == 0)
    .limit(5)
    .toList();
// [0, 2, 4, 6, 8]

Nếu eager, filter(Predicate) phải tạo list tất cả số chẵn trong infinite stream — chết máy. Lazy: filter chỉ chạy khi có yêu cầu element, limit dừng sau 5 element.

6. Element-by-element, không batch

Điểm phản trực giác tiếp theo: pipeline không chạy "filter cho tất cả, rồi map cho tất cả". Mỗi element đi từ đầu đến cuối pipeline trước khi element kế tiếp bắt đầu.

Stream.of(1, 2, 3)
    .filter(x -> { System.out.println("filter " + x); return x > 1; })
    .map(x -> { System.out.println("map " + x); return x * 10; })
    .toList();

Output:

filter 1
filter 2
map 2
filter 3
map 3

Phân tích:

  • Element 1 vào filter: log "filter 1", predicate false → bị loại. Không gọi map cho 1.
  • Element 2 vào filter: log "filter 2", predicate true → chuyển sang map: log "map 2".
  • Element 3 tương tự 2: log "filter 3", log "map 3".
flowchart LR
    A[1] -- filter: false --> X[bi loai]
    B[2] -- filter: true --> M2[map: 2 * 10 = 20]
    C[3] -- filter: true --> M3[map: 3 * 10 = 30]

Đây là tính chất quan trọng cho hiệu năng:

  • Không phí công: element bị filter loại không tốn map.
  • Short-circuit khả thi: terminal op có thể dừng giữa chừng (xem mục 7).
  • Stream vô hạn chạy được: element-by-element → không buffer hết.

Ngoại lệ: stateful intermediate

Một số intermediate op phải buffer element trước khi emit:

  • sorted() — phải thấy toàn bộ element để sort. Nếu stream vô hạn + sorted → hang forever.
  • distinct() — phải giữ set element đã thấy để loại trùng. Memory tăng với số distinct.
Stream.of(3, 1, 4, 1, 5, 9, 2, 6)
    .sorted()                // Tai day phai buffer het
    .limit(3)                // Roi moi limit
    .forEach(System.out::println);
// 1, 1, 2

limit(3) trước hay sau sorted()? Khác nhau hoàn toàn. Thứ tự này: sort hết, lấy 3 đầu. Đảo limit(3).sorted(): lấy 3 element đầu từ source (3, 1, 4), sort → (1, 3, 4). Tư duy theo pipeline thứ tự.

7. Short-circuit terminal operation

Terminal op short-circuit dừng ngay khi đủ kết quả, không duyệt hết stream:

  • findFirst(), findAny() — dừng khi tìm được 1 element.
  • anyMatch(predicate) — dừng khi thấy element true đầu tiên.
  • allMatch(predicate) — dừng khi thấy element false đầu tiên.
  • noneMatch(predicate) — tương tự.
  • limit(n) — dừng upstream khi đã emit n element.

Ví dụ dùng short-circuit trên stream vô hạn:

Optional<Integer> firstBig = Stream.iterate(0, i -> i + 1)
    .filter(i -> i > 100)
    .findFirst();
// Optional[101] - dung ngay khi tim duoc 101

Không có short-circuit → stream vô hạn chạy forever. Short-circuit là điều làm stream API "safe" với infinite source.

Tương tự:

boolean hasNegative = nums.stream().anyMatch(n -> n < 0);
// Stop ngay khi thay so am dau tien

Nếu nums có 1 triệu element và element thứ 3 đã âm, chỉ duyệt 3 element. Imperative loop tương đương:

boolean hasNegative = false;
for (int n : nums) {
    if (n < 0) { hasNegative = true; break; }
}

Stream version ngắn hơn, intent rõ hơn.

8. Stream bất biến — không dùng lại được

Đây là bẫy khác của người mới:

Stream<Integer> s = list.stream();

long count = s.count();              // OK, terminal op
s.forEach(System.out::println);      // IllegalStateException!
// stream has already been operated upon or closed

Hoặc:

Stream<Integer> s = list.stream().filter(n -> n > 0);
List<Integer> a = s.toList();
List<Integer> b = s.toList();        // IllegalStateException

Mỗi stream dùng 1 lần. Có terminal op → consume → closed.

Muốn tái dùng logic, tạo function trả stream:

Supplier<Stream<Integer>> filtered = () -> list.stream().filter(n -> n > 0);

List<Integer> a = filtered.get().toList();
List<Integer> b = filtered.get().toList();   // OK - stream moi

Hoặc collect thành List rồi dùng lại list đó.

9. Pitfall tổng hợp

Nhầm 1: Reuse stream.

Stream<Integer> s = list.stream();
s.count();
s.forEach(System.out::println);   // IllegalStateException

✅ Tạo stream mới mỗi lần hoặc collect thành List:

list.stream().count();
list.stream().forEach(System.out::println);

Nhầm 2: Quên terminal op.

list.stream().filter(x -> x > 10);   // Khong chay gi

✅ Thêm terminal: .toList(), .forEach(...), .count().

Nhầm 3: Dùng peek cho logic chính.

list.stream().peek(x -> save(x)).count();   // peek co the bi skip o Java 9+

peek là intermediate, chỉ dùng debug/log. Dùng forEach cho side-effect chính:

list.stream().forEach(this::save);

Nhầm 4: Modify collection gốc trong lambda stream.

list.stream().forEach(x -> list.add(x * 2));   // ConcurrentModificationException

✅ Collect thành list mới:

List<Integer> doubled = list.stream().map(x -> x * 2).toList();

Nhầm 5: sorted() trên stream vô hạn.

Stream.iterate(0, i -> i + 1).sorted().limit(5);   // Hang forever

sorted() cần thấy hết element. Dùng limit TRƯỚC sorted nếu phù hợp, hoặc đổi source.

10. 📚 Deep Dive Oracle

📚 Deep Dive Oracle

Spec / reference chính thức:

Ghi chú: Package-summary giải thích khái niệm operation characteristics — mỗi op có tag SIZED, ORDERED, DISTINCT, SORTED. JVM dùng các tag này để optimize. Ví dụ count() trên stream SIZED + không có filter → JVM trả thẳng size() không duyệt. Đó là lý do một số bài benchmark thấy count() nhanh bất thường.

11. Tóm tắt

  • Stream là pipeline mô tả xử lý, không phải cấu trúc lưu trữ. Không giữ element, không size() (với stream vô hạn), chỉ duyệt 1 lần.
  • Intermediate op trả Stream<T>, lazy — chỉ ghi lại ý định vào pipeline.
  • Terminal op trả kết quả khác stream, kích hoạt pipeline chạy từ source lên.
  • Lazy evaluation cho phép: pipeline dài không tạo collection trung gian, stream vô hạn khả thi, short-circuit tiết kiệm công.
  • Element đi từng cái qua toàn pipeline (filter → map → ...) trước khi element kế bắt đầu. Không batch.
  • Stateful intermediate (sorted, distinct) phải buffer — không dùng với stream vô hạn.
  • Short-circuit terminal (findFirst, anyMatch, limit) dừng khi đủ, làm stream vô hạn an toàn.
  • Stream one-shot — consume xong là closed. Cần reuse → tạo stream mới từ source hoặc collect thành list.
  • Tạo stream từ: collection, array, factory, generator (iterate, generate), file. Primitive stream (IntStream, ...) tránh boxing.

12. Tự kiểm tra

Tự kiểm tra
Q1
Đoạn sau in gì? Stream.of(1, 2, 3).filter(n -> n > 1);

Không in gì, không làm gì.

Không có terminal operation → pipeline lazy không chạy. Filter chỉ được "ghi lại ý định" vào pipeline stage, nhưng không ai yêu cầu element → predicate không chạy.

Thêm terminal op (.toList(), .forEach(...), .count()) mới kích hoạt. Đây là lỗi thường gặp nhất của người mới Stream — nghĩ filter luôn chạy như Python/JavaScript.

Q2
Vì sao Stream chỉ duyệt được 1 lần, khác với List?

Stream không lưu element — chỉ giữ con trỏ tới source (collection, file, network, generator). Khi terminal op chạy, nó consume source qua Spliterator. Consume xong, stream mark closed.

Lý do thiết kế: stream có thể từ nguồn không rewind được — file I/O đọc forward, socket read một lần, generator sinh lazy. Không thể design API "duyệt lại" cho tất cả source. Java chọn đơn giản và nhất quán: mọi stream one-shot.

Muốn duyệt lại → tạo stream mới từ source (list.stream() rẻ) hoặc collect thành List rồi dùng.

Q3
Pipeline list.stream().filter(...).map(...).toList() có duyệt list 2 lần không?

Không — chỉ 1 lần. Mỗi element đi qua filter → (nếu qua) map → được add vào result list. Rồi element kế tiếp bắt đầu.

Đây là lợi thế lớn so với viết imperative thủ công: imperative 2 pass phải tạo list trung gian sau filter, rồi duyệt lại list đó để map. Stream không tạo trung gian.

Với pipeline 5 stage trên 1 triệu element: imperative tạo 5 list (5 triệu object), stream tạo 1 list kết quả. Khác biệt memory rất lớn, đặc biệt với GC pressure.

Q4
Đoạn sau in gì theo thứ tự nào? Stream.of(1, 2, 3).filter(n -> { System.out.println("f" + n); return n > 1; }).map(n -> { System.out.println("m" + n); return n * 10; }).toList();

Element đi từng cái qua toàn pipeline:

f1     // element 1 vao filter
f2     // element 2 vao filter
m2     // 2 qua filter, chay map
f3     // element 3 vao filter
m3     // 3 qua filter, chay map

Element 1 bị filter loại → không gọi map cho 1. Nếu pipeline chạy batch ("filter hết trước, map hết sau") sẽ là f1 f2 f3 m2 m3 — khác hẳn.

Trật tự element-by-element là lý do stream vô hạn và short-circuit hoạt động được.

Q5
Khi nào nên dùng Stream.iterate vs IntStream.range?
  • IntStream.range(0, n): khi cần int sequential từ a đến b. Nhanh nhất, không boxing, không overhead UnaryOperator.
  • Stream.iterate(seed, fn): khi cần sinh theo công thức, vd iterate(1, n -> n * 2) → 1, 2, 4, 8, 16... Cần limit() để dừng (hoặc dùng dạng 3-arg Java 9+ có predicate).
  • Stream.iterate 3-arg (Java 9+): iterate(0, i -> i < 100, i -> i + 1) — giống for loop, không cần limit.

Rule nhanh: int range đơn giản → IntStream.range. Công thức phức tạp → Stream.iterate. Stream random/fake data → Stream.generate.

Q6
Khác biệt giữa peek và forEach là gì? Khi nào dùng peek?
  • peek(Consumer): intermediate, trả Stream<T>. Không kích hoạt pipeline. Dùng để quan sát element mid-pipeline.
  • forEach(Consumer): terminal, trả void. Kích hoạt pipeline. Side-effect trên mỗi element.

Dùng peek chỉ để debug/logging:

stream
  .peek(x -> System.out.println("before filter: " + x))
  .filter(x -> x > 0)
  .peek(x -> System.out.println("after filter: " + x))
  .toList();

Không dùng peek cho logic chính. Java 9+ có thể skip peek khi optimizer nhận thấy terminal op không cần duyệt (vd count() trên stream SIZED). Logic side-effect phải ở terminal forEach.

Bài tiếp theo: map / filter / reduce — ba operation cốt lõi

Bài này có giúp bạn hiểu bản chất không?