Stream file — Files.lines, memory-mapped, channel
Xử lý file lớn stream-based với Files.lines, try-with-resources. Memory-mapped file (MappedByteBuffer) cho random access, FileChannel + Buffer cho control chi tiết. Pattern xử lý file 10GB không OOM.
TL;DR: Ba kỹ thuật xử lý file lớn: (1) Files.lines + Stream API — lazy line-by-line, memory constant, cover 90% text processing, bắt buộc try-with-resources; (2) MappedByteBuffer — map file vào virtual memory, random access nhanh như đọc RAM, file lớn hơn heap vẫn OK, nhưng limit 2GB mỗi region và không unmap chủ động được; (3) FileChannel + ByteBuffer — control chi tiết, có transferTo zero-copy (sendfile) cho copy/streaming. Quy tắc chọn: text sequential → Files.lines; binary random access → mapped; copy hiệu năng cao → transferTo.
Quay lại bài toán quen thuộc từ bài Path và Files: đọc log file 10GB. Thử Files.readAllLines:
List<String> lines = Files.readAllLines(Path.of("app.log")); // OOM
10GB text trong file tốn hơn 10GB heap. Từ Java 9, Compact Strings (JEP 254) lưu chuỗi Latin-1 với 1 byte/char nên char array tổng ≈ size file — nhưng cộng thêm object header (~16 byte) + reference trong List cho mỗi dòng, với log dòng ngắn overhead chiếm thêm 30-50%. Heap default vài GB nên crash ngay từ khi load.
Chuyển sang BufferedReader loop while (readLine) — chạy được nhưng imperative:
try (BufferedReader r = Files.newBufferedReader(path)) {
long errorCount = 0;
String line;
while ((line = r.readLine()) != null) {
if (line.contains("ERROR")) errorCount++;
}
System.out.println(errorCount);
}
6 dòng cho task đơn giản. Intent "đếm ERROR" không rõ từ code — phải skim qua loop structure.
Files.lines + Stream API kết hợp tốt nhất của cả hai: memory constant (không OOM), code declarative (đúng business intent):
try (Stream<String> lines = Files.lines(path)) {
long errorCount = lines.filter(l -> l.contains("ERROR")).count();
System.out.println(errorCount);
}
3 dòng. Đọc là hiểu. Memory ~KB buffer, scale với file bất kỳ size.
Bài này giải thích 3 kỹ thuật file I/O cho dữ liệu lớn:
Files.lines+ Stream API — 90% use case text processing.- Memory-mapped file (
MappedByteBuffer) — random access binary lớn, không tốn heap. - FileChannel +
ByteBuffer— NIO classic, control chi tiết, dùng cho copy zero-copy và pattern đặc biệt.
1. Files.lines + Stream API — pattern chuẩn cho text
Cách hoạt động
try (Stream<String> lines = Files.lines(Path.of("app.log"), StandardCharsets.UTF_8)) {
long errorCount = lines
.filter(l -> l.contains("ERROR"))
.count();
}
Cơ chế bên dưới:
Files.linesmở file quaBufferedReaderinternal.- Wrap
BufferedReaderthànhStream<String>quaSpliterator(iterator chuyên cho Stream API — biết cách duyệt tuần tự và tự chia nhỏ để chạy song song). - Mỗi element stream được tạo khi pipeline yêu cầu — gọi
readLine()lấy dòng kế. - Stream lazy (xem Stream basics) — chỉ line đang xử lý ở heap.
Memory: ~KB cho buffer + 1 line Java object tại 1 thời điểm. File 10GB, 100GB, 1TB đều chạy được miễn có disk space.
Processing time linear với file size — I/O bound (đọc disk) không CPU bound.
Bắt buộc try-with-resources
// BAD
Files.lines(path).filter(...).count(); // Leak file descriptor
Stream<String> implement AutoCloseable. Khi scope kết thúc, JVM call .close() → close underlying BufferedReader → release FD.
Nếu không try-with-resources, FD giữ cho đến khi GC dọn stream object — timing không xác định, có thể lâu. Service 24/7 chạy leak → "Too many open files".
Javadoc của Files.lines ghi rõ: "The returned stream contains a reference to an open file. The file is closed by closing the stream."
Pipeline thực tế — phân tích log
Task: file log 1GB format [TIMESTAMP] LEVEL [REQUEST_ID] message. Tìm top 10 request ID có nhiều ERROR nhất.
record LogLine(Instant time, String level, String requestId, String message) {
static final Pattern PATTERN = Pattern.compile(
"\\[([^\\]]+)\\] (\\w+) \\[([^\\]]+)\\] (.*)"
);
static LogLine parse(String line) {
Matcher m = PATTERN.matcher(line);
if (!m.matches()) return null;
try {
return new LogLine(
Instant.parse(m.group(1)),
m.group(2),
m.group(3),
m.group(4)
);
} catch (DateTimeException e) {
return null;
}
}
}
Map<String, Long> topErrors;
try (Stream<String> lines = Files.lines(Path.of("app.log"), StandardCharsets.UTF_8)) {
topErrors = lines
.map(LogLine::parse)
.filter(Objects::nonNull) // Skip line khong match
.filter(l -> "ERROR".equals(l.level()))
.collect(Collectors.groupingBy(
LogLine::requestId,
Collectors.counting()))
.entrySet().stream()
.sorted(Map.Entry.<String, Long>comparingByValue().reversed())
.limit(10)
.collect(Collectors.toMap(
Map.Entry::getKey, Map.Entry::getValue,
(a, b) -> a, LinkedHashMap::new));
}
Memory: ~few MB (JVM heap cho map counts + line buffer). File 1GB → OK. 100GB → OK. Chỉ phụ thuộc số unique requestId có ERROR — aggregated data, không toàn bộ file.
Imperative tương đương: 2 for loop lồng + HashMap thủ công, ~30 dòng. Stream version 10 dòng declarative.
2. Memory-mapped file — MappedByteBuffer
Concept
Memory-mapped file: OS map vùng địa chỉ virtual memory của process tới content file. Đọc/ghi vùng đó = đọc/ghi file, nhưng không copy vào JVM heap — data nằm trong OS page cache.
try (FileChannel ch = FileChannel.open(
Path.of("big.dat"), StandardOpenOption.READ)) {
long size = ch.size();
MappedByteBuffer buf = ch.map(
FileChannel.MapMode.READ_ONLY, 0, size);
byte[] first100 = new byte[100];
buf.get(first100);
buf.position(10_000_000); // Seek
byte b = buf.get();
}
Ưu điểm
1. Random access cực nhanh File nằm trong OS page cache. Đọc position bất kỳ = memory access thường, không syscall. Với access pattern "đọc pos 1M, rồi pos 500M, rồi pos 100K" — mỗi đọc ~100ns (memory read), không ~3μs (syscall read).
Ngược lại, FileChannel.read(buf, position) mỗi lần là 1 syscall seek + read.
2. File lớn hơn JVM heap OK Map 10GB file với heap 1GB OK. Buffer không nằm trong JVM heap — nó là virtual memory reference tới page cache OS.
3. Share giữa process Nhiều process map cùng file → cùng page cache → share data không copy. Use case nâng cao: IPC (inter-process communication — trao đổi dữ liệu giữa các process), shared cache.
Nhược điểm
1. File vượt 2GB cần map nhiều phần
Java MappedByteBuffer dùng int index — limit 2GB (Integer.MAX_VALUE). File lớn hơn phải chia map nhiều region:
long fileSize = ch.size();
long chunkSize = Integer.MAX_VALUE;
List<MappedByteBuffer> chunks = new ArrayList<>();
for (long offset = 0; offset < fileSize; offset += chunkSize) {
long size = Math.min(chunkSize, fileSize - offset);
chunks.add(ch.map(FileChannel.MapMode.READ_ONLY, offset, size));
}
Alternative hiện đại: MemorySegment của Foreign Function & Memory API — preview từ Java 19 (JEP 424), final ở Java 22 (JEP 454) — dùng long index, không có limit 2GB.
2. Unmap không control được
MappedByteBuffer không có method unmap() chuẩn — buffer giữ page cache đến khi GC dọn. File đang map có thể không rename/delete được trên Windows.
Workaround (not standard): sun.nio.ch.FileChannelImpl.unmap(buffer) — private API, fragile.
3. Write durability không đảm bảo
Write vào MappedByteBuffer đi vào OS page cache, chưa chắc xuống disk. buf.force() gọi fsync — đảm bảo disk.
Use case
- Database index / B-tree: random seek, read-heavy — OS cache tự optimize.
- Binary format parsing: header + pointer offset (ELF, DEX, serialization format).
- Full-text search index: Lucene dùng memory-mapped cho segment file.
- Shared memory IPC: 2 process map cùng file.
Không đáng cho sequential scan 1 lần — BufferedInputStream đơn giản hơn, perf tương đương.
3. FileChannel + ByteBuffer — NIO channel API
NIO thế hệ 1 (Java 1.4, 2002): introduces Channel + Buffer + Selector. Thiết kế cho non-blocking I/O.
Pattern cơ bản
try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
ByteBuffer buf = ByteBuffer.allocate(4096);
while (ch.read(buf) != -1) {
buf.flip(); // Switch to read mode
while (buf.hasRemaining()) {
byte b = buf.get();
process(b);
}
buf.clear(); // Reset to write mode
}
}
Buffer lifecycle
ByteBuffer có 3 pointer: position, limit, capacity.
Empty buffer after allocate(1024):
position=0, limit=1024, capacity=1024
|=====================|
0 1024
After writing 100 bytes:
position=100, limit=1024, capacity=1024
|....|================|
0 100 1024
After flip():
position=0, limit=100, capacity=1024
|====| |
0 100 1024
(mode read: data tu 0 den 100)
After clear():
position=0, limit=1024, capacity=1024
|=====================|
(sua back mode write)
Method lifecycle:
allocate(n)→ write mode.flip()→ switch sang read mode (limit = position, position = 0).clear()→ back về write mode (position = 0, limit = capacity).compact()→ keep unread data, ready cho write tiếp.
Verbose. Dễ bug nếu quên flip() sau write hoặc clear() giữa iterations.
Khi nào dùng FileChannel thay BufferedInputStream?
BufferedInputStream đơn giản hơn cho sequential read. FileChannel có ưu thế cho:
1. Random access với position:
ByteBuffer buf = ByteBuffer.allocate(1024);
ch.read(buf, 10_000_000); // Doc tu offset, khong seek thu cong
2. transferTo — zero-copy:
try (FileChannel src = FileChannel.open(source, StandardOpenOption.READ);
FileChannel dst = FileChannel.open(dest, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
src.transferTo(0, src.size(), dst);
}
transferTo dùng sendfile syscall (Linux) — OS copy data trực tiếp từ disk page cache sang disk page cache khác, không qua user space. Nhanh hơn read-write loop ~2-3×.
Dùng cho: file server, backup, streaming large file (HTTP response body).
3. Lock vùng file:
FileLock lock = ch.lock(offset, length, shared);
try {
// Exclusive access to this region
} finally {
lock.release();
}
Cross-process file locking — 2 process dùng cùng file.
4. Khi nào chọn API nào
flowchart TD
A[Xu ly file] --> B{Text hay binary?}
B -->|Text| C{Size?}
C -->|Nho, biet| D[Files.readString]
C -->|Lon/Stream| E[Files.lines + Stream]
B -->|Binary| F{Pattern?}
F -->|Sequential read| G[BufferedInputStream]
F -->|Random access<br/>File lon| H[MappedByteBuffer]
F -->|Copy / Transfer| I[FileChannel.transferTo]
F -->|Non-blocking socket| J[AsynchronousFileChannel]Rule thực tế:
- Default:
Files.linescho text,Files.newInputStreamcho binary sequential. - Large sequential text:
Files.lines+ Stream API (bài này). - Random access binary:
MappedByteBuffer. - High-perf copy:
FileChannel.transferTo. - Async I/O:
AsynchronousFileChannel— callback/Future based. Virtual thread (Java 21) làm pattern này ít cần thiết hơn.
5. Pattern thực tế — word count file lớn
Task: đọc file 10GB text, đếm tần suất mỗi từ, in top 20.
try (Stream<String> lines = Files.lines(Path.of("huge.txt"), StandardCharsets.UTF_8)) {
Map<String, Long> wordCount = lines
.flatMap(l -> Arrays.stream(l.split("\\s+")))
.filter(w -> !w.isEmpty())
.map(String::toLowerCase)
.collect(Collectors.groupingBy(
Function.identity(),
Collectors.counting()));
wordCount.entrySet().stream()
.sorted(Map.Entry.<String, Long>comparingByValue().reversed())
.limit(20)
.forEach(e -> System.out.println(e.getKey() + ": " + e.getValue()));
}
Memory: phụ thuộc số distinct word (thường dưới 1 triệu cho text English). ~100MB heap cho map. File size không giới hạn.
6. AsynchronousFileChannel — async I/O
AsynchronousFileChannel ch = AsynchronousFileChannel.open(
path, StandardOpenOption.READ);
ByteBuffer buf = ByteBuffer.allocate(4096);
Future<Integer> future = ch.read(buf, 0);
// Lam viec khac
doOtherStuff();
int n = future.get(); // Block khi can ket qua
Ngoài Future, còn variant nhận CompletionHandler callback (completed/failed).
Thiết kế cho high-concurrency server pre-virtual-thread. Java 21+ có virtual thread — pattern blocking API trên virtual thread thường dễ đọc hơn async callback. AsynchronousFileChannel còn lại cho: code base cũ, library expect CompletionHandler, mix với reactive stream (Reactor, RxJava).
7. Pitfall tổng hợp
❌ Nhầm 1: Files.lines không close.
Files.lines(path).forEach(...); // Leak FD
✅ Try-with-resources.
❌ Nhầm 2: Files.readAllLines cho file lớn.
List<String> all = Files.readAllLines(Path.of("10gb.log")); // OOM
✅ Files.lines + stream, memory constant.
❌ Nhầm 3: Quên flip() sau write buffer.
buf.put(data);
byte b = buf.get(); // Doc tu position sau write, get data khong dinh
✅ flip() trước read:
buf.put(data);
buf.flip();
byte b = buf.get();
❌ Nhầm 4: MappedByteBuffer cho file nhỏ.
ch.map(MapMode.READ_ONLY, 0, 1024); // Overhead > loi ich
✅ File nhỏ: Files.readAllBytes hoặc ByteBuffer.wrap(readAllBytes).
❌ Nhầm 5: Không force() khi cần durability.
buf.put(data);
// JVM crash -> data in OS cache, chua xuong disk
✅ buf.force() với MappedByteBuffer; ch.force(true) với FileChannel.
❌ Nhầm 6: ByteBuffer.allocate lặp trong loop.
while (reading) {
ByteBuffer buf = ByteBuffer.allocate(8192); // Allocate moi iteration
ch.read(buf);
// ...
}
✅ Allocate 1 lần ngoài loop, clear() reuse.
8. 📚 Deep Dive Oracle
Spec / reference chính thức:
- Files.lines — spec + close requirement.
- FileChannel — channel-based API.
- MappedByteBuffer — memory-mapped file.
- AsynchronousFileChannel — async API.
- JEP 454: Foreign Function & Memory API — final Java 22 (preview từ Java 19 qua JEP 424);
MemorySegmentkhông limit 2GB, alternative cho MappedByteBuffer. - JEP 254: Compact Strings — Java 9, String Latin-1 lưu 1 byte/char.
- Exploring memory-mapped files in Java — Oracle Java Magazine article với benchmark.
Ghi chú: Oracle Java Magazine có series về NIO performance. Benchmark thực cho memory-mapped vs channel read: mapped nhanh hơn 2-3× với random access, tương đương với sequential scan. Nếu optimize file-heavy workload, đọc series này để tránh premature optimization. Với Java 22+, cân nhắc MemorySegment — API mới thay thế MappedByteBuffer cho file lớn.
9. Tóm tắt
Files.lines(path)+ Stream API = declarative xử lý file text lớn, memory constant. 90% use case text processing.- Luôn close
Files.linesqua try-with-resources — stream giữ file handle. MappedByteBuffermap file vào address space process — random access nhanh, không tốn JVM heap.MappedByteBufferlimit 2GB (int index) — file lớn hơn chia multiple region hoặc dùngMemorySegment(FFM API, final Java 22 — JEP 454).FileChannel+ByteBuffer— NIO channel API, verbose lifecycle (allocate → write → flip → read → clear).FileChannel.transferTozero-copy ở kernel level (sendfile) — tối ưu copy file, streaming response.AsynchronousFileChannelcho async I/O pre-virtual-thread; Java 21+ virtual thread giảm nhu cầu.- Rule chọn API: default
Files.lines/newInputStreamsequential; random access → mapped; copy → transferTo; non-blocking → async. - Pattern count/filter/group trên log file → stream + collectors — replace 2-3× code imperative.
10. Tự kiểm tra
Q1Vì sao Files.lines scale được với file 10GB mà readAllLines thì không?▸
Files.lines scale được với file 10GB mà readAllLines thì không?readAllLines load toàn bộ vào List<String> trong JVM heap. 10GB text → hơn 10GB heap vì:
- Char array của String ≈ size file: từ Java 9, Compact Strings (JEP 254) lưu chuỗi Latin-1 với 1 byte/char (text non-ASCII fallback UTF-16 — 2 byte/char, còn tệ hơn).
- Mỗi
Stringobject thêm header (~16 byte) + field + reference trong List — log dòng ngắn thì overhead này chiếm thêm 30-50%.
Heap default vài GB → OOM ngay.
Files.lines trả lazy stream — mỗi lần pipeline cần element, đọc 1 line từ BufferedReader internal. Chỉ 1 line + buffer 8KB ở heap tại 1 thời điểm. Memory constant bất kể file size.
Processing time linear với file size — I/O bound (đọc disk), không RAM bound.
Rule: file size tiên đoán được + nhỏ (< 100MB) → readAllLines tiện. Không biết size hoặc có thể lớn → stream-based.
Q2Khi nào dùng MappedByteBuffer thay BufferedInputStream?▸
MappedByteBuffer thay BufferedInputStream?Dùng MappedByteBuffer khi:
- Random access binary: database index (B-tree), binary format cần seek nhiều lần (ELF, DEX, Lucene segment). OS page cache tự optimize — "hot region" của file cache trong RAM, access sau nhanh.
- File lớn hơn JVM heap: map tránh load vào heap. Memory nằm trong OS page cache, không trong heap.
- Share data cross-process: nhiều process map cùng file → cùng page cache → share data không copy. Advanced IPC use case.
Không đáng với sequential scan 1 lần — BufferedInputStream đơn giản hơn, perf tương đương (cả hai đều đọc tuần tự qua page cache).
Benchmark thực: mapped thắng 2-3× với random access, tương đương hoặc hơi chậm với sequential (overhead setup mmap). Đo cụ thể với workload production trước khi commit vào mapped.
Q3Cơ chế buffer.flip() trong NIO là gì, và vì sao cần thiết?▸
buffer.flip() trong NIO là gì, và vì sao cần thiết?NIO ByteBuffer có 3 pointer: position, limit, capacity. Buffer có 2 mode:
- Write mode:
positionlà chỗ ghi tiếp,limit=capacity. "Còn bao nhiêu chỗ trống". - Read mode:
positionlà chỗ đọc tiếp,limitlà chỗ cuối data hợp lệ. "Còn bao nhiêu data chưa đọc".
Lifecycle:
allocate(n)→ mode write, position=0, limit=n.- Write 100 byte → position=100, limit=n.
flip()→ switch mode read, limit=100 (lưu giá trị position cũ), position=0.- Read data từ 0 đến 100.
clear()→ back mode write cho iteration kế.
Quên flip sau write → read từ position đang ở cuối (sau data) → không data / data invalid. Common bug người mới NIO.
Phát hiện: chạy buffer nhưng không data ra → check có flip sau write không.
Q4Khi nào cần force/flush khi ghi file, và 2 tầng buffer là gì?▸
Data ghi đi qua 2 tầng buffer:
- JVM buffer → OS:
BufferedWriter.flush(),OutputStream.flush(),MappedByteBuffer.force()push data xuống OS page cache. Nhanh. - OS cache → disk:
fos.getFD().sync(),FileChannel.force(true)gọi fsync syscall — buộc OS ghi page cache xuống disk thực. Chậm ~5-50ms/call.
Tầng 1 đủ khi: mất data trong buffer có thể chấp nhận (vd log debug). JVM crash → mất buffer JVM, nhưng data đã flush xuống OS vẫn persist khi OS không crash.
Tầng 2 cần khi: power-off mất data không chấp nhận được. Transaction log DB (WAL), financial audit, medical record. Mỗi write critical cần fsync.
Cost fsync: chậm. Không gọi mỗi write — batch và fsync cuối batch, hoặc fsync theo interval (1s/lần).
Compromise: SQLite mặc định fsync mỗi commit (durable), có option PRAGMA synchronous=OFF để bỏ (nhanh hơn nhưng mất durability).
Q5Đoạn sau có vấn đề gì? Stream<String> s = Files.lines(path); s.filter(...).count();▸
Stream<String> s = Files.lines(path); s.filter(...).count();Stream không close — file handle leak.
Intermediate op filter và terminal count không tự close stream. Stream implement AutoCloseable và javadoc Files.lines nói rõ phải close để release underlying resource (BufferedReader + FileChannel).
Fix:
try (Stream<String> s = Files.lines(path)) {
long n = s.filter(...).count();
}Hiệu ứng leak: service 24/7 gọi hàm này hàng nghìn lần/giờ → FD leak tăng dần → "Too many open files" sau vài ngày. Đây là một trong những leak phổ biến nhất trong code Java dùng Stream API — dễ quên vì op trên stream không gợi ý "cần close". Rule: stream có underlying resource (file, socket, DB) → luôn try-with-resources.
Q6Vì sao FileChannel.transferTo nhanh hơn viết loop read/write?▸
FileChannel.transferTo nhanh hơn viết loop read/write?Loop read/write truyền thống:
byte[] buf = new byte[8192];
int n;
while ((n = in.read(buf)) != -1) {
out.write(buf, 0, n);
}Mỗi byte đi qua:
- Disk → OS page cache (read).
- OS page cache → JVM buffer (syscall copy).
- JVM buffer → OS page cache (syscall copy).
- OS page cache → disk (write).
2 lần copy user-kernel space. Với file 1GB = 2GB data copy, ~100k syscall.
FileChannel.transferTo dùng sendfile syscall (Linux) hoặc tương đương OS khác:
src.transferTo(0, src.size(), dst);OS copy trực tiếp page cache → page cache trong kernel, không qua user space. Zero-copy. 1 syscall thay 100k.
Tốc độ: thường nhanh hơn 2-3×, CPU usage giảm đáng kể.
Use case: HTTP file server (Apache, nginx dùng sendfile), backup tool, streaming large response body. Spring Framework StreamingResponseBody dùng transferTo khi có thể.
Bài tiếp theo: Mini-challenge: Log aggregator với NIO.2 và Stream
Bài này có giúp bạn hiểu bản chất không?
Hỏi đáp về bài này
Chưa có câu hỏi
Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).
Đặt câu hỏi đầu tiên