Mini-challenge: Log aggregator with NIO.2 and Streams
The mini-challenge that closes Module 11 — walk a log directory, stream each file, parse lines, aggregate statistics, and output a CSV report. A real production pattern.
This mini-challenge wraps up Module 11. You will write a log aggregator: walk the logs/ directory, read each .log file, parse its lines, aggregate error counts by day and level, and export a CSV report.
This is a typical data/ops-engineer task: hundreds of log files from many services → one summary report. Requirements: no OOM on large files, no leaked file handles, declarative stream-based code.
🎯 The challenge
Setup
The logs/ directory contains .log files, one entry per line in this format:
[2025-04-01T10:00:15Z] INFO [svc-auth] User login OK
[2025-04-01T10:00:16Z] ERROR [svc-order] Order creation failed: out of stock
[2025-04-01T10:00:17Z] WARN [svc-auth] Slow query detected 1200ms
The format as a regex (Java string literal): \\[(\\S+)\\] (\\w+)\\s+\\[(\\S+)\\] (.+). Groups: timestamp, level, service, message.
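A quick way to sanity-check the regex before wiring it into parse (a standalone sketch; the sample line is taken from above):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    // Same pattern as in the challenge: [timestamp] LEVEL [service] message
    static final Pattern PATTERN = Pattern.compile("\\[(\\S+)\\] (\\w+)\\s+\\[(\\S+)\\] (.+)");

    public static void main(String[] args) {
        Matcher m = PATTERN.matcher(
                "[2025-04-01T10:00:16Z] ERROR [svc-order] Order creation failed: out of stock");
        if (m.matches()) {
            System.out.println(m.group(1)); // 2025-04-01T10:00:16Z
            System.out.println(m.group(2)); // ERROR
            System.out.println(m.group(3)); // svc-order
            System.out.println(m.group(4)); // Order creation failed: out of stock
        }
    }
}
```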
1. LogLine record
record LogLine(Instant time, String level, String service, String message) { }
A static method LogLine parse(String line) — returns null when the line does not match.
2. LogAggregator.countByLevelPerDay(Path dir)
Walk every .log file under dir (recursively). Parse each line. Group by (date, level). Count lines.
Signature:
Map<LocalDate, Map<String, Long>> countByLevelPerDay(Path dir) throws IOException
Example output:
{
2025-04-01: {INFO=120, ERROR=5, WARN=3},
2025-04-02: {INFO=98, ERROR=12}
}
3. LogAggregator.errorsByService(Path dir)
Only lines with level ERROR, grouped by service and counted.
Map<String, Long> errorsByService(Path dir) throws IOException
4. LogAggregator.writeReport(Path dir, Path outputCsv)
Call the two methods above and write a CSV file in this format:
date,level,count
2025-04-01,ERROR,5
2025-04-01,INFO,120
...
Then append:
service,error_count
svc-auth,8
svc-order,15
Use Files.write with StandardOpenOption.
Non-functional requirements:
- No OOM on a 10GB log file — use Files.lines.
- No leaked file handles — try-with-resources for both walk and lines.
- Skip unreadable files (and malformed lines) — never crash.
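The write-then-append behavior that writeReport needs can be sketched with Files.writeString and StandardOpenOption (the file name here is hypothetical):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class CsvAppendDemo {
    public static void main(String[] args) throws IOException {
        Path out = Path.of("demo-report.csv"); // hypothetical output path
        // First section: create the file, overwriting any previous run
        Files.writeString(out, "date,level,count\n2025-04-01,ERROR,5\n",
                StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
        // Second section: append without clobbering the first
        Files.writeString(out, "\nservice,error_count\nsvc-auth,8\n",
                StandardCharsets.UTF_8,
                StandardOpenOption.APPEND);
        System.out.print(Files.readString(out));
    }
}
```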
📦 Concepts used in this challenge
| Concept | Lesson | Role here |
|---|---|---|
| Path, Files.walk | 11.2 | Walk the log directory recursively |
| Files.lines | 11.4 | Stream lines one at a time, never loading a whole file |
| try-with-resources | 11.1 | Close streams correctly |
| Stream map/filter/collect | 9.3 | Parse + aggregate |
| Nested Collectors.groupingBy | 9.3 | Two-level grouping (date → level) |
| Record | 5.6 | Immutable LogLine |
| Regex | bonus | Parse lines |
| Files.writeString | 11.2 | Write the CSV report |
▶️ Starter code
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.time.*;
import java.util.*;
import java.util.regex.*;
import java.util.stream.*;
public class LogAggregatorApp {
public record LogLine(Instant time, String level, String service, String message) {
static final Pattern PATTERN = Pattern.compile(
"\\[(\\S+)\\] (\\w+)\\s+\\[(\\S+)\\] (.+)");
public static LogLine parse(String line) {
Matcher m = PATTERN.matcher(line);
if (!m.matches()) return null;
try {
return new LogLine(
Instant.parse(m.group(1)),
m.group(2),
m.group(3),
m.group(4));
} catch (DateTimeException e) {
return null;
}
}
public LocalDate date() {
return LocalDate.ofInstant(time, ZoneOffset.UTC);
}
}
public static class LogAggregator {
// Stream all log lines across all .log files under dir
private Stream<LogLine> streamAllLines(Path dir) throws IOException {
// TODO:
// 1. Files.walk(dir)
// 2. filter isRegularFile + name endsWith .log
// 3. flatMap -> Files.lines(file)
// 4. map LogLine::parse
// 5. filter non-null
// Note: streams must be closed - consumer responsibility
return Stream.empty();
}
public Map<LocalDate, Map<String, Long>> countByLevelPerDay(Path dir) throws IOException {
// TODO: consume streamAllLines and apply a two-level groupingBy
return Map.of();
}
public Map<String, Long> errorsByService(Path dir) throws IOException {
// TODO: filter ERROR, groupingBy service, counting
return Map.of();
}
public void writeReport(Path dir, Path outputCsv) throws IOException {
// TODO: build CSV string, Files.writeString(outputCsv, csv, UTF_8)
}
}
// Helper: seed fake data for testing
public static void seedLogs(Path dir) throws IOException {
Files.createDirectories(dir);
String[] samples = {
"[2025-04-01T10:00:15Z] INFO [svc-auth] User login OK",
"[2025-04-01T10:00:16Z] ERROR [svc-order] Order creation failed: out of stock",
"[2025-04-01T10:00:17Z] WARN [svc-auth] Slow query detected 1200ms",
"[2025-04-01T10:01:18Z] ERROR [svc-auth] Login rate limit",
"[2025-04-02T09:00:00Z] INFO [svc-order] Daily summary",
"[2025-04-02T09:30:00Z] ERROR [svc-order] DB timeout",
"malformed line ignore me",
"[2025-04-02T10:00:00Z] INFO [svc-auth] Token refresh"
};
Files.write(dir.resolve("service-a.log"), List.of(samples), StandardCharsets.UTF_8);
Files.createDirectories(dir.resolve("service-b"));
Files.write(dir.resolve("service-b/2025-04-02.log"), List.of(
"[2025-04-02T11:00:00Z] ERROR [svc-payment] Card declined",
"[2025-04-02T11:05:00Z] INFO [svc-payment] Retry success",
"[2025-04-02T11:10:00Z] ERROR [svc-payment] Fraud detected"
), StandardCharsets.UTF_8);
}
public static void main(String[] args) throws IOException {
Path dir = Path.of("tmp-logs");
seedLogs(dir);
LogAggregator agg = new LogAggregator();
Map<LocalDate, Map<String, Long>> byDay = agg.countByLevelPerDay(dir);
Map<String, Long> byService = agg.errorsByService(dir);
System.out.println("By day/level:");
byDay.forEach((d, m) -> System.out.printf(" %s -> %s%n", d, m));
System.out.println("Errors by service:");
byService.forEach((s, c) -> System.out.printf(" %s: %d%n", s, c));
Path report = dir.resolve("report.csv");
agg.writeReport(dir, report);
System.out.println("\nReport written to: " + report.toAbsolutePath());
}
}
javac LogAggregatorApp.java
java LogAggregatorApp
Allow 25–30 minutes.
💡 Hints
The big problem: Files.lines returns a stream that holds an open file handle — it must be closed. flatMap does close each inner stream once it is fully consumed, but if the pipeline short-circuits or throws, inner handles stay open — and the outer consumer has no way to close them itself.
The fix: let each method do its own walk + processing instead of sharing streams across methods, and consume the data right inside the method:
public Map<LocalDate, Map<String, Long>> countByLevelPerDay(Path dir) throws IOException {
try (Stream<Path> files = Files.walk(dir)) {
return files
.filter(Files::isRegularFile)
.filter(p -> p.toString().endsWith(".log"))
.flatMap(this::tryLines)
.map(LogLine::parse)
.filter(Objects::nonNull)
.collect(Collectors.groupingBy(
LogLine::date,
Collectors.groupingBy(
LogLine::level,
Collectors.counting())));
}
}
private Stream<String> tryLines(Path file) {
try {
return Files.lines(file, StandardCharsets.UTF_8);
} catch (IOException e) {
return Stream.empty();
}
}
A smaller issue remains: the Files.lines stream inside tryLines has no explicit close. With few small files the handles are released eventually (flatMap closes fully consumed inner streams, GC collects the rest) — acceptable for this challenge. Production code uses the onClose pattern (level 2 of the extensions below). Merely returning the stream changes nothing:
private Stream<String> tryLines(Path file) {
try {
Stream<String> s = Files.lines(file, StandardCharsets.UTF_8);
return s; // Still no close in the right place — flatMap/GC must clean up
} catch (IOException e) {
return Stream.empty();
}
}
For a robust version within this challenge, read each file eagerly inside its own try-with-resources:
private List<LogLine> readFile(Path file) {
try (Stream<String> lines = Files.lines(file, StandardCharsets.UTF_8)) {
return lines.map(LogLine::parse).filter(Objects::nonNull).toList();
} catch (IOException e) {
return List.of();
}
}
public Map<LocalDate, Map<String, Long>> countByLevelPerDay(Path dir) throws IOException {
try (Stream<Path> files = Files.walk(dir)) {
return files
.filter(Files::isRegularFile)
.filter(p -> p.toString().endsWith(".log"))
.flatMap(p -> readFile(p).stream())
.collect(Collectors.groupingBy(
LogLine::date,
Collectors.groupingBy(LogLine::level, Collectors.counting())));
}
}
Trade-off: readFile loads the whole file into a List — no lazy streaming. Fine for moderate files, not for a 10GB one. Production code reaches for a library such as Commons IO's LineIterator with proper close semantics, or refactors so the consumer closes the whole chain.
errorsByService: same shape — filter on level equal to "ERROR".
writeReport:
public void writeReport(Path dir, Path outputCsv) throws IOException {
Map<LocalDate, Map<String, Long>> byDay = countByLevelPerDay(dir);
Map<String, Long> byService = errorsByService(dir);
StringBuilder csv = new StringBuilder();
csv.append("date,level,count\n");
byDay.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.forEach(e -> e.getValue().forEach((level, count) ->
csv.append(e.getKey()).append(",").append(level).append(",").append(count).append("\n")));
csv.append("\nservice,error_count\n");
byService.entrySet().stream()
.sorted(Map.Entry.<String, Long>comparingByValue().reversed())
.forEach(e -> csv.append(e.getKey()).append(",").append(e.getValue()).append("\n"));
Files.writeString(outputCsv, csv.toString(), StandardCharsets.UTF_8);
}
✅ Solution
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.time.*;
import java.util.*;
import java.util.regex.*;
import java.util.stream.*;
public class LogAggregatorApp {
public record LogLine(Instant time, String level, String service, String message) {
static final Pattern PATTERN = Pattern.compile(
"\\[(\\S+)\\] (\\w+)\\s+\\[(\\S+)\\] (.+)");
public static LogLine parse(String line) {
Matcher m = PATTERN.matcher(line);
if (!m.matches()) return null;
try {
return new LogLine(
Instant.parse(m.group(1)),
m.group(2),
m.group(3),
m.group(4));
} catch (DateTimeException e) {
return null;
}
}
public LocalDate date() {
return LocalDate.ofInstant(time, ZoneOffset.UTC);
}
}
public static class LogAggregator {
private List<LogLine> readFile(Path file) {
try (Stream<String> lines = Files.lines(file, StandardCharsets.UTF_8)) {
return lines.map(LogLine::parse).filter(Objects::nonNull).toList();
} catch (IOException e) {
System.err.println("Skipped " + file + ": " + e.getMessage());
return List.of();
}
}
private Stream<LogLine> streamAll(Path dir) throws IOException {
try (Stream<Path> files = Files.walk(dir)) {
return files
.filter(Files::isRegularFile)
.filter(p -> p.toString().endsWith(".log"))
.flatMap(p -> readFile(p).stream())
.toList()
.stream(); // Materialize so the outer try can close the walk safely
}
}
public Map<LocalDate, Map<String, Long>> countByLevelPerDay(Path dir) throws IOException {
return streamAll(dir)
.collect(Collectors.groupingBy(
LogLine::date,
TreeMap::new,
Collectors.groupingBy(
LogLine::level,
Collectors.counting())));
}
public Map<String, Long> errorsByService(Path dir) throws IOException {
return streamAll(dir)
.filter(l -> "ERROR".equals(l.level()))
.collect(Collectors.groupingBy(
LogLine::service,
Collectors.counting()));
}
public void writeReport(Path dir, Path outputCsv) throws IOException {
Map<LocalDate, Map<String, Long>> byDay = countByLevelPerDay(dir);
Map<String, Long> byService = errorsByService(dir);
StringBuilder csv = new StringBuilder();
csv.append("date,level,count\n");
byDay.forEach((date, levelMap) ->
levelMap.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.forEach(e -> csv.append(date).append(",")
.append(e.getKey()).append(",")
.append(e.getValue()).append("\n")));
csv.append("\nservice,error_count\n");
byService.entrySet().stream()
.sorted(Map.Entry.<String, Long>comparingByValue().reversed())
.forEach(e -> csv.append(e.getKey()).append(",")
.append(e.getValue()).append("\n"));
Files.writeString(outputCsv, csv.toString(), StandardCharsets.UTF_8);
}
}
public static void seedLogs(Path dir) throws IOException {
Files.createDirectories(dir);
String[] samples = {
"[2025-04-01T10:00:15Z] INFO [svc-auth] User login OK",
"[2025-04-01T10:00:16Z] ERROR [svc-order] Order creation failed: out of stock",
"[2025-04-01T10:00:17Z] WARN [svc-auth] Slow query detected 1200ms",
"[2025-04-01T10:01:18Z] ERROR [svc-auth] Login rate limit",
"[2025-04-02T09:00:00Z] INFO [svc-order] Daily summary",
"[2025-04-02T09:30:00Z] ERROR [svc-order] DB timeout",
"malformed line ignore me",
"[2025-04-02T10:00:00Z] INFO [svc-auth] Token refresh"
};
Files.write(dir.resolve("service-a.log"), List.of(samples), StandardCharsets.UTF_8);
Files.createDirectories(dir.resolve("service-b"));
Files.write(dir.resolve("service-b/2025-04-02.log"), List.of(
"[2025-04-02T11:00:00Z] ERROR [svc-payment] Card declined",
"[2025-04-02T11:05:00Z] INFO [svc-payment] Retry success",
"[2025-04-02T11:10:00Z] ERROR [svc-payment] Fraud detected"
), StandardCharsets.UTF_8);
}
public static void main(String[] args) throws IOException {
Path dir = Path.of("tmp-logs");
seedLogs(dir);
LogAggregator agg = new LogAggregator();
Map<LocalDate, Map<String, Long>> byDay = agg.countByLevelPerDay(dir);
Map<String, Long> byService = agg.errorsByService(dir);
System.out.println("By day/level:");
byDay.forEach((d, m) -> System.out.printf(" %s -> %s%n", d, m));
System.out.println("\nErrors by service:");
byService.forEach((s, c) -> System.out.printf(" %-15s %d%n", s, c));
Path report = dir.resolve("report.csv");
agg.writeReport(dir, report);
System.out.println("\nReport written to: " + report.toAbsolutePath());
}
}
Key points:
- Nested try-with-resources: Files.walk is a stream of paths, Files.lines a stream of lines, and both must be closed. Solution: readFile wraps Files.lines in its own try block; the outer Files.walk sits in another.
- Materialize with .toList().stream(): a pattern for consuming a lazy stream inside the try (files) block. Trade-off: every LogLine lives in memory at once — fine for this challenge's data, but a true 10GB input needs the lazy onClose variant from the extensions below.
- TreeMap for sorted output: groupingBy(keyFn, TreeMap::new, downstream) takes a map factory. Instead of HashMap's arbitrary order, TreeMap sorts by LocalDate's natural order.
- Skip malformed lines: LogLine.parse returns null when the regex does not match, and filter(Objects::nonNull) drops those entries. No crash.
- Skip failing files: readFile catches IOException, logs a warning, and returns List.of() — one bad file is skipped instead of taking down the whole aggregate.
- CSV built with a StringBuilder: simple, no library needed. Production code would use Apache Commons CSV to escape fields containing , or ".
- Sorted output: dates ascending, levels ascending, services by error count descending — the file is meaningful at a glance.
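The three-argument groupingBy mentioned above, isolated into a tiny demo (the Entry record is a stand-in for LogLine):

```java
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class TreeMapGroupDemo {
    record Entry(LocalDate date, String level) {}

    public static void main(String[] args) {
        List<Entry> entries = List.of(
                new Entry(LocalDate.of(2025, 4, 2), "INFO"),
                new Entry(LocalDate.of(2025, 4, 1), "ERROR"),
                new Entry(LocalDate.of(2025, 4, 1), "INFO"));
        // groupingBy(keyFn, mapFactory, downstream): TreeMap keeps dates ascending
        Map<LocalDate, Map<String, Long>> byDay = entries.stream()
                .collect(Collectors.groupingBy(
                        Entry::date,
                        TreeMap::new,
                        Collectors.groupingBy(Entry::level, Collectors.counting())));
        System.out.println(byDay.keySet()); // [2025-04-01, 2025-04-02]
    }
}
```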
🎓 Extensions
Level 1 — Parallel streams over many files:
.parallel() // or .parallelStream() when starting from a collection
.collect(Collectors.groupingByConcurrent(...));
groupingByConcurrent instead of groupingBy — it collects into a ConcurrentHashMap. With many large files, a parallel scan can be noticeably faster. Benchmark before you commit to it.
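A self-contained sketch of the concurrent collector (the Line record stands in for LogLine):

```java
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class ConcurrentGroupDemo {
    record Line(String level) {}

    public static void main(String[] args) {
        List<Line> lines = List.of(new Line("INFO"), new Line("ERROR"), new Line("ERROR"));
        // groupingByConcurrent merges into a single ConcurrentHashMap instead of
        // building one map per thread and combining them afterwards
        ConcurrentMap<String, Long> counts = lines.parallelStream()
                .collect(Collectors.groupingByConcurrent(Line::level, Collectors.counting()));
        System.out.println(counts.get("ERROR")); // 2
    }
}
```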
Level 2 — Properly lazy streaming with onClose:
private Stream<LogLine> streamLazy(Path dir) throws IOException {
Stream<Path> files = Files.walk(dir);
return files
.filter(Files::isRegularFile)
.filter(p -> p.toString().endsWith(".log"))
.flatMap(p -> {
try {
Stream<String> lines = Files.lines(p, StandardCharsets.UTF_8);
return lines.map(LogLine::parse).filter(Objects::nonNull);
} catch (IOException e) {
return Stream.empty();
}
})
.onClose(files::close);
}
// The consumer must use try-with-resources:
try (Stream<LogLine> s = agg.streamLazy(dir)) {
Map<...> result = s.collect(...);
}
Truly lazy — each element is read from disk only when the pipeline demands it. The consumer is responsible for closing. This scales to large files.
Level 3 — Auto-update with a watch service:
Watch logs/ with a WatchService; when a new file appears, parse it and fold it into the aggregate. A live dashboard.
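A minimal WatchService sketch, assuming the same tmp-logs directory from the challenge (a real dashboard would loop forever instead of polling once):

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.concurrent.TimeUnit;

public class LogWatcher {
    public static void main(String[] args) throws IOException, InterruptedException {
        Path dir = Path.of("tmp-logs");
        Files.createDirectories(dir);
        try (WatchService ws = FileSystems.getDefault().newWatchService()) {
            dir.register(ws, StandardWatchEventKinds.ENTRY_CREATE,
                             StandardWatchEventKinds.ENTRY_MODIFY);
            // Poll with a timeout so the demo terminates
            WatchKey key = ws.poll(2, TimeUnit.SECONDS);
            if (key != null) {
                for (WatchEvent<?> ev : key.pollEvents()) {
                    Path changed = dir.resolve((Path) ev.context());
                    if (changed.toString().endsWith(".log")) {
                        System.out.println(ev.kind() + ": " + changed);
                        // TODO: re-parse only this file and merge into the aggregate
                    }
                }
                key.reset(); // re-arm the key for further events
            }
        }
    }
}
```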
Level 4 — Multiple output formats:
Add writeReport(dir, path, Format.JSON | Format.CSV | Format.MARKDOWN) — the strategy pattern (lesson 6.6, composition) for the writers.
interface ReportWriter {
void write(Report report, Path output) throws IOException;
}
class CsvWriter implements ReportWriter { ... }
class JsonWriter implements ReportWriter { ... }
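A fleshed-out sketch of the strategy interface above (the Report record here is a hypothetical carrier; shape it after your own aggregates):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

public class ReportWriterDemo {
    // Hypothetical aggregate carrier — adapt to your solution's maps
    record Report(Map<String, Long> errorsByService) {}

    interface ReportWriter {
        void write(Report report, Path output) throws IOException;
    }

    static class CsvWriter implements ReportWriter {
        @Override public void write(Report report, Path output) throws IOException {
            StringBuilder sb = new StringBuilder("service,error_count\n");
            report.errorsByService().forEach((s, c) ->
                    sb.append(s).append(",").append(c).append("\n"));
            Files.writeString(output, sb.toString(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        ReportWriter writer = new CsvWriter(); // swap in a JsonWriter here
        writer.write(new Report(Map.of("svc-auth", 8L)), Path.of("strategy-demo.csv"));
        System.out.print(Files.readString(Path.of("strategy-demo.csv")));
    }
}
```

Adding a JSON or Markdown format is then just another ReportWriter implementation — the call site does not change.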
✨ What you just accomplished
By finishing this mini-challenge, you have:
- Walked a directory tree with Files.walk — processing files across subdirectories.
- Streamed line by line with Files.lines — no OOM on large log files.
- Parsed robustly with a regex — malformed lines are skipped, the pipeline never crashes.
- Aggregated on two levels with nested Collectors.groupingBy plus TreeMap for sorted output.
- Closed streams correctly with try-with-resources — no leaked file handles.
- Handled IOException per file — one broken file doesn't take down the aggregate.
- Written CSV output with Files.writeString — the modern API.
- Applied a production log-processing pattern — the foundation of monitoring/analytics tooling.
Congratulations — you have completed Module 11! You now have a full toolkit for file I/O: classic streams, NIO.2 Path/Files, memory-mapped files, and the stream + collector pattern for aggregating large files. The course has covered 11 modules so far — syntax, OOP, concurrency, and I/O. Next on the roadmap: JVM internals (class loading, bytecode, GC) — understanding the machine underneath so you can debug when production misbehaves.