Stream file — Files.lines, memory-mapped, channel
Xử lý file lớn stream-based với Files.lines, try-with-resources. Memory-mapped file (MappedByteBuffer) cho random access, FileChannel + Buffer cho control chi tiết. Pattern xử lý file 10GB không OOM.
Quay lại task mở đầu bài 11.2: đọc log file 10GB. Thử Files.readAllLines:
List<String> lines = Files.readAllLines(Path.of("app.log")); // OOM
10GB text = ~15-20GB heap (UTF-16 overhead của String trong JVM + object header). Heap default vài GB → crash ngay.
Chuyển sang BufferedReader loop while (readLine) — chạy được nhưng imperative:
try (BufferedReader r = Files.newBufferedReader(path)) {
long errorCount = 0;
String line;
while ((line = r.readLine()) != null) {
if (line.contains("ERROR")) errorCount++;
}
System.out.println(errorCount);
}
6 dòng cho task đơn giản. Intent "đếm ERROR" không rõ từ code — phải skim qua loop structure.
Files.lines + Stream API kết hợp tốt nhất của cả hai: memory constant (không OOM), code declarative (đúng business intent):
try (Stream<String> lines = Files.lines(path)) {
long errorCount = lines.filter(l -> l.contains("ERROR")).count();
System.out.println(errorCount);
}
3 dòng. Đọc là hiểu. Memory ~KB buffer, scale với file bất kỳ size.
Bài này giải thích 3 kỹ thuật file I/O cho dữ liệu lớn:
Files.lines+ Stream API — 90% use case text processing.- Memory-mapped file (
MappedByteBuffer) — random access binary lớn, không tốn heap. - FileChannel +
ByteBuffer— NIO classic, control chi tiết, dùng cho copy zero-copy và pattern đặc biệt.
1. Files.lines + Stream API — pattern chuẩn cho text
Cách hoạt động
try (Stream<String> lines = Files.lines(Path.of("app.log"), StandardCharsets.UTF_8)) {
long errorCount = lines
.filter(l -> l.contains("ERROR"))
.count();
}
Cơ chế bên dưới:
Files.linesmở file quaBufferedReaderinternal.- Wrap
BufferedReaderthànhStream<String>quaSpliterator. - Mỗi element stream được tạo khi pipeline yêu cầu — gọi
readLine()lấy dòng kế. - Stream lazy (bài 9.2) — chỉ line đang xử lý ở heap.
Memory: ~KB cho buffer + 1 line Java object tại 1 thời điểm. File 10GB, 100GB, 1TB đều chạy được miễn có disk space.
Processing time linear với file size — I/O bound (đọc disk) không CPU bound.
Bắt buộc try-with-resources
// BAD
Files.lines(path).filter(...).count(); // Leak file descriptor
Stream<String> implement AutoCloseable. Khi scope kết thúc, JVM call .close() → close underlying BufferedReader → release FD.
Nếu không try-with-resources, FD giữ cho đến khi GC dọn stream object — timing không xác định, có thể lâu. Service 24/7 chạy leak → "Too many open files".
Javadoc của Files.lines ghi rõ: "The returned stream contains a reference to an open file. The file is closed by closing the stream."
Pipeline thực tế — phân tích log
Task: file log 1GB format [TIMESTAMP] LEVEL [REQUEST_ID] message. Tìm top 10 request ID có nhiều ERROR nhất.
record LogLine(Instant time, String level, String requestId, String message) {
static final Pattern PATTERN = Pattern.compile(
"\\[([^\\]]+)\\] (\\w+) \\[([^\\]]+)\\] (.*)"
);
static LogLine parse(String line) {
Matcher m = PATTERN.matcher(line);
if (!m.matches()) return null;
try {
return new LogLine(
Instant.parse(m.group(1)),
m.group(2),
m.group(3),
m.group(4)
);
} catch (DateTimeException e) {
return null;
}
}
}
Map<String, Long> topErrors;
try (Stream<String> lines = Files.lines(Path.of("app.log"), StandardCharsets.UTF_8)) {
topErrors = lines
.map(LogLine::parse)
.filter(Objects::nonNull) // Skip line khong match
.filter(l -> "ERROR".equals(l.level()))
.collect(Collectors.groupingBy(
LogLine::requestId,
Collectors.counting()))
.entrySet().stream()
.sorted(Map.Entry.<String, Long>comparingByValue().reversed())
.limit(10)
.collect(Collectors.toMap(
Map.Entry::getKey, Map.Entry::getValue,
(a, b) -> a, LinkedHashMap::new));
}
Memory: ~few MB (JVM heap cho map counts + line buffer). File 1GB → OK. 100GB → OK. Chỉ phụ thuộc số unique requestId có ERROR — aggregated data, không toàn bộ file.
Imperative tương đương: 2 for loop lồng + HashMap thủ công, ~30 dòng. Stream version 10 dòng declarative.
2. Memory-mapped file — MappedByteBuffer
Concept
Memory-mapped file: OS map vùng địa chỉ virtual memory của process tới content file. Đọc/ghi vùng đó = đọc/ghi file, nhưng không copy vào JVM heap — data nằm trong OS page cache.
try (FileChannel ch = FileChannel.open(
Path.of("big.dat"), StandardOpenOption.READ)) {
long size = ch.size();
MappedByteBuffer buf = ch.map(
FileChannel.MapMode.READ_ONLY, 0, size);
byte[] first100 = new byte[100];
buf.get(first100);
buf.position(10_000_000); // Seek
byte b = buf.get();
}
Ưu điểm
1. Random access cực nhanh File nằm trong OS page cache. Đọc position bất kỳ = memory access thường, không syscall. Với access pattern "đọc pos 1M, rồi pos 500M, rồi pos 100K" — mỗi đọc ~100ns (memory read), không ~3μs (syscall read).
Ngược lại, FileChannel.read(buf, position) mỗi lần là 1 syscall seek + read.
2. File > JVM heap OK Map 10GB file với heap 1GB OK. Buffer không nằm trong JVM heap — nó là virtual memory reference tới page cache OS.
3. Share giữa process Nhiều process map cùng file → cùng page cache → share data không copy. Advanced use case (IPC, shared cache).
Nhược điểm
1. File > 2GB cần map nhiều phần
Java MappedByteBuffer dùng int index — limit 2GB (Integer.MAX_VALUE). File lớn hơn phải chia map nhiều region:
long fileSize = ch.size();
long chunkSize = Integer.MAX_VALUE;
List<MappedByteBuffer> chunks = new ArrayList<>();
for (long offset = 0; offset < fileSize; offset += chunkSize) {
long size = Math.min(chunkSize, fileSize - offset);
chunks.add(ch.map(FileChannel.MapMode.READ_ONLY, offset, size));
}
Java 21+ có API MemorySegment (Foreign Function & Memory API) không có limit 2GB — alternative cho use case lớn.
2. Unmap không control được
MappedByteBuffer không có method unmap() chuẩn — buffer giữ page cache đến khi GC dọn. File đang map có thể không rename/delete được trên Windows.
Workaround (not standard): sun.nio.ch.FileChannelImpl.unmap(buffer) — private API, fragile.
3. Write durability không đảm bảo
Write vào MappedByteBuffer đi vào OS page cache, chưa chắc xuống disk. buf.force() gọi fsync — đảm bảo disk.
Use case
- Database index / B-tree: random seek, read-heavy — OS cache tự optimize.
- Binary format parsing: header + pointer offset (ELF, DEX, serialization format).
- Full-text search index: Lucene dùng memory-mapped cho segment file.
- Shared memory IPC: 2 process map cùng file.
Không đáng cho sequential scan 1 lần — BufferedInputStream đơn giản hơn, perf tương đương.
3. FileChannel + ByteBuffer — NIO channel API
NIO thế hệ 1 (Java 1.4, 2002): introduces Channel + Buffer + Selector. Thiết kế cho non-blocking I/O.
Pattern cơ bản
try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
ByteBuffer buf = ByteBuffer.allocate(4096);
while (ch.read(buf) != -1) {
buf.flip(); // Switch to read mode
while (buf.hasRemaining()) {
byte b = buf.get();
process(b);
}
buf.clear(); // Reset to write mode
}
}
Buffer lifecycle
ByteBuffer có 3 pointer: position, limit, capacity.
Empty buffer after allocate(1024):
position=0, limit=1024, capacity=1024
|=====================|
0 1024
After writing 100 bytes:
position=100, limit=1024, capacity=1024
|....|================|
0 100 1024
After flip():
position=0, limit=100, capacity=1024
|====| |
0 100 1024
(mode read: data tu 0 den 100)
After reading 50 bytes:
position=50, limit=100, capacity=1024
|====| |
0 50 100 1024
After clear():
position=0, limit=1024, capacity=1024
|=====================|
(sua back mode write)
Method lifecycle:
allocate(n)→ write mode.flip()→ switch sang read mode (limit = position, position = 0).clear()→ back về write mode (position = 0, limit = capacity).compact()→ keep unread data, ready cho write tiếp.
Verbose. Dễ bug nếu quên flip() sau write hoặc clear() giữa iterations.
Khi nào dùng FileChannel thay BufferedInputStream?
BufferedInputStream đơn giản hơn cho sequential read. FileChannel có ưu thế cho:
1. Random access với position:
ByteBuffer buf = ByteBuffer.allocate(1024);
ch.read(buf, 10_000_000); // Doc tu offset, khong seek thu cong
2. transferTo — zero-copy:
try (FileChannel src = FileChannel.open(source, StandardOpenOption.READ);
FileChannel dst = FileChannel.open(dest, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
src.transferTo(0, src.size(), dst);
}
transferTo dùng sendfile syscall (Linux) — OS copy data trực tiếp từ disk page cache sang disk page cache khác, không qua user space. Nhanh hơn read-write loop ~2-3×.
Dùng cho: file server, backup, streaming large file (HTTP response body).
3. Lock vùng file:
FileLock lock = ch.lock(offset, length, shared);
try {
// Exclusive access to this region
} finally {
lock.release();
}
Cross-process file locking — 2 process dùng cùng file.
4. Non-blocking I/O (với selector): Pattern cho server socket NIO — không áp dụng file thông thường.
4. Khi nào chọn API nào
flowchart TD
A[Xu ly file] --> B{Text hay binary?}
B -->|Text| C{Size?}
C -->|Nho, biet| D[Files.readString]
C -->|Lon/Stream| E[Files.lines + Stream]
B -->|Binary| F{Pattern?}
F -->|Sequential read| G[BufferedInputStream]
F -->|Random access<br/>File lon| H[MappedByteBuffer]
F -->|Copy / Transfer| I[FileChannel.transferTo]
F -->|Non-blocking socket| J[AsynchronousFileChannel]Rule thực tế:
- Default:
Files.linescho text,Files.newInputStreamcho binary sequential. - Large sequential text:
Files.lines+ Stream API (bài này). - Random access binary:
MappedByteBuffer. - High-perf copy:
FileChannel.transferTo. - Async I/O:
AsynchronousFileChannel— callback/Future based. Java 21 virtual thread làm pattern này ít cần thiết hơn (bài 10.5).
5. Pattern thực tế — word count file lớn
Task: đọc file 10GB text, đếm tần suất mỗi từ, in top 20.
try (Stream<String> lines = Files.lines(Path.of("huge.txt"), StandardCharsets.UTF_8)) {
Map<String, Long> wordCount = lines
.flatMap(l -> Arrays.stream(l.split("\\s+")))
.filter(w -> !w.isEmpty())
.map(String::toLowerCase)
.collect(Collectors.groupingBy(
Function.identity(),
Collectors.counting()));
wordCount.entrySet().stream()
.sorted(Map.Entry.<String, Long>comparingByValue().reversed())
.limit(20)
.forEach(e -> System.out.println(e.getKey() + ": " + e.getValue()));
}
Memory: phụ thuộc số distinct word (thường dưới 1 triệu cho text English). ~100MB heap cho map. File size không giới hạn.
Imperative: 2 loop + HashMap thủ công, ~25 dòng. Stream 10 dòng — đúng business intent.
6. AsynchronousFileChannel — async I/O
AsynchronousFileChannel ch = AsynchronousFileChannel.open(
path, StandardOpenOption.READ);
ByteBuffer buf = ByteBuffer.allocate(4096);
Future<Integer> future = ch.read(buf, 0);
// Lam viec khac
doOtherStuff();
int n = future.get(); // Block khi can ket qua
Hoặc callback:
ch.read(buf, 0, null, new CompletionHandler<Integer, Void>() {
@Override public void completed(Integer n, Void attachment) {
System.out.println("Read " + n + " bytes");
}
@Override public void failed(Throwable ex, Void attachment) {
ex.printStackTrace();
}
});
Thiết kế cho high-concurrency server pre-virtual-thread. Java 21+ có virtual thread (bài 10.5) — pattern blocking API trên virtual thread thường dễ đọc hơn async callback.
Use case còn lại của AsynchronousFileChannel:
- Code base cũ đã dùng.
- Legacy library expect
CompletionHandlerinterface. - Mix với reactive stream (Reactor, RxJava).
Code mới đơn giản: blocking API trên virtual thread.
7. Pitfall tổng hợp
❌ Nhầm 1: Files.lines không close.
Files.lines(path).forEach(...); // Leak FD
✅ Try-with-resources.
❌ Nhầm 2: Files.readAllLines cho file lớn.
List<String> all = Files.readAllLines(Path.of("10gb.log")); // OOM
✅ Files.lines + stream, memory constant.
❌ Nhầm 3: Quên flip() sau write buffer.
buf.put(data);
byte b = buf.get(); // Doc tu position sau write, get data khong dinh
✅ flip() trước read:
buf.put(data);
buf.flip();
byte b = buf.get();
❌ Nhầm 4: MappedByteBuffer cho file nhỏ.
ch.map(MapMode.READ_ONLY, 0, 1024); // Overhead > loi ich
✅ File nhỏ: Files.readAllBytes hoặc ByteBuffer.wrap(readAllBytes).
❌ Nhầm 5: Không force() khi cần durability.
buf.put(data);
// JVM crash -> data in OS cache, chua xuong disk
✅ buf.force() với MappedByteBuffer; ch.force(true) với FileChannel.
❌ Nhầm 6: ByteBuffer.allocate lặp trong loop.
while (reading) {
ByteBuffer buf = ByteBuffer.allocate(8192); // Allocate moi iteration
ch.read(buf);
// ...
}
✅ Allocate 1 lần ngoài loop, clear() reuse:
ByteBuffer buf = ByteBuffer.allocate(8192);
while (reading) {
buf.clear();
ch.read(buf);
// ...
}
8. 📚 Deep Dive Oracle
Spec / reference chính thức:
- Files.lines — spec + close requirement.
- FileChannel — channel-based API.
- MappedByteBuffer — memory-mapped file.
- AsynchronousFileChannel — async API.
- JEP 424: Foreign Function & Memory API — Java 21 MemorySegment không limit 2GB, alternative cho MappedByteBuffer.
- Exploring memory-mapped files in Java — Oracle Java Magazine article với benchmark.
Ghi chú: Oracle Java Magazine có series về NIO performance. Benchmark thực cho memory-mapped vs channel read: mapped nhanh hơn 2-3× với random access, tương đương với sequential scan. Nếu optimize file-heavy workload, đọc series này để tránh premature optimization. Với Java 21+, cân nhắc MemorySegment — API mới thay thế MappedByteBuffer cho file lớn.
9. Tóm tắt
Files.lines(path)+ Stream API = declarative xử lý file text lớn, memory constant. 90% use case text processing.- Luôn close
Files.linesqua try-with-resources — stream giữ file handle. MappedByteBuffermap file vào address space process — random access nhanh, không tốn JVM heap.MappedByteBufferlimit 2GB (int index) — file lớn hơn chia multiple region hoặc dùngMemorySegment(Java 21+).FileChannel+ByteBuffer— NIO channel API, verbose lifecycle (allocate → write → flip → read → clear).FileChannel.transferTozero-copy ở kernel level (sendfile) — tối ưu copy file, streaming response.AsynchronousFileChannelcho async I/O pre-virtual-thread; Java 21+ virtual thread giảm nhu cầu.- Rule chọn API: default
Files.lines/newInputStreamsequential; random access → mapped; copy → transferTo; non-blocking → async. - Pattern count/filter/group trên log file → stream + collectors — replace 2-3× code imperative.
10. Tự kiểm tra
Q1Vì sao Files.lines scale được với file 10GB mà readAllLines thì không?▸
Files.lines scale được với file 10GB mà readAllLines thì không?readAllLines load toàn bộ vào List<String> trong JVM heap. 10GB text UTF-8 → ~15-20GB heap vì:
Stringtrong JVM dùng UTF-16 (2 byte/char cho BMP, 4 byte cho supplementary) — overhead gấp đôi so với UTF-8 disk.- Mỗi
Stringobject có header (16 byte) + length field + char array header.
Heap default vài GB → OOM ngay.
Files.lines trả lazy stream — mỗi lần pipeline cần element, đọc 1 line từ BufferedReader internal. Chỉ 1 line + buffer 8KB ở heap tại 1 thời điểm. Memory constant bất kể file size.
Processing time linear với file size — I/O bound (đọc disk), không RAM bound.
Rule: file size tiên đoán được + nhỏ (< 100MB) → readAllLines tiện. Không biết size hoặc có thể lớn → stream-based.
Q2Khi nào dùng MappedByteBuffer thay BufferedInputStream?▸
MappedByteBuffer thay BufferedInputStream?Dùng MappedByteBuffer khi:
- Random access binary: database index (B-tree), binary format cần seek nhiều lần (ELF, DEX, Lucene segment). OS page cache tự optimize — "hot region" của file cache trong RAM, access sau nhanh.
- File > JVM heap: map tránh load vào heap. Memory nằm trong OS page cache, không trong heap.
- Share data cross-process: nhiều process map cùng file → cùng page cache → share data không copy. Advanced IPC use case.
Không đáng với sequential scan 1 lần — BufferedInputStream đơn giản hơn, perf tương đương (cả hai đều đọc tuần tự qua page cache).
Benchmark thực: mapped thắng 2-3× với random access, tương đương hoặc hơi chậm với sequential (overhead setup mmap). Đo cụ thể với workload production trước khi commit vào mapped.
Q3Cơ chế buffer.flip() trong NIO là gì, và vì sao cần thiết?▸
buffer.flip() trong NIO là gì, và vì sao cần thiết?NIO ByteBuffer có 3 pointer: position, limit, capacity. Buffer có 2 mode:
- Write mode:
positionlà chỗ ghi tiếp,limit=capacity. "Còn bao nhiêu chỗ trống". - Read mode:
positionlà chỗ đọc tiếp,limitlà chỗ cuối data hợp lệ. "Còn bao nhiêu data chưa đọc".
Lifecycle:
allocate(n)→ mode write, position=0, limit=n.- Write 100 byte → position=100, limit=n.
flip()→ switch mode read, limit=100 (lưu giá trị position cũ), position=0.- Read data từ 0 đến 100.
clear()→ back mode write cho iteration kế.
Quên flip sau write → read từ position đang ở cuối (sau data) → không data / data invalid. Common bug người mới NIO.
Phát hiện: chạy buffer nhưng không data ra → check có flip sau write không.
Q4Khi nào cần force/flush khi ghi file, và 2 tầng buffer là gì?▸
Data ghi đi qua 2 tầng buffer:
- JVM buffer → OS:
BufferedWriter.flush(),OutputStream.flush(),MappedByteBuffer.force()push data xuống OS page cache. Nhanh. - OS cache → disk:
fos.getFD().sync(),FileChannel.force(true)gọi fsync syscall — buộc OS ghi page cache xuống disk thực. Chậm ~5-50ms/call.
Tầng 1 đủ khi: mất data trong buffer có thể chấp nhận (vd log debug). JVM crash → mất buffer JVM, nhưng data đã flush xuống OS vẫn persist khi OS không crash.
Tầng 2 cần khi: power-off mất data không chấp nhận được. Transaction log DB (WAL), financial audit, medical record. Mỗi write critical cần fsync.
Cost fsync: chậm. Không gọi mỗi write — batch và fsync cuối batch, hoặc fsync theo interval (1s/lần).
Compromise: SQLite mặc định fsync mỗi commit (durable), có option PRAGMA synchronous=OFF để bỏ (nhanh hơn nhưng mất durability).
Q5Đoạn sau có vấn đề gì? Stream<String> s = Files.lines(path); s.filter(...).count();▸
Stream<String> s = Files.lines(path); s.filter(...).count();Stream không close — file handle leak.
Intermediate op filter và terminal count không tự close stream. Stream implement AutoCloseable và javadoc Files.lines nói rõ phải close để release underlying resource (BufferedReader + FileChannel).
Fix:
try (Stream<String> s = Files.lines(path)) {
long n = s.filter(...).count();
}Hiệu ứng leak: service 24/7 chạy gọi hàm này hàng nghìn lần/giờ → FD leak tăng dần. Linux default limit 1024 FD/process → "Too many open files" sau vài ngày. macOS limit thấp hơn → fail sớm hơn.
Đây là một trong những leak phổ biến nhất trong code Java dùng Stream API — dễ quên vì intermediate/terminal op trên stream không gợi ý "cần close". Rule: stream có underlying resource (file, socket, DB) → luôn try-with-resources.
Q6Vì sao FileChannel.transferTo nhanh hơn viết loop read/write?▸
FileChannel.transferTo nhanh hơn viết loop read/write?Loop read/write truyền thống:
byte[] buf = new byte[8192];
int n;
while ((n = in.read(buf)) != -1) {
out.write(buf, 0, n);
}Mỗi byte đi qua:
- Disk → OS page cache (read).
- OS page cache → JVM buffer (syscall copy).
- JVM buffer → OS page cache (syscall copy).
- OS page cache → disk (write).
2 lần copy user-kernel space. Với file 1GB = 2GB data copy, ~100k syscall.
FileChannel.transferTo dùng sendfile syscall (Linux) hoặc tương đương OS khác:
src.transferTo(0, src.size(), dst);OS copy trực tiếp page cache → page cache trong kernel, không qua user space. Zero-copy. 1 syscall thay 100k.
Tốc độ: thường nhanh hơn 2-3×, CPU usage giảm đáng kể.
Use case: HTTP file server (Apache, nginx dùng sendfile), backup tool, streaming large response body. Spring Framework StreamingResponseBody dùng transferTo khi có thể.
Bài tiếp theo: Mini-challenge: Log aggregator với NIO.2 và Stream
Bài này có giúp bạn hiểu bản chất không?