Mini-challenge: Log aggregator với NIO.2 và Stream
Bài thực hành khép lại module I/O & NIO — walk thư mục log, stream từng file, parse line, aggregate thống kê, output report CSV. Pattern production thực tế.
Mini-challenge khép lại module I/O & NIO. Bạn sẽ viết module log aggregator: walk thư mục logs/, đọc từng file .log, parse line, aggregate số lỗi theo ngày và level, xuất báo cáo CSV.
Đây là task điển hình data/ops engineer: hàng trăm file log từ nhiều service → 1 báo cáo tổng. Yêu cầu: không OOM với file lớn, không leak file handle, code declarative với stream.
🎯 Đề bài
Setup
Thư mục logs/ chứa các file .log. Mỗi dòng format:
[2025-04-01T10:00:15Z] INFO [svc-auth] User login OK
[2025-04-01T10:00:16Z] ERROR [svc-order] Order creation failed: out of stock
[2025-04-01T10:00:17Z] WARN [svc-auth] Slow query detected 1200ms
Format regex: \\[(\\S+)\\] (\\w+)\\s+\\[(\\S+)\\] (.+). Group: timestamp, level, service, message.
1. LogLine record
record LogLine(Instant time, String level, String service, String message) { }
Method static LogLine parse(String line) — return null nếu không match.
2. LogAggregator.countByLevelPerDay(Path dir)
Walk tất cả .log trong dir (recursive). Parse line. Group theo (date, level). Đếm số line.
Signature:
Map<LocalDate, Map<String, Long>> countByLevelPerDay(Path dir) throws IOException
Output ví dụ:
{
2025-04-01: {INFO=120, ERROR=5, WARN=3},
2025-04-02: {INFO=98, ERROR=12}
}
3. LogAggregator.errorsByService(Path dir)
Chỉ line level ERROR, group theo service, đếm.
Map<String, Long> errorsByService(Path dir) throws IOException
4. LogAggregator.writeReport(Path dir, Path outputCsv)
Gọi 2 method trên, ghi file CSV format:
date,level,count
2025-04-01,ERROR,5
2025-04-01,INFO,120
...
Và append thêm:
service,error_count
svc-auth,8
svc-order,15
Dùng Files.write với StandardOpenOption.
Yêu cầu non-functional:
- Không OOM với file log 10GB — pipeline phải lazy end-to-end với
Files.lines, không materialize toàn bộ line vào memory. - Không leak file handle — try-with-resources cho
walk;linesđược close đúng qua contract củaflatMap(xem Gợi ý). - Skip file không parse được (hoặc line malformed) — không crash.
📦 Concept dùng trong bài
| Concept | Bài | Dùng ở đây |
|---|---|---|
| Path, Files | Bài 02 — Path và Files | Tạo path, ghi report CSV |
| Files.walk | Bài 03 — Duyệt thư mục | Duyệt thư mục log recursive |
| Files.lines | Bài 06 — Stream file | Stream từng line, không load toàn file |
| try-with-resources | Try-with-resources | Close stream đúng |
| Stream map/filter/collect | Map, filter, reduce | Parse + aggregate |
| Collectors.groupingBy lồng | Collectors deep | Group 2 tầng (date → level) |
| Record | Record | LogLine immutable |
| Regex | bonus | Parse line |
▶️ Starter code
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.time.*;
import java.util.*;
import java.util.regex.*;
import java.util.stream.*;
public class LogAggregatorApp {
public record LogLine(Instant time, String level, String service, String message) {
static final Pattern PATTERN = Pattern.compile(
"\\[(\\S+)\\] (\\w+)\\s+\\[(\\S+)\\] (.+)");
public static LogLine parse(String line) {
Matcher m = PATTERN.matcher(line);
if (!m.matches()) return null;
try {
return new LogLine(
Instant.parse(m.group(1)),
m.group(2),
m.group(3),
m.group(4));
} catch (DateTimeException e) {
return null;
}
}
public LocalDate date() {
return LocalDate.ofInstant(time, ZoneOffset.UTC);
}
}
public static class LogAggregator {
// Build lazy pipeline of parsed LogLine from a stream of paths.
// Note: Stream.flatMap closes each inner stream after it is consumed,
// so flatMap(p -> Files.lines(p)) does NOT leak file handles.
// Caller owns closing the outer Files.walk stream.
private Stream<LogLine> logLines(Stream<Path> files) {
// TODO:
// 1. filter isRegularFile + name endsWith .log
// 2. flatMap -> Files.lines(file) (wrap IOException -> Stream.empty())
// 3. map LogLine::parse
// 4. filter non-null
return Stream.empty();
}
public Map<LocalDate, Map<String, Long>> countByLevelPerDay(Path dir) throws IOException {
// TODO: try (files = Files.walk(dir)) { consume logLines(files), groupingBy 2 tang }
return Map.of();
}
public Map<String, Long> errorsByService(Path dir) throws IOException {
// TODO: filter ERROR, groupingBy service, counting
return Map.of();
}
public void writeReport(Path dir, Path outputCsv) throws IOException {
// TODO: build CSV string, Files.writeString(outputCsv, csv, UTF_8)
}
}
// Helper: tao data fake de test
public static void seedLogs(Path dir) throws IOException {
Files.createDirectories(dir);
String[] samples = {
"[2025-04-01T10:00:15Z] INFO [svc-auth] User login OK",
"[2025-04-01T10:00:16Z] ERROR [svc-order] Order creation failed: out of stock",
"[2025-04-01T10:00:17Z] WARN [svc-auth] Slow query detected 1200ms",
"[2025-04-01T10:01:18Z] ERROR [svc-auth] Login rate limit",
"[2025-04-02T09:00:00Z] INFO [svc-order] Daily summary",
"[2025-04-02T09:30:00Z] ERROR [svc-order] DB timeout",
"malformed line ignore me",
"[2025-04-02T10:00:00Z] INFO [svc-auth] Token refresh"
};
Files.write(dir.resolve("service-a.log"), List.of(samples), StandardCharsets.UTF_8);
Files.createDirectories(dir.resolve("service-b"));
Files.write(dir.resolve("service-b/2025-04-02.log"), List.of(
"[2025-04-02T11:00:00Z] ERROR [svc-payment] Card declined",
"[2025-04-02T11:05:00Z] INFO [svc-payment] Retry success",
"[2025-04-02T11:10:00Z] ERROR [svc-payment] Fraud detected"
), StandardCharsets.UTF_8);
}
public static void main(String[] args) throws IOException {
Path dir = Path.of("tmp-logs");
seedLogs(dir);
LogAggregator agg = new LogAggregator();
Map<LocalDate, Map<String, Long>> byDay = agg.countByLevelPerDay(dir);
Map<String, Long> byService = agg.errorsByService(dir);
System.out.println("By day/level:");
byDay.forEach((d, m) -> System.out.printf(" %s -> %s%n", d, m));
System.out.println("Errors by service:");
byService.forEach((s, c) -> System.out.printf(" %s: %d%n", s, c));
Path report = dir.resolve("report.csv");
agg.writeReport(dir, report);
System.out.println("\nReport written to: " + report.toAbsolutePath());
}
}
javac LogAggregatorApp.java
java LogAggregatorApp
Dành 25–30 phút.
💡 Gợi ý
Điểm mấu chốt — 2 loại stream cần close: Files.walk (stream của path) và Files.lines (stream của line) đều giữ file handle.
Tin tốt cho Files.lines: javadoc của Stream.flatMap đảm bảo "each mapped stream is closed after its contents have been placed into this stream" — mỗi inner stream được close ngay sau khi tiêu thụ xong. Nghĩa là pattern flatMap(p -> Files.lines(p)) vừa lazy vừa close đúng từng file — tại mỗi thời điểm chỉ 1 file đang mở. Bạn không cần làm gì thêm cho inner stream.
Việc còn lại: close stream ngoài (Files.walk) bằng try-with-resources, và tiêu thụ pipeline ngay bên trong block try (collect xong rồi mới thoát block — thoát sớm thì walk đã đóng mà pipeline chưa chạy):
public Map<LocalDate, Map<String, Long>> countByLevelPerDay(Path dir) throws IOException {
try (Stream<Path> files = Files.walk(dir)) {
return files
.filter(Files::isRegularFile)
.filter(p -> p.toString().endsWith(".log"))
.flatMap(this::tryLines) // Lazy + tu close tung file
.map(LogLine::parse)
.filter(Objects::nonNull)
.collect(Collectors.groupingBy(
LogLine::date,
Collectors.groupingBy(
LogLine::level,
Collectors.counting())));
}
}
private Stream<String> tryLines(Path file) {
try {
return Files.lines(file, StandardCharsets.UTF_8);
} catch (IOException e) {
return Stream.empty(); // File loi -> skip, khong keo do pipeline
}
}
Đừng materialize kiểu .toList().stream() hay đọc cả file vào List để "dễ close" — làm vậy phản bội yêu cầu "không OOM với log 10GB". Pipeline trên lazy end-to-end: heap chỉ giữ 1 dòng đang xử lý + buffer 8KB của file đang đọc + map kết quả aggregate.
errorsByService: tương tự, thêm filter level bằng "ERROR".
writeReport:
public void writeReport(Path dir, Path outputCsv) throws IOException {
Map<LocalDate, Map<String, Long>> byDay = countByLevelPerDay(dir);
Map<String, Long> byService = errorsByService(dir);
StringBuilder csv = new StringBuilder();
csv.append("date,level,count\n");
byDay.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.forEach(e -> e.getValue().forEach((level, count) ->
csv.append(e.getKey()).append(",").append(level).append(",").append(count).append("\n")));
csv.append("\nservice,error_count\n");
byService.entrySet().stream()
.sorted(Map.Entry.<String, Long>comparingByValue().reversed())
.forEach(e -> csv.append(e.getKey()).append(",").append(e.getValue()).append("\n"));
Files.writeString(outputCsv, csv.toString(), StandardCharsets.UTF_8);
}
✅ Lời giải
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.time.*;
import java.util.*;
import java.util.regex.*;
import java.util.stream.*;
public class LogAggregatorApp {
public record LogLine(Instant time, String level, String service, String message) {
static final Pattern PATTERN = Pattern.compile(
"\\[(\\S+)\\] (\\w+)\\s+\\[(\\S+)\\] (.+)");
public static LogLine parse(String line) {
Matcher m = PATTERN.matcher(line);
if (!m.matches()) return null;
try {
return new LogLine(
Instant.parse(m.group(1)),
m.group(2),
m.group(3),
m.group(4));
} catch (DateTimeException e) {
return null;
}
}
public LocalDate date() {
return LocalDate.ofInstant(time, ZoneOffset.UTC);
}
}
public static class LogAggregator {
// Lazy pipeline: walk -> .log files -> lines -> parsed LogLine.
// Stream.flatMap closes each inner Files.lines stream after its
// contents are consumed - only 1 file is open at a time.
// Caller owns the outer files stream (try-with-resources).
private Stream<LogLine> logLines(Stream<Path> files) {
return files
.filter(Files::isRegularFile)
.filter(p -> p.toString().endsWith(".log"))
.flatMap(this::tryLines)
.map(LogLine::parse)
.filter(Objects::nonNull);
}
private Stream<String> tryLines(Path file) {
try {
return Files.lines(file, StandardCharsets.UTF_8);
} catch (IOException e) {
System.err.println("Skipped " + file + ": " + e.getMessage());
return Stream.empty();
}
}
public Map<LocalDate, Map<String, Long>> countByLevelPerDay(Path dir) throws IOException {
try (Stream<Path> files = Files.walk(dir)) {
return logLines(files)
.collect(Collectors.groupingBy(
LogLine::date,
TreeMap::new,
Collectors.groupingBy(
LogLine::level,
Collectors.counting())));
}
}
public Map<String, Long> errorsByService(Path dir) throws IOException {
try (Stream<Path> files = Files.walk(dir)) {
return logLines(files)
.filter(l -> "ERROR".equals(l.level()))
.collect(Collectors.groupingBy(
LogLine::service,
Collectors.counting()));
}
}
public void writeReport(Path dir, Path outputCsv) throws IOException {
Map<LocalDate, Map<String, Long>> byDay = countByLevelPerDay(dir);
Map<String, Long> byService = errorsByService(dir);
StringBuilder csv = new StringBuilder();
csv.append("date,level,count\n");
byDay.forEach((date, levelMap) ->
levelMap.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.forEach(e -> csv.append(date).append(",")
.append(e.getKey()).append(",")
.append(e.getValue()).append("\n")));
csv.append("\nservice,error_count\n");
byService.entrySet().stream()
.sorted(Map.Entry.<String, Long>comparingByValue().reversed())
.forEach(e -> csv.append(e.getKey()).append(",")
.append(e.getValue()).append("\n"));
Files.writeString(outputCsv, csv.toString(), StandardCharsets.UTF_8);
}
}
public static void seedLogs(Path dir) throws IOException {
Files.createDirectories(dir);
String[] samples = {
"[2025-04-01T10:00:15Z] INFO [svc-auth] User login OK",
"[2025-04-01T10:00:16Z] ERROR [svc-order] Order creation failed: out of stock",
"[2025-04-01T10:00:17Z] WARN [svc-auth] Slow query detected 1200ms",
"[2025-04-01T10:01:18Z] ERROR [svc-auth] Login rate limit",
"[2025-04-02T09:00:00Z] INFO [svc-order] Daily summary",
"[2025-04-02T09:30:00Z] ERROR [svc-order] DB timeout",
"malformed line ignore me",
"[2025-04-02T10:00:00Z] INFO [svc-auth] Token refresh"
};
Files.write(dir.resolve("service-a.log"), List.of(samples), StandardCharsets.UTF_8);
Files.createDirectories(dir.resolve("service-b"));
Files.write(dir.resolve("service-b/2025-04-02.log"), List.of(
"[2025-04-02T11:00:00Z] ERROR [svc-payment] Card declined",
"[2025-04-02T11:05:00Z] INFO [svc-payment] Retry success",
"[2025-04-02T11:10:00Z] ERROR [svc-payment] Fraud detected"
), StandardCharsets.UTF_8);
}
public static void main(String[] args) throws IOException {
Path dir = Path.of("tmp-logs");
seedLogs(dir);
LogAggregator agg = new LogAggregator();
Map<LocalDate, Map<String, Long>> byDay = agg.countByLevelPerDay(dir);
Map<String, Long> byService = agg.errorsByService(dir);
System.out.println("By day/level:");
byDay.forEach((d, m) -> System.out.printf(" %s -> %s%n", d, m));
System.out.println("\nErrors by service:");
byService.forEach((s, c) -> System.out.printf(" %-15s %d%n", s, c));
Path report = dir.resolve("report.csv");
agg.writeReport(dir, report);
System.out.println("\nReport written to: " + report.toAbsolutePath());
}
}
Điểm chính:
-
Lazy end-to-end, không OOM: pipeline
walk → flatMap(lines) → parse → collectđọc từng dòng khi collector cần. Heap tại mỗi thời điểm chỉ giữ: 1 dòng đang xử lý + buffer 8KB của file đang mở + map kết quả aggregate. Log 10GB hay 100GB đều chạy được — đúng yêu cầu non-functional của đề. -
flatMaptự close inner stream: javadocStream.flatMapghi rõ "each mapped stream is closed after its contents have been placed into this stream". Vì vậyflatMap(p -> Files.lines(p))không leak file handle — tại mỗi thời điểm chỉ 1 file đang mở, đọc xong là close. (Lưu ý lịch sử: JDK trước Java 10 có bug flatMap không close inner stream khi pipeline short-circuit — JDK-8075939; vớicollecttiêu thụ toàn bộ thì không bị ảnh hưởng.) -
Outer stream do caller close:
Files.walkgiữ directory handle → mỗi aggregate method tự mở walk trong try-with-resources và collect ngay trong block — pipeline tiêu thụ xong mới thoát block, walk close sau cùng. -
TreeMapcho sorted output:groupingBy(keyFn, TreeMap::new, downstream)— factory cho map. Thay vìHashMaprandom order,TreeMapsort theoLocalDatetự nhiên. -
Skip malformed line:
LogLine.parsereturn null khi regex không match, streamfilter(Objects::nonNull)loại. Không crash. -
Skip file lỗi:
tryLinescatchIOException, log warning, trảStream.empty()— file lỗi bỏ qua, không kéo đổ cả aggregate. -
CSV build với StringBuilder: đơn giản, không cần thư viện. Production dùng Apache Commons CSV cho escape đúng khi field chứa
,/". -
Sort output: date ascending, level ascending, service by count desc — output có ý nghĩa khi view file.
🎓 Mở rộng
Mức 1 — Parallel stream với file lớn:
.parallel()
.collect(Collectors.groupingByConcurrent(...));
groupingByConcurrent thay groupingBy — dùng ConcurrentHashMap. Với nhiều file + file lớn, parallel scan nhanh hơn. Benchmark trước khi commit.
Mức 2 — Trả stream lazy ra ngoài method với onClose:
Lời giải trên tiêu thụ pipeline ngay trong method. Khi API của bạn cần trả stream ra ngoài cho caller tự xử lý, không thể try-with-resources bên trong — gắn việc close walk vào chính stream trả về:
private Stream<LogLine> streamLazy(Path dir) throws IOException {
Stream<Path> files = Files.walk(dir);
return files
.filter(Files::isRegularFile)
.filter(p -> p.toString().endsWith(".log"))
.flatMap(this::tryLines)
.map(LogLine::parse)
.filter(Objects::nonNull)
.onClose(files::close);
}
// Consumer phai try-with-resources:
try (Stream<LogLine> s = agg.streamLazy(dir)) {
Map<...> result = s.collect(...);
}
Vẫn lazy end-to-end như lời giải chính (inner stream vẫn do flatMap close) — chỉ chuyển trách nhiệm close outer stream sang consumer qua onClose.
Mức 3 — Watch service auto-update:
Watch logs/ với WatchService (bài 03); khi có file mới, parse và update aggregate. Live dashboard.
Mức 4 — Output multiple format:
Thêm writeReport(dir, path, Format.JSON | Format.CSV | Format.MARKDOWN) — strategy pattern qua interface + composition cho writer.
interface ReportWriter {
void write(Report report, Path output) throws IOException;
}
class CsvWriter implements ReportWriter { ... }
class JsonWriter implements ReportWriter { ... }
✨ Điều bạn vừa làm được
Hoàn thành mini-challenge này, bạn đã:
- Walk thư mục với
Files.walkrecursive — xử lý nhiều file trong subdir. - Stream từng line với
Files.lines— lazy end-to-end, không OOM với file log lớn. - Hiểu contract
flatMapclose inner stream — ghép walk + lines không leak file handle. - Parse robust với regex — skip malformed line, không crash pipeline.
- Aggregate 2 tầng với
Collectors.groupingBylồng +TreeMapcho sorted output. - Close stream đúng với try-with-resources cho outer walk stream.
- Handle IOException per-file — file lỗi không kéo đổ cả aggregate.
- Ghi CSV output với
Files.writeString— API hiện đại. - Áp pattern log processing production — foundation cho tool monitoring/analytics.
Chúc mừng — bạn đã hoàn thành module I/O & NIO! Bạn giờ có toolkit đầy đủ cho file I/O: stream cổ điển, NIO.2 Path/Files, duyệt thư mục + WatchService, serialization và mặt tối của nó, memory-mapped, và pattern stream + collector để aggregate file lớn. Module tiếp theo — JVM Internals: Class loader — JVM tìm và nạp class thế nào — hiểu máy bên dưới để debug khi production đi sai.
Bài này có giúp bạn hiểu bản chất không?
Hỏi đáp về bài này
Chưa có câu hỏi
Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).
Đặt câu hỏi đầu tiên