Java — Từ Zero đến Senior/map / filter / reduce — ba operation cốt lõi
~25 phútStream API & LambdaMiễn phí

map / filter / reduce — ba operation cốt lõi

Ba operation nền tảng của functional programming: filter (lọc), map (biến đổi), reduce (gộp). Collectors.toList/toMap/groupingBy/partitioningBy. Viết 80% task aggregate mà không cần loop.

Bạn có List<Order> gồm 10 nghìn đơn hàng. Sếp hỏi: "tổng doanh thu các đơn đã thanh toán là bao nhiêu?"

Cách 1 — imperative (Java trước 8, hoặc Python bạn quen):

double total = 0;
for (Order o : orders) {
    if (o.status().equals("PAID")) {
        total += o.amount();
    }
}

Đọc đoạn này bạn phải làm 3 bước: (1) thấy vòng for → hiểu đang duyệt, (2) thấy if → hiểu đang lọc, (3) thấy += → hiểu đang cộng. Logic đơn giản nhưng intent nằm trong 3 chỗ khác nhau.

Cách 2 — functional với stream:

double total = orders.stream()
    .filter(o -> o.status().equals("PAID"))
    .mapToDouble(Order::amount)
    .sum();

Đọc từ trên xuống: "orders → filter PAID → map ra amount → sum". 1 câu, đúng thứ tự tư duy. Không cần mental-execute vòng for, không cần track biến total.

Đây là sức mạnh của ba operation filter, map, reduce. Chúng giải quyết được ~80% task aggregate dữ liệu: đếm, tính tổng, lọc, transform, group. Kết hợp với Collectors (terminal op mạnh nhất: toList, toMap, groupingBy, partitioningBy), bạn viết được aggregate phức tạp trong 5-10 dòng thay vì 30-50 dòng imperative.

Bài này đi sâu vào từng operation, giải thích cơ chế, show ví dụ thực tế, và pattern Collectors.groupingBy lồng — cách aggregate 2-3 tầng phổ biến trong báo cáo business.

1. Analogy — Dây chuyền phân loại bưu kiện

Trung tâm phân loại bưu kiện:

  • Trạm 1 — QC: loại bưu kiện hỏng, chỉ cho tốt đi tiếp. → filter.
  • Trạm 2 — đổi nhãn: giữ bưu kiện, nhưng thay nhãn ghi "điểm đến mới". Bưu kiện vẫn chạy, nội dung khác. → map.
  • Trạm 3 — cuối chuyền: gộp tất cả thành 1 con số (tổng khối lượng, tổng số đơn) hoặc 1 báo cáo. → reduce / sum / count.
  • Trạm 4 — phân vào thùng: phân loại bưu kiện vào nhiều thùng theo tỉnh/thành. → collect với Collectors.groupingBy.

Mỗi trạm có 1 vai trò rõ ràng. Không lẫn. Bưu kiện đi qua từng trạm theo thứ tự. Cuối ngày, quản lý đọc báo cáo từ trạm 3 hoặc 4, không cần xem từng bưu kiện.

Đời thườngStream op
QC loại đồ hỏngfilter(Predicate)
Đổi nhãnmap(Function)
Báo cáo tổngreduce / sum / count
Phân thùngcollect(Collectors.xxx)

Triết lý functional programming: mô tả WHAT, không WHERE HOW. Bạn không quan tâm bưu kiện 3 đang ở trạm 2 hay chưa — chỉ quan tâm cuối chuyền có đúng kết quả. Stream API biến Java từ "liệt kê bước" sang "mô tả pipeline".

💡 Cách nhớ intent ba op

filter = giữ thứ thoả điều kiện. map = biến thứ này thành thứ khác. reduce = gộp nhiều thứ thành một. Đọc pipeline theo intent này, sẽ không cần suy nghĩ về loop.

2. filter(Predicate<T>) — lọc

Giữ element thoả predicate, loại những element khác.

List<Integer> evens = Stream.of(1, 2, 3, 4, 5)
    .filter(n -> n % 2 == 0)
    .toList();
// [2, 4]

Predicate là boolean test(T t) — nhận element, trả true/false. True = giữ, false = loại.

Predicate combinators

Predicate có default method cho compose:

Predicate<String> notEmpty = s -> !s.isEmpty();
Predicate<String> shortEnough = s -> s.length() < 5;

// AND
Predicate<String> both = notEmpty.and(shortEnough);

// OR
Predicate<String> either = notEmpty.or(shortEnough);

// NOT
Predicate<String> isEmpty = notEmpty.negate();

// Static not (Java 11+)
Predicate<String> notEmpty2 = Predicate.not(String::isEmpty);

Tiện khi reuse logic — tránh viết điều kiện phức tạp inline.

Ví dụ thực tế: lọc user active đủ tuổi

record User(String name, int age, boolean active) { }

List<User> eligible = users.stream()
    .filter(u -> u.active())
    .filter(u -> u.age() >= 18)
    .toList();

Hai filter riêng rẽ đọc rõ intent, performance giống 1 filter với điều kiện &&. Stream lazy — mỗi element đi qua filter 1, nếu pass → filter 2. Không batch.

3. map(Function<T, R>) — biến đổi

Transform element. Kiểu output có thể khác kiểu input:

List<Integer> lengths = Stream.of("a", "hello", "world!")
    .map(String::length)
    .toList();
// [1, 5, 6]
// Stream<String> -> Stream<Integer>

mapToInt/Long/Double — primitive stream

Khi target kiểu primitive, dùng biến thể mapToInt để tránh auto-boxing:

int totalLength = Stream.of("a", "hello", "world")
    .mapToInt(String::length)
    .sum();
// 11

mapToInt trả IntStream — stream primitive. Có method đặc biệt:

  • sum() — tổng (trả int, không Optional).
  • average() — trung bình (trả OptionalDouble vì stream rỗng chia 0).
  • min(), max() — trả OptionalInt.
  • summaryStatistics() — trả IntSummaryStatistics có count/sum/min/max/avg đầy đủ trong 1 pass.
IntSummaryStatistics stats = users.stream()
    .mapToInt(User::age)
    .summaryStatistics();

System.out.println(stats.getAverage());   // avg age
System.out.println(stats.getMax());       // max age
System.out.println(stats.getCount());     // so user

Quay lại Stream<Integer> (boxed) bằng .boxed() nếu cần chain thêm operation yêu cầu reference type:

List<Integer> lengths = Stream.of("a", "hello")
    .mapToInt(String::length)
    .boxed()
    .toList();

Khi nào dùng primitive stream?

Với stream lớn (>1000 element) và op aggregate số (sum, average, min, max), primitive stream nhanh hơn đáng kể vì bỏ Integer.valueOf()Integer.intValue() boxing. Với stream nhỏ, khác biệt không đáng kể — ưu tiên đọc code dễ.

4. reduce — gộp về 1 giá trị

reduce là tổng quát hoá của sum, product, max, min, concat. Bạn định nghĩa "cách gộp 2 giá trị", reduce áp dụng dồn qua toàn stream.

Form 1: có identity

int sum = Stream.of(1, 2, 3, 4).reduce(0, (a, b) -> a + b);
// 10

0identity — giá trị trung tính với operation. Với cộng là 0, nhân là 1, concat chuỗi là "".

Identity có 2 vai trò:

  1. Starting value: reduce bắt đầu với identity rồi combine từng element.
  2. Kết quả khi stream rỗng: stream rỗng → reduce trả identity ngay.
int empty = Stream.<Integer>empty().reduce(0, Integer::sum);
// 0 - stream rong tra identity

Không phải Optional vì có identity bảo lãnh rằng luôn có giá trị trả về.

Form 2: không identity → Optional

Optional<Integer> max = Stream.of(3, 1, 4, 1, 5).reduce(Integer::max);
// Optional[5]

Optional<Integer> emptyMax = Stream.<Integer>empty().reduce(Integer::max);
// Optional.empty

Không có identity → stream rỗng trả Optional.empty(). Khi nào dùng form này? Khi bạn không muốn đặt identity giả — vd max không có "identity" tự nhiên (không phải Integer.MIN_VALUE vì đó là giá trị hợp lệ có thể là max thật).

Form 3: accumulator khác kiểu + combiner

int totalLength = Stream.of("a", "hello", "world").reduce(
    0,                               // identity (int)
    (sum, s) -> sum + s.length(),    // accumulator: int + String -> int
    Integer::sum                      // combiner: dung cho parallel
);
// 11

Form 3 dùng khi kiểu tích luỹ khác kiểu element. combiner chỉ chạy trong parallel stream — gộp 2 kết quả chunk lại. Sequential stream không gọi combiner.

Form này hiếm viết tay. Thường mapToInt(String::length).sum() tiện hơn. Nhưng hiểu form 3 cần thiết khi đọc code parallel stream ai đó viết.

Rule quan trọng: hàm reduce phải associative

Hàm f(a, b) phải thoả: f(f(a, b), c) == f(a, f(b, c)).

  • Associative: cộng, nhân, max, min, string concat, union set. (1+2)+3 == 1+(2+3).
  • Không associative: trừ, chia. (10-1)-2 = 7, 10-(1-2) = 11. Khác.

Tại sao quan trọng? Với sequential stream [a, b, c, d], reduce chạy f(f(f(a, b), c), d) — luôn left-to-right. Không associative vẫn deterministic.

Nhưng với parallel stream, Java chia data thành chunk, reduce song song rồi combine. Thứ tự combine không xác định — hàm không associative → kết quả random.

// Sai voi parallel
int diff = stream.parallel().reduce(0, (a, b) -> a - b);
// Ket qua phu thuoc vao cach JVM chia chunk, khong deterministic

Quy tắc: viết reduce luôn với hàm associative. Ngay cả sequential — vì biết đâu ngày nào đó bạn thêm .parallel().

⚠️ reduce không phải dao vạn năng

Nếu task đơn giản (sum, count, max, join), dùng API đặc biệt (mapToInt.sum(), count(), max(Comparator), Collectors.joining) — đọc rõ hơn, ít bug. Chỉ dùng reduce thô khi không có API nào phù hợp.

5. Collectors — terminal op mạnh nhất

collect(Collector) biến stream thành collection hoặc aggregate structure. java.util.stream.Collectors có hàng chục factory method.

toList, toSet, toUnmodifiableList

List<Integer> list = stream.collect(Collectors.toList());          // ArrayList mutable
List<Integer> immutable = stream.collect(Collectors.toUnmodifiableList());
Set<Integer> set = stream.collect(Collectors.toSet());             // HashSet
List<Integer> list2 = stream.toList();                              // Java 16+ shortcut

Khác biệt .toList() (Java 16+) và Collectors.toList():

  • .toList() trả unmodifiable list. Add/remove → UnsupportedOperationException.
  • Collectors.toList() trả mutable ArrayList.

Rule: mặc định dùng .toList() — safer (không ai modify ngẫu nhiên). Cần mutable sau → Collectors.toList() hoặc new ArrayList<>(stream.toList()).

toMap(keyFn, valueFn) — thành Map

record User(int id, String name) { }

Map<Integer, String> byId = users.stream()
    .collect(Collectors.toMap(User::id, User::name));
// {1=Alice, 2=Bob}

keyFn lấy key từ element, valueFn lấy value.

Trùng key → crash:

List<User> dup = List.of(new User(1, "A"), new User(1, "B"));
Map<Integer, String> m = dup.stream().collect(Collectors.toMap(User::id, User::name));
// IllegalStateException: Duplicate key 1 (attempted merging values A and B)

Fix bằng merge function — quyết định trường hợp trùng key xử lý sao:

Map<Integer, String> m = dup.stream().collect(Collectors.toMap(
    User::id,
    User::name,
    (existing, newValue) -> existing + ", " + newValue   // concat khi trung
));
// {1="A, B"}

Hoặc "giữ first": (existing, newValue) -> existing. "Giữ last": (existing, newValue) -> newValue.

Pattern đếm tần suất:

Map<String, Integer> wordCount = words.stream()
    .collect(Collectors.toMap(
        w -> w,
        w -> 1,
        Integer::sum   // trung key -> cong so
    ));

groupingBy — group theo key

record Order(String customer, double amount) { }

Map<String, List<Order>> byCustomer = orders.stream()
    .collect(Collectors.groupingBy(Order::customer));
// {Alice=[order1, order3], Bob=[order2]}

Default: group thành List<Order>. Muốn khác, dùng downstream collector:

// Dem so don per customer
Map<String, Long> countByCustomer = orders.stream()
    .collect(Collectors.groupingBy(
        Order::customer,
        Collectors.counting()
    ));
// {Alice=2, Bob=1}

// Tong amount per customer
Map<String, Double> totalByCustomer = orders.stream()
    .collect(Collectors.groupingBy(
        Order::customer,
        Collectors.summingDouble(Order::amount)
    ));

Downstream collector phổ biến:

  • counting() — long, đếm.
  • summingInt/Long/Double(fn) — tổng.
  • averagingInt/Long/Double(fn) — trung bình.
  • mapping(fn, downstream) — transform trước khi aggregate.
  • joining() / joining(delim) — concat string.
  • groupingBy(keyFn, downstream) — group lồng (2 tầng).

groupingBy lồng — aggregate 2 tầng

record Sale(String region, String product, double amount) { }

Map<String, Map<String, Double>> revenueByRegionProduct = sales.stream()
    .collect(Collectors.groupingBy(
        Sale::region,
        Collectors.groupingBy(
            Sale::product,
            Collectors.summingDouble(Sale::amount)
        )
    ));
// {
//   "VN": {"Pen": 500.0, "Book": 1200.0},
//   "US": {"Pen": 300.0, "Laptop": 5000.0}
// }

Pattern này thay thế for lồng 2 tầng trong imperative. Đọc từ trên xuống: "group theo region, trong mỗi region group theo product, trong mỗi product tính tổng amount". Intent rõ, đúng hierarchy của report.

partitioningBy — chia đôi theo predicate

Map<Boolean, List<Integer>> parts = Stream.of(1, 2, 3, 4, 5)
    .collect(Collectors.partitioningBy(n -> n % 2 == 0));
// {false=[1, 3, 5], true=[2, 4]}

Khác groupingBy(n -> n % 2 == 0) ở chỗ partitioningBy đảm bảo luôn có cả 2 key truefalse (rỗng nếu không element match). groupingBy chỉ có key với element match.

Dùng khi muốn "chia 2 nhóm theo 1 điều kiện boolean". Nhanh hơn groupingBy với predicate vì collector optimized cho 2 bucket.

joining — concat string

String csv = Stream.of("a", "b", "c").collect(Collectors.joining(","));
// "a,b,c"

String bracketed = Stream.of("a", "b", "c").collect(Collectors.joining(", ", "[", "]"));
// "[a, b, c]"

3-arg: delimiter, prefix, suffix. Dùng cho build CSV, SQL IN clause, log format.

6. Pattern thực tế — sales report

Task: cho List<Sale> (region, product, amount), sinh report:

record Sale(String region, String product, double amount) { }

List<Sale> sales = List.of(
    new Sale("VN", "Pen", 100),
    new Sale("VN", "Pen", 150),
    new Sale("VN", "Book", 300),
    new Sale("US", "Pen", 200),
    new Sale("US", "Laptop", 5000)
);

// 1. Tong amount toan bo
double total = sales.stream()
    .mapToDouble(Sale::amount)
    .sum();

// 2. Tong per region
Map<String, Double> byRegion = sales.stream()
    .collect(Collectors.groupingBy(
        Sale::region,
        Collectors.summingDouble(Sale::amount)));
// {VN=550.0, US=5200.0}

// 3. Count per region + product
Map<String, Map<String, Long>> countRegionProduct = sales.stream()
    .collect(Collectors.groupingBy(
        Sale::region,
        Collectors.groupingBy(Sale::product, Collectors.counting())));
// {VN={Pen=2, Book=1}, US={Pen=1, Laptop=1}}

// 4. Top product by amount per region
Map<String, Optional<Sale>> topPerRegion = sales.stream()
    .collect(Collectors.groupingBy(
        Sale::region,
        Collectors.maxBy(Comparator.comparingDouble(Sale::amount))));
// {VN=Sale(region=VN, product=Book, amount=300), US=Sale(region=US, product=Laptop, amount=5000)}

Version imperative cho task 3 tốn 15+ dòng for lồng và 2 HashMap thủ công. Stream version 4 dòng — đúng business intent.

7. Pitfall tổng hợp

Nhầm 1: toMap trùng key → crash.

Map<Integer, String> m = list.stream()
    .collect(Collectors.toMap(User::age, User::name));
// IllegalStateException khi 2 user cung age

✅ Thêm merge function:

.toMap(User::age, User::name, (a, b) -> a + ", " + b);

Nhầm 2: .toList() Java 16+ khi cần mutable.

var l = stream.toList();
l.add(x);   // UnsupportedOperationException

✅ Dùng Collectors.toList() hoặc new ArrayList<>(stream.toList()).

Nhầm 3: mapToInt().boxed().mapToInt() unnecessarily.

int sum = list.stream().mapToInt(Integer::intValue).boxed().mapToInt(x -> x).sum();

✅ Bỏ .boxed().mapToInt(...) — đã là IntStream, gọi .sum() luôn.

Nhầm 4: reduce cho task có API chuyên.

int count = (int) list.stream().reduce(0, (a, b) -> a + 1, Integer::sum);

list.stream().count() trả long.

Nhầm 5: reduce với hàm không associative + parallel.

int diff = list.parallelStream().reduce(0, (a, b) -> a - b);
// Ket qua khong deterministic

✅ Kiểm tra associative trước khi parallel. Với task không associative, giữ sequential.

Nhầm 6: Filter rồi count thay vì count predicate.

// Khong co API count(predicate) - phai filter truoc
long evenCount = list.stream().filter(n -> n % 2 == 0).count();

✅ Code trên đúng. Java stream không có count(predicate) — dùng filter().count().

8. 📚 Deep Dive Oracle

📚 Deep Dive Oracle

Spec / reference chính thức:

Ghi chú: Collector spec giải thích 4 component — hiểu để viết custom collector cho case đặc thù (vd collect thành record 2 field trong 1 pass). Đọc source Collectors.toMap trong OpenJDK để thấy pattern 4 component áp dụng: supplier tạo map, accumulator put entry, combiner merge 2 map parallel, finisher wrap unmodifiable nếu cần.

9. Tóm tắt

  • filter(Predicate) — giữ element thoả điều kiện. Predicate có default method and, or, negate để compose.
  • map(Function) — biến đổi, có thể đổi kiểu. mapToInt/Long/Double cho primitive stream tránh boxing.
  • Primitive stream có sum, average, min, max, summaryStatistics built-in — nhanh hơn với data lớn.
  • reduce 3 form: có identity (không Optional), không identity (trả Optional), có combiner (parallel). Hàm reduce phải associative — đặc biệt với parallel.
  • Ưu tiên API chuyên (sum, count, joining) hơn reduce thô — dễ đọc, ít bug.
  • collect(Collectors.xxx) — terminal op mạnh nhất. toList, toSet, toMap, groupingBy, partitioningBy, joining.
  • .toList() Java 16+ trả unmodifiable; Collectors.toList() mutable. Chọn theo nhu cầu sau.
  • toMap trùng key crash — luôn cung cấp merge function khi key có thể trùng.
  • groupingBy lồng với downstream collector cho aggregate 2-3 tầng — thay thế for lồng imperative.
  • partitioningBy chia đôi theo predicate, đảm bảo luôn có cả 2 key (true/false). Khác groupingBy(predicate).

10. Tự kiểm tra

Tự kiểm tra
Q1
Đoạn sau cho kết quả gì? Stream.of(1, 2, 3, 4).filter(n -> n > 1).mapToInt(n -> n * 2).sum()

Filter giữ 2, 3, 4 (bỏ 1). Map * 2 → 4, 6, 8. Sum → 18.

18

Đọc pipeline theo trật tự: filter → mapToInt → sum. Mỗi bước thu hẹp hoặc biến đổi stream đến khi terminal op tính tổng. Stream mapToInt chuyển sang IntStream — có sum() built-in.

Q2
Khác biệt cụ thể giữa .toList() (Java 16+) và Collectors.toList() là gì?
  • .toList(): trả List<T> unmodifiable. Add/remove/set → UnsupportedOperationException. Kiểu không cam kết cụ thể (implementation detail của JVM). Cho phép null element.
  • Collectors.toList(): trả ArrayList<T> mutable. Add/remove được. Cho phép null.

Quy tắc thực tế:

  • Result để đọc/iterate → .toList() (safer, không ai modify ngẫu nhiên).
  • Result cần add/remove sau → Collectors.toList() hoặc new ArrayList<>(stream.toList()).

Mặc định trong code mới nên dùng .toList() — bug mutation khó debug, immutable-by-default là design an toàn.

Q3
Khi nào reduce không hoạt động đúng với parallel stream?

Khi hàm reduce không associative: f(f(a, b), c) ≠ f(a, f(b, c)).

Sequential stream luôn chạy left-to-right deterministic. Parallel chia data thành chunk, reduce song song, rồi combine theo thứ tự không xác định. Nếu hàm không associative, kết quả phụ thuộc JVM chia chunk thế nào — non-deterministic.

Ví dụ trừ: (a, b) -> a - b. Sequential ((10-1)-2)-3 = 4. Parallel có thể chia [(10-1), (2-3)], combine 9 - (-1) = 10. Khác nhau.

An toàn cho parallel: cộng, nhân, max, min, union set, string concat (với StringBuilder collector). Tránh: trừ, chia, bất cứ op nào phụ thuộc thứ tự.

Q4
Làm thế nào count số user theo age, group theo department? Output kiểu gì?
Map<String, Map<Integer, Long>> result = users.stream()
  .collect(Collectors.groupingBy(
      User::department,
      Collectors.groupingBy(
          User::age,
          Collectors.counting())));

Output kiểu Map<String, Map<Integer, Long>>:

  • Outer key: department name.
  • Inner key: age.
  • Inner value: số user cùng (department, age).

Cấu trúc: tier 1 group department → tier 2 (downstream) group age → tier 3 count. Đúng cấu trúc của report "từng phòng ban, từng tuổi, có mấy người".

Imperative tương đương: 2 for lồng + nested HashMap thủ công, ~20 dòng. Stream 5 dòng.

Q5
Khi nào dùng reduce, khi nào dùng collect?
  • reduce: gộp stream về single value cùng kiểu element — sum, product, max, min, string concat. Hàm reducer phải stateless + associative.
  • collect: build mutable container — List, Set, Map, StringBuilder. Dùng Collectors.* cho common case.

Quick rule:

  • Output là số/primitive → reduce hoặc API chuyên (mapToInt.sum(), count()).
  • Output là collection/map → collect.

Technical: reduce là "functional reduce" (pure function), collect là "mutable reduction" (build container bằng mutation — an toàn vì mỗi thread có container riêng, combine cuối).

Q6
Khi nào dùng partitioningBy thay groupingBy với predicate?

Cả hai chia element thành 2 nhóm theo boolean. Khác biệt:

  • partitioningBy: key là Boolean (true/false), luôn có cả 2 key (rỗng nếu không element match). Collector optimized cho 2 bucket.
  • groupingBy(n -> n % 2 == 0): key là Boolean, nhưng chỉ có key với element match. Nếu không element nào match false → map chỉ có 1 key true.

Dùng partitioningBy khi: (1) business logic chia đôi rõ ràng (pass/fail, adult/minor); (2) downstream code expect luôn có cả true và false key, không muốn null-check.

Rule: có 2 bucket cố định → partitioningBy. Có nhiều bucket (enum, nhóm theo tên) → groupingBy.

Bài tiếp theo: Stream nâng cao — flatMap, takeWhile, parallel

Bài này có giúp bạn hiểu bản chất không?