map / filter / reduce — ba operation cốt lõi
Ba operation nền tảng của functional programming: filter (lọc), map (biến đổi), reduce (gộp) — kèm ràng buộc associativity quyết định parallel đúng hay sai. Viết 80% task aggregate mà không cần loop.
TL;DR: Ba operation filter (giữ element thoả điều kiện), map (biến đổi, có thể đổi kiểu), reduce (gộp nhiều giá trị về một) giải quyết ~80% task aggregate dữ liệu mà không cần vòng for. mapToInt/Long/Double chuyển sang primitive stream, tránh boxing với dữ liệu lớn. reduce có 3 form: có identity (trả thẳng giá trị), không identity (trả Optional), và form 3 tham số với combiner cho parallel. Ràng buộc quan trọng nhất: hàm reduce phải có tính chất kết hợp (associative) — vi phạm thì sequential vẫn đúng nhưng parallel cho kết quả sai không ổn định. Khi kết quả cần là collection/map thay vì một giá trị, dùng collect(Collector) — có bài riêng.
Bạn có List<Order> gồm 10 nghìn đơn hàng. Sếp hỏi: "tổng doanh thu các đơn đã thanh toán là bao nhiêu?"
Cách 1 — imperative (Java trước 8, hoặc Python bạn quen):
double total = 0;
for (Order o : orders) {
if (o.status().equals("PAID")) {
total += o.amount();
}
}
Đọc đoạn này bạn phải làm 3 bước: (1) thấy vòng for → hiểu đang duyệt, (2) thấy if → hiểu đang lọc, (3) thấy += → hiểu đang cộng. Logic đơn giản nhưng intent nằm trong 3 chỗ khác nhau.
Cách 2 — functional với stream:
double total = orders.stream()
.filter(o -> o.status().equals("PAID"))
.mapToDouble(Order::amount)
.sum();
Đọc từ trên xuống: "orders → filter PAID → map ra amount → sum". 1 câu, đúng thứ tự tư duy. Không cần mental-execute vòng for, không cần track biến total.
Đây là sức mạnh của ba operation filter, map, reduce — chúng giải quyết ~80% task aggregate: đếm, tính tổng, lọc, transform. Phần gom dữ liệu thành Map và group nhiều tầng do Collectors đảm nhiệm — chủ đề đó có bài riêng.
Bài này đi sâu vào từng operation, giải thích cơ chế và ràng buộc associativity — nền móng để bài Collectors và bài Parallel stream về sau đứng vững.
1. Analogy — Dây chuyền phân loại bưu kiện
Trung tâm phân loại bưu kiện:
- Trạm 1 — QC: loại bưu kiện hỏng, chỉ cho tốt đi tiếp. → filter.
- Trạm 2 — đổi nhãn: giữ bưu kiện, nhưng thay nhãn ghi "điểm đến mới". Bưu kiện vẫn chạy, nội dung khác. → map.
- Trạm 3 — cuối chuyền: gộp tất cả thành 1 con số (tổng khối lượng, tổng số đơn). → reduce / sum / count.
- Trạm 4 — phân vào thùng: phân loại bưu kiện vào nhiều thùng theo tỉnh/thành. → collect với
Collectors.groupingBy(bài 07).
Mỗi trạm có 1 vai trò rõ ràng. Không lẫn. Bưu kiện đi qua từng trạm theo thứ tự. Cuối ngày, quản lý đọc báo cáo từ trạm 3 hoặc 4, không cần xem từng bưu kiện.
| Đời thường | Stream op |
|---|---|
| QC loại đồ hỏng | filter(Predicate) |
| Đổi nhãn | map(Function) |
| Báo cáo tổng | reduce / sum / count |
| Phân thùng | collect(Collectors.xxx) — bài 07 |
Triết lý functional programming: mô tả WHAT, không WHERE HOW. Bạn không quan tâm bưu kiện 3 đang ở trạm 2 hay chưa — chỉ quan tâm cuối chuyền có đúng kết quả. Stream API biến Java từ "liệt kê bước" sang "mô tả pipeline".
filter = giữ thứ thoả điều kiện. map = biến thứ này thành thứ khác. reduce = gộp nhiều thứ thành một. Đọc pipeline theo intent này, sẽ không cần suy nghĩ về loop.
2. filter(Predicate<T>) — lọc
Giữ element thoả predicate, loại những element khác.
List<Integer> evens = Stream.of(1, 2, 3, 4, 5)
.filter(n -> n % 2 == 0)
.toList();
// [2, 4]
Predicate là boolean test(T t) — nhận element, trả true/false. True = giữ, false = loại.
Predicate combinators
Predicate có default method cho compose:
Predicate<String> notEmpty = s -> !s.isEmpty();
Predicate<String> shortEnough = s -> s.length() < 5;
// AND
Predicate<String> both = notEmpty.and(shortEnough);
// OR
Predicate<String> either = notEmpty.or(shortEnough);
// NOT
Predicate<String> isEmpty = notEmpty.negate();
// Static not (Java 11+)
Predicate<String> notEmpty2 = Predicate.not(String::isEmpty);
Tiện khi reuse logic — tránh viết điều kiện phức tạp inline.
Ví dụ thực tế: lọc user active đủ tuổi
record User(String name, int age, boolean active) { }
List<User> eligible = users.stream()
.filter(u -> u.active())
.filter(u -> u.age() >= 18)
.toList();
Hai filter riêng rẽ đọc rõ intent, performance giống 1 filter với điều kiện &&. Stream lazy — mỗi element đi qua filter 1, nếu pass → filter 2. Không batch.
3. map(Function<T, R>) — biến đổi
Transform element. Kiểu output có thể khác kiểu input:
List<Integer> lengths = Stream.of("a", "hello", "world!")
.map(String::length)
.toList();
// [1, 5, 6]
// Stream<String> -> Stream<Integer>
mapToInt/Long/Double — primitive stream
Khi target kiểu primitive, dùng biến thể mapToInt để tránh auto-boxing:
int totalLength = Stream.of("a", "hello", "world")
.mapToInt(String::length)
.sum();
// 11
mapToInt trả IntStream — stream primitive. Có method đặc biệt:
sum()— tổng (trảint, khôngOptional).average()— trung bình (trảOptionalDoublevì stream rỗng chia 0).min(),max()— trảOptionalInt.summaryStatistics()— trảIntSummaryStatisticscó count/sum/min/max/avg đầy đủ trong 1 pass.
IntSummaryStatistics stats = users.stream()
.mapToInt(User::age)
.summaryStatistics();
System.out.println(stats.getAverage()); // avg age
System.out.println(stats.getMax()); // max age
System.out.println(stats.getCount()); // so user
Quay lại Stream<Integer> (boxed) bằng .boxed() nếu cần chain thêm operation yêu cầu reference type:
List<Integer> lengths = Stream.of("a", "hello")
.mapToInt(String::length)
.boxed()
.toList();
Khi nào dùng primitive stream?
Với stream lớn (vượt 1000 element) và op aggregate số (sum, average, min, max), primitive stream nhanh hơn đáng kể vì bỏ Integer.valueOf() và Integer.intValue() boxing. Với stream nhỏ, khác biệt không đáng kể — ưu tiên đọc code dễ.
4. reduce — gộp về 1 giá trị
reduce là tổng quát hoá của sum, product, max, min, concat. Bạn định nghĩa "cách gộp 2 giá trị", reduce áp dụng dồn qua toàn stream.
Form 1: có identity
int sum = Stream.of(1, 2, 3, 4).reduce(0, (a, b) -> a + b);
// 10
0 là identity — giá trị trung tính với operation. Với cộng là 0, nhân là 1, concat chuỗi là "".
Identity có 2 vai trò:
- Starting value: reduce bắt đầu với identity rồi combine từng element.
- Kết quả khi stream rỗng: stream rỗng → reduce trả identity ngay.
int empty = Stream.<Integer>empty().reduce(0, Integer::sum);
// 0 - stream rong tra identity
Không phải Optional vì có identity bảo lãnh rằng luôn có giá trị trả về.
Form 2: không identity → Optional
Optional<Integer> max = Stream.of(3, 1, 4, 1, 5).reduce(Integer::max);
// Optional[5]
Optional<Integer> emptyMax = Stream.<Integer>empty().reduce(Integer::max);
// Optional.empty
Không có identity → stream rỗng trả Optional.empty(). Khi nào dùng form này? Khi bạn không muốn đặt identity giả — vd max không có "identity" tự nhiên (không phải Integer.MIN_VALUE vì đó là giá trị hợp lệ có thể là max thật).
Form 3: accumulator khác kiểu + combiner
int totalLength = Stream.of("a", "hello", "world").reduce(
0, // identity (int)
(sum, s) -> sum + s.length(), // accumulator: int + String -> int
Integer::sum // combiner: dung cho parallel
);
// 11
Form 3 dùng khi kiểu tích luỹ khác kiểu element. combiner chỉ chạy trong parallel stream — gộp 2 kết quả chunk lại. Sequential stream không gọi combiner.
Form này hiếm viết tay. Thường mapToInt(String::length).sum() tiện hơn. Nhưng hiểu form 3 cần thiết khi đọc code parallel stream ai đó viết.
Ràng buộc toán học bắt buộc: Tính chất kết hợp (Associative)
Khi thực hiện thao tác gộp dữ liệu bằng reduce, hàm tích lũy (accumulator) và hàm kết hợp (combiner) bắt buộc phải tuân thủ tính chất toán học: Tính chất kết hợp (Associative Property).
Công thức tổng quát của tính chất kết hợp:
f(f(a, b), c) == f(a, f(b, c))
- Thỏa mãn tính chất kết hợp: Phép cộng
(a + b), phép nhân(a * b), tìm giá trị lớn nhất/nhỏ nhấtMath.max(a, b)/Math.min(a, b), phép nối chuỗi (String concatenation), hay phép hợp tập hợp (Union set). Ví dụ:(1 + 2) + 3 == 1 + (2 + 3). - Không thỏa mãn tính chất kết hợp: Phép trừ
(a - b), phép chia(a / b). Ví dụ:(10 - 1) - 2 = 7, trong khi10 - (1 - 2) = 11. Hai kết quả hoàn toàn khác nhau.
Hệ quả trên Parallel Stream
Đối với sequential stream (duyệt tuần tự), dữ liệu [a, b, c, d] luôn được gộp tuyến tính từ trái qua phải f(f(f(a, b), c), d), do đó ngay cả các hàm không có tính chất kết hợp vẫn cho ra một kết quả duy nhất (deterministic).
Tuy nhiên, khi chạy trên Parallel Stream, luồng dữ liệu sẽ được chia nhỏ thành nhiều phần độc lập chạy trên nhiều Thread khác nhau của hệ thống ForkJoinPool, sau đó kết quả của từng thread được gộp lại với nhau bằng combiner. Do ForkJoinPool chia cắt và gộp các luồng con theo cách không dự đoán trước được (non-deterministic), việc sử dụng một hàm accumulator không có tính chất kết hợp sẽ dẫn đến kết quả sai lệch và không ổn định qua các lần chạy khác nhau.
Không dùng các phép toán không có tính chất kết hợp (như phép trừ (a, b) -> a - b) trong reduce. Khi stream được chuyển sang chế độ song song bằng .parallel(), kết quả sẽ sai lệch và tạo lỗi ngầm (silent bug) khó dò vết ở môi trường production.
Sơ đồ so sánh: Parallel Reduce vs Sequential Accumulate
Để hiểu trực quan tại sao identity bắt buộc phải là một phần tử trung tính (neutral element) của phép toán (như 0 trong phép cộng, 1 trong phép nhân) và cách hoạt động của hai mô hình, hãy quan sát sơ đồ dưới đây:
1. Sequential Accumulate (Duyệt tuyến tính tuần tự):
Identity (0) ────┐
├──> (+) ────┐
Phần tử (1) ────┘ ├──> (+) ────┐
Phần tử (2) ─────────────────┘ ├──> (+) ──> Kết quả cuối cùng: 6
Phần tử (3) ──────────────────────────────┘
2. Parallel Reduce (Cây nhị phân thu gọn - Binary Reduction Tree):
[ ForkJoinPool chia cắt các luồng con ]
/ \
[Luồng A] [Luồng B]
/ \ / \
Identity Phần tử 1 Identity Phần tử 2
(0) (1) (0) (2)
\ / \ /
▼ ▼ ▼ ▼
[ Accumulate ] [ Accumulate ]
(1) (2) Phần tử 3
\ / (3)
▼ ▼ /
[ Combiner: gộp luồng A & B ] /
(3) /
\ /
▼ ▼
[ Kết quả cuối cùng ]
(6)
Phân tích vai trò của Identity:
Trong sơ đồ Parallel Reduce trên, ta thấy identity (0) được nạp vào đầu mỗi nhánh luồng con (Luồng A và Luồng B) để làm giá trị khởi tạo ban đầu.
Nếu identity không phải là phần tử trung tính (ví dụ: dùng identity = 1 cho phép cộng), thì mỗi luồng con sẽ tự cộng thêm 1 vào kết quả cục bộ của mình. Khi combiner gộp các nhánh lại với nhau, giá trị sai lệch sẽ bị nhân lên theo số lượng luồng con được phân tách, dẫn đến kết quả sai hoàn toàn (trong ví dụ trên sẽ ra 8 thay vì 6 do bị dư thừa 1 ở mỗi luồng con).
Nếu task đơn giản (sum, count, max, join), dùng API đặc biệt (mapToInt.sum(), count(), max(Comparator), Collectors.joining) — đọc rõ hơn, ít bug. Chỉ dùng reduce thô khi không có API nào phù hợp.
5. Collectors — bước kế tiếp của reduce
reduce gộp stream về một giá trị. Khi kết quả cần là container — List, Set, Map, group nhiều tầng — Java dùng collect(Collector), một dạng "mutable reduction": mỗi thread tích luỹ vào container riêng rồi combine cuối. Bộ factory Collectors (toList, toMap, groupingBy, partitioningBy, joining) đủ mạnh để thay thế hầu hết vòng for lồng khi làm báo cáo.
Vì Collectors là chủ đề sâu (merge function khi trùng key, downstream collector lồng nhiều tầng, custom Collector.of), nó có bài riêng: Collectors deep dive. Ở bài này bạn chỉ cần nắm: collect là terminal op, nhận một Collector, và là "anh em mutable" của reduce.
6. Pattern thực tế — tổng doanh thu
Task: cho List<Sale> (region, product, amount), tính tổng toàn bộ:
record Sale(String region, String product, double amount) { }
List<Sale> sales = List.of(
new Sale("VN", "Pen", 100),
new Sale("VN", "Book", 300),
new Sale("US", "Laptop", 5000)
);
// Tong amount toan bo - filter/map/reduce thuan
double total = sales.stream()
.mapToDouble(Sale::amount)
.sum();
// 5400.0
Các báo cáo group nhiều tầng (tổng per region, count per region + product, top product mỗi region) cần Collectors.groupingBy với downstream collector — xem ví dụ đầy đủ trong bài Collectors và mini-challenge cuối module.
7. Pitfall tổng hợp
❌ Nhầm 1: mapToInt().boxed().mapToInt() unnecessarily.
int sum = list.stream().mapToInt(Integer::intValue).boxed().mapToInt(x -> x).sum();
✅ Bỏ .boxed().mapToInt(...) — đã là IntStream, gọi .sum() luôn.
❌ Nhầm 2: reduce cho task có API chuyên.
int count = (int) list.stream().reduce(0, (a, b) -> a + 1, Integer::sum);
✅ list.stream().count() trả long.
❌ Nhầm 3: reduce với hàm không associative + parallel.
int diff = list.parallelStream().reduce(0, (a, b) -> a - b);
// Ket qua khong deterministic
✅ Kiểm tra associative trước khi parallel. Với task không associative, giữ sequential.
❌ Nhầm 4: Filter rồi count thay vì count predicate.
// Khong co API count(predicate) - phai filter truoc
long evenCount = list.stream().filter(n -> n % 2 == 0).count();
✅ Code trên đúng. Java stream không có count(predicate) — dùng filter().count().
8. 📚 Deep Dive Oracle
Spec / reference chính thức:
- Stream.reduce javadoc — 3 form reduce chi tiết, yêu cầu associative + stateless + non-interfering.
- Reduction operations tutorial — Oracle tutorial so sánh reduce vs collect.
- java.util.stream package-summary — Reduction — spec chính thức về reduction, associativity, và vì sao mutable reduction (collect) tồn tại song song.
Ghi chú: Package-summary định nghĩa chính xác 3 ràng buộc của hàm reduce: associative (kết hợp), non-interfering (không sửa source), stateless (không phụ thuộc state ngoài). Sequential stream "tha thứ" cho vi phạm associative (vẫn deterministic), nhưng parallel thì không — đây là lý do bug reduce thường chỉ lộ ra khi ai đó thêm .parallel() về sau.
Liên kết khoá học khác
- SQL — bài 3.5 Aggregate Functions —
SUM/AVG/COUNT/MAX/MINtrong SQL tương ứngStream.reduce()vàCollectors.summarizing*()trong Java. - SQL — bài 3.4 GROUP BY và HAVING —
GROUP BYtương ứngCollectors.groupingBy()(bài 07), lý do nên aggregate ở DB thay vì kéo data về app.
9. Tóm tắt
filter(Predicate)— giữ element thoả điều kiện. Predicate có default methodand,or,negateđể compose.map(Function)— biến đổi, có thể đổi kiểu.mapToInt/Long/Doublecho primitive stream tránh boxing.- Primitive stream có
sum,average,min,max,summaryStatisticsbuilt-in — nhanh hơn với data lớn. reduce3 form: có identity (không Optional), không identity (trả Optional), có combiner (parallel). Identity phải là phần tử trung tính của phép toán.- Hàm reduce phải associative — sequential tha thứ vi phạm, parallel thì không.
- Ưu tiên API chuyên (
sum,count,max) hơnreducethô — dễ đọc, ít bug. - Khi kết quả là collection/map →
collect(Collector)— đào sâu ở bài 07 Collectors.
10. Tự kiểm tra
Q1Đoạn sau cho kết quả gì? Stream.of(1, 2, 3, 4).filter(n -> n > 1).mapToInt(n -> n * 2).sum()▸
Stream.of(1, 2, 3, 4).filter(n -> n > 1).mapToInt(n -> n * 2).sum()Filter giữ 2, 3, 4 (bỏ 1). Map * 2 → 4, 6, 8. Sum → 18.
18Đọc pipeline theo trật tự: filter → mapToInt → sum. Mỗi bước thu hẹp hoặc biến đổi stream đến khi terminal op tính tổng. Stream mapToInt chuyển sang IntStream — có sum() built-in.
Q2Khi nào reduce không hoạt động đúng với parallel stream?▸
Khi hàm reduce không associative: f(f(a, b), c) ≠ f(a, f(b, c)).
Sequential stream luôn chạy left-to-right deterministic. Parallel chia data thành chunk, reduce song song, rồi combine theo thứ tự không xác định. Nếu hàm không associative, kết quả phụ thuộc JVM chia chunk thế nào — non-deterministic.
Ví dụ trừ: (a, b) -> a - b. Sequential ((10-1)-2)-3 = 4. Parallel có thể chia [(10-1), (2-3)], combine 9 - (-1) = 10. Khác nhau.
An toàn cho parallel: cộng, nhân, max, min, union set, string concat (với StringBuilder collector). Tránh: trừ, chia, bất cứ op nào phụ thuộc thứ tự.
Q3Điều gì xảy ra nếu dùng parallelStream().reduce(1, Integer::sum) (identity = 1) để tính tổng? Vì sao identity phải là phần tử trung tính?▸
parallelStream().reduce(1, Integer::sum) (identity = 1) để tính tổng? Vì sao identity phải là phần tử trung tính?Kết quả sẽ sai và không ổn định. Parallel reduce nạp identity vào đầu mỗi chunk: nếu data bị chia thành 4 chunk, mỗi chunk tự cộng thêm 1 → kết quả dư đúng bằng số chunk (mà số chunk do ForkJoinPool quyết định runtime, thay đổi giữa các lần chạy).
Identity trung tính nghĩa là f(identity, x) == x với mọi x — cộng là 0, nhân là 1, concat là chuỗi rỗng. Khi đó việc nạp identity vào bao nhiêu chunk cũng không làm lệch kết quả.
Sequential với identity = 1 cũng sai (kết quả luôn dư 1), nhưng sai ổn định nên dễ phát hiện trong test. Parallel sai thay đổi theo lần chạy — khó dò hơn nhiều.
Q4Vì sao stream.reduce(Integer::max) (không identity) trả Optional<Integer> thay vì Integer?▸
stream.reduce(Integer::max) (không identity) trả Optional<Integer> thay vì Integer?Vì stream có thể rỗng. Form có identity luôn có giá trị fallback (stream rỗng → trả identity), nên trả thẳng T. Form không identity không có gì để trả khi stream rỗng → API buộc phải biểu diễn "có thể vắng mặt" bằng Optional.
Với max, đây là thiết kế đúng: không tồn tại identity tự nhiên cho phép max. Dùng Integer.MIN_VALUE làm identity là sai ngữ nghĩa — caller không phân biệt được "stream rỗng" với "max thật sự là MIN_VALUE".
Pattern caller: stream.reduce(Integer::max).orElseThrow() nếu chắc chắn không rỗng, hoặc orElse(defaultValue) nếu có giá trị mặc định hợp lý. Optional sẽ được đào sâu ở bài 06.
Q5Khi nào dùng reduce, khi nào dùng collect?▸
- reduce: gộp stream về single value cùng kiểu element — sum, product, max, min, string concat. Hàm reducer phải stateless + associative.
- collect: build mutable container — List, Set, Map, StringBuilder. Dùng
Collectors.*cho common case.
Quick rule:
- Output là số/primitive → reduce hoặc API chuyên (
mapToInt.sum(),count()). - Output là collection/map → collect.
Technical: reduce là "functional reduce" (pure function), collect là "mutable reduction" (build container bằng mutation — an toàn vì mỗi thread có container riêng, combine cuối).
Bài tiếp theo: Stream nâng cao — flatMap, takeWhile và peek
Bài này có giúp bạn hiểu bản chất không?
Hỏi đáp về bài này
Chưa có câu hỏi
Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).
Đặt câu hỏi đầu tiên