Java — Từ Zero đến Senior/Stream file — Files.lines, memory-mapped, channel
~22 phútI/O & NIOMiễn phí

Stream file — Files.lines, memory-mapped, channel

Xử lý file lớn stream-based với Files.lines, try-with-resources. Memory-mapped file (MappedByteBuffer) cho random access, FileChannel + Buffer cho control chi tiết. Pattern xử lý file 10GB không OOM.

Quay lại task mở đầu bài 11.2: đọc log file 10GB. Thử Files.readAllLines:

List<String> lines = Files.readAllLines(Path.of("app.log"));   // OOM

10GB text = ~15-20GB heap (UTF-16 overhead của String trong JVM + object header). Heap default vài GB → crash ngay.

Chuyển sang BufferedReader loop while (readLine) — chạy được nhưng imperative:

try (BufferedReader r = Files.newBufferedReader(path)) {
    long errorCount = 0;
    String line;
    while ((line = r.readLine()) != null) {
        if (line.contains("ERROR")) errorCount++;
    }
    System.out.println(errorCount);
}

6 dòng cho task đơn giản. Intent "đếm ERROR" không rõ từ code — phải skim qua loop structure.

Files.lines + Stream API kết hợp tốt nhất của cả hai: memory constant (không OOM), code declarative (đúng business intent):

try (Stream<String> lines = Files.lines(path)) {
    long errorCount = lines.filter(l -> l.contains("ERROR")).count();
    System.out.println(errorCount);
}

3 dòng. Đọc là hiểu. Memory ~KB buffer, scale với file bất kỳ size.

Bài này giải thích 3 kỹ thuật file I/O cho dữ liệu lớn:

  1. Files.lines + Stream API — 90% use case text processing.
  2. Memory-mapped file (MappedByteBuffer) — random access binary lớn, không tốn heap.
  3. FileChannel + ByteBuffer — NIO classic, control chi tiết, dùng cho copy zero-copy và pattern đặc biệt.

1. Files.lines + Stream API — pattern chuẩn cho text

Cách hoạt động

try (Stream<String> lines = Files.lines(Path.of("app.log"), StandardCharsets.UTF_8)) {
    long errorCount = lines
        .filter(l -> l.contains("ERROR"))
        .count();
}

Cơ chế bên dưới:

  1. Files.lines mở file qua BufferedReader internal.
  2. Wrap BufferedReader thành Stream<String> qua Spliterator.
  3. Mỗi element stream được tạo khi pipeline yêu cầu — gọi readLine() lấy dòng kế.
  4. Stream lazy (bài 9.2) — chỉ line đang xử lý ở heap.

Memory: ~KB cho buffer + 1 line Java object tại 1 thời điểm. File 10GB, 100GB, 1TB đều chạy được miễn có disk space.

Processing time linear với file size — I/O bound (đọc disk) không CPU bound.

Bắt buộc try-with-resources

// BAD
Files.lines(path).filter(...).count();   // Leak file descriptor

Stream<String> implement AutoCloseable. Khi scope kết thúc, JVM call .close() → close underlying BufferedReader → release FD.

Nếu không try-with-resources, FD giữ cho đến khi GC dọn stream object — timing không xác định, có thể lâu. Service 24/7 chạy leak → "Too many open files".

Javadoc của Files.lines ghi rõ: "The returned stream contains a reference to an open file. The file is closed by closing the stream."

Pipeline thực tế — phân tích log

Task: file log 1GB format [TIMESTAMP] LEVEL [REQUEST_ID] message. Tìm top 10 request ID có nhiều ERROR nhất.

record LogLine(Instant time, String level, String requestId, String message) {
    static final Pattern PATTERN = Pattern.compile(
        "\\[([^\\]]+)\\] (\\w+) \\[([^\\]]+)\\] (.*)"
    );

    static LogLine parse(String line) {
        Matcher m = PATTERN.matcher(line);
        if (!m.matches()) return null;
        try {
            return new LogLine(
                Instant.parse(m.group(1)),
                m.group(2),
                m.group(3),
                m.group(4)
            );
        } catch (DateTimeException e) {
            return null;
        }
    }
}

Map<String, Long> topErrors;
try (Stream<String> lines = Files.lines(Path.of("app.log"), StandardCharsets.UTF_8)) {
    topErrors = lines
        .map(LogLine::parse)
        .filter(Objects::nonNull)                         // Skip line khong match
        .filter(l -> "ERROR".equals(l.level()))
        .collect(Collectors.groupingBy(
            LogLine::requestId,
            Collectors.counting()))
        .entrySet().stream()
        .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
        .limit(10)
        .collect(Collectors.toMap(
            Map.Entry::getKey, Map.Entry::getValue,
            (a, b) -> a, LinkedHashMap::new));
}

Memory: ~few MB (JVM heap cho map counts + line buffer). File 1GB → OK. 100GB → OK. Chỉ phụ thuộc số unique requestId có ERROR — aggregated data, không toàn bộ file.

Imperative tương đương: 2 for loop lồng + HashMap thủ công, ~30 dòng. Stream version 10 dòng declarative.

2. Memory-mapped file — MappedByteBuffer

Concept

Memory-mapped file: OS map vùng địa chỉ virtual memory của process tới content file. Đọc/ghi vùng đó = đọc/ghi file, nhưng không copy vào JVM heap — data nằm trong OS page cache.

try (FileChannel ch = FileChannel.open(
        Path.of("big.dat"), StandardOpenOption.READ)) {
    long size = ch.size();
    MappedByteBuffer buf = ch.map(
        FileChannel.MapMode.READ_ONLY, 0, size);

    byte[] first100 = new byte[100];
    buf.get(first100);

    buf.position(10_000_000);   // Seek
    byte b = buf.get();
}

Ưu điểm

1. Random access cực nhanh File nằm trong OS page cache. Đọc position bất kỳ = memory access thường, không syscall. Với access pattern "đọc pos 1M, rồi pos 500M, rồi pos 100K" — mỗi đọc ~100ns (memory read), không ~3μs (syscall read).

Ngược lại, FileChannel.read(buf, position) mỗi lần là 1 syscall seek + read.

2. File > JVM heap OK Map 10GB file với heap 1GB OK. Buffer không nằm trong JVM heap — nó là virtual memory reference tới page cache OS.

3. Share giữa process Nhiều process map cùng file → cùng page cache → share data không copy. Advanced use case (IPC, shared cache).

Nhược điểm

1. File > 2GB cần map nhiều phần Java MappedByteBuffer dùng int index — limit 2GB (Integer.MAX_VALUE). File lớn hơn phải chia map nhiều region:

long fileSize = ch.size();
long chunkSize = Integer.MAX_VALUE;
List<MappedByteBuffer> chunks = new ArrayList<>();
for (long offset = 0; offset < fileSize; offset += chunkSize) {
    long size = Math.min(chunkSize, fileSize - offset);
    chunks.add(ch.map(FileChannel.MapMode.READ_ONLY, offset, size));
}

Java 21+ có API MemorySegment (Foreign Function & Memory API) không có limit 2GB — alternative cho use case lớn.

2. Unmap không control được MappedByteBuffer không có method unmap() chuẩn — buffer giữ page cache đến khi GC dọn. File đang map có thể không rename/delete được trên Windows.

Workaround (not standard): sun.nio.ch.FileChannelImpl.unmap(buffer) — private API, fragile.

3. Write durability không đảm bảo Write vào MappedByteBuffer đi vào OS page cache, chưa chắc xuống disk. buf.force() gọi fsync — đảm bảo disk.

Use case

  • Database index / B-tree: random seek, read-heavy — OS cache tự optimize.
  • Binary format parsing: header + pointer offset (ELF, DEX, serialization format).
  • Full-text search index: Lucene dùng memory-mapped cho segment file.
  • Shared memory IPC: 2 process map cùng file.

Không đáng cho sequential scan 1 lần — BufferedInputStream đơn giản hơn, perf tương đương.

3. FileChannel + ByteBuffer — NIO channel API

NIO thế hệ 1 (Java 1.4, 2002): introduces Channel + Buffer + Selector. Thiết kế cho non-blocking I/O.

Pattern cơ bản

try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
    ByteBuffer buf = ByteBuffer.allocate(4096);
    while (ch.read(buf) != -1) {
        buf.flip();                     // Switch to read mode
        while (buf.hasRemaining()) {
            byte b = buf.get();
            process(b);
        }
        buf.clear();                    // Reset to write mode
    }
}

Buffer lifecycle

ByteBuffer có 3 pointer: position, limit, capacity.

Empty buffer after allocate(1024):
position=0, limit=1024, capacity=1024
|=====================|
0                    1024

After writing 100 bytes:
position=100, limit=1024, capacity=1024
|....|================|
0   100             1024

After flip():
position=0, limit=100, capacity=1024
|====|                |
0    100            1024
(mode read: data tu 0 den 100)

After reading 50 bytes:
position=50, limit=100, capacity=1024
     |====|            |
0    50  100         1024

After clear():
position=0, limit=1024, capacity=1024
|=====================|
(sua back mode write)

Method lifecycle:

  • allocate(n) → write mode.
  • flip() → switch sang read mode (limit = position, position = 0).
  • clear() → back về write mode (position = 0, limit = capacity).
  • compact() → keep unread data, ready cho write tiếp.

Verbose. Dễ bug nếu quên flip() sau write hoặc clear() giữa iterations.

Khi nào dùng FileChannel thay BufferedInputStream?

BufferedInputStream đơn giản hơn cho sequential read. FileChannel có ưu thế cho:

1. Random access với position:

ByteBuffer buf = ByteBuffer.allocate(1024);
ch.read(buf, 10_000_000);   // Doc tu offset, khong seek thu cong

2. transferTo — zero-copy:

try (FileChannel src = FileChannel.open(source, StandardOpenOption.READ);
     FileChannel dst = FileChannel.open(dest, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
    src.transferTo(0, src.size(), dst);
}

transferTo dùng sendfile syscall (Linux) — OS copy data trực tiếp từ disk page cache sang disk page cache khác, không qua user space. Nhanh hơn read-write loop ~2-3×.

Dùng cho: file server, backup, streaming large file (HTTP response body).

3. Lock vùng file:

FileLock lock = ch.lock(offset, length, shared);
try {
    // Exclusive access to this region
} finally {
    lock.release();
}

Cross-process file locking — 2 process dùng cùng file.

4. Non-blocking I/O (với selector): Pattern cho server socket NIO — không áp dụng file thông thường.

4. Khi nào chọn API nào

flowchart TD
    A[Xu ly file] --> B{Text hay binary?}
    B -->|Text| C{Size?}
    C -->|Nho, biet| D[Files.readString]
    C -->|Lon/Stream| E[Files.lines + Stream]
    B -->|Binary| F{Pattern?}
    F -->|Sequential read| G[BufferedInputStream]
    F -->|Random access<br/>File lon| H[MappedByteBuffer]
    F -->|Copy / Transfer| I[FileChannel.transferTo]
    F -->|Non-blocking socket| J[AsynchronousFileChannel]

Rule thực tế:

  1. Default: Files.lines cho text, Files.newInputStream cho binary sequential.
  2. Large sequential text: Files.lines + Stream API (bài này).
  3. Random access binary: MappedByteBuffer.
  4. High-perf copy: FileChannel.transferTo.
  5. Async I/O: AsynchronousFileChannel — callback/Future based. Java 21 virtual thread làm pattern này ít cần thiết hơn (bài 10.5).

5. Pattern thực tế — word count file lớn

Task: đọc file 10GB text, đếm tần suất mỗi từ, in top 20.

try (Stream<String> lines = Files.lines(Path.of("huge.txt"), StandardCharsets.UTF_8)) {
    Map<String, Long> wordCount = lines
        .flatMap(l -> Arrays.stream(l.split("\\s+")))
        .filter(w -> !w.isEmpty())
        .map(String::toLowerCase)
        .collect(Collectors.groupingBy(
            Function.identity(),
            Collectors.counting()));

    wordCount.entrySet().stream()
        .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
        .limit(20)
        .forEach(e -> System.out.println(e.getKey() + ": " + e.getValue()));
}

Memory: phụ thuộc số distinct word (thường dưới 1 triệu cho text English). ~100MB heap cho map. File size không giới hạn.

Imperative: 2 loop + HashMap thủ công, ~25 dòng. Stream 10 dòng — đúng business intent.

6. AsynchronousFileChannel — async I/O

AsynchronousFileChannel ch = AsynchronousFileChannel.open(
    path, StandardOpenOption.READ);

ByteBuffer buf = ByteBuffer.allocate(4096);
Future<Integer> future = ch.read(buf, 0);

// Lam viec khac
doOtherStuff();

int n = future.get();   // Block khi can ket qua

Hoặc callback:

ch.read(buf, 0, null, new CompletionHandler<Integer, Void>() {
    @Override public void completed(Integer n, Void attachment) {
        System.out.println("Read " + n + " bytes");
    }
    @Override public void failed(Throwable ex, Void attachment) {
        ex.printStackTrace();
    }
});

Thiết kế cho high-concurrency server pre-virtual-thread. Java 21+ có virtual thread (bài 10.5) — pattern blocking API trên virtual thread thường dễ đọc hơn async callback.

Use case còn lại của AsynchronousFileChannel:

  • Code base cũ đã dùng.
  • Legacy library expect CompletionHandler interface.
  • Mix với reactive stream (Reactor, RxJava).

Code mới đơn giản: blocking API trên virtual thread.

7. Pitfall tổng hợp

Nhầm 1: Files.lines không close.

Files.lines(path).forEach(...);   // Leak FD

✅ Try-with-resources.

Nhầm 2: Files.readAllLines cho file lớn.

List<String> all = Files.readAllLines(Path.of("10gb.log"));   // OOM

Files.lines + stream, memory constant.

Nhầm 3: Quên flip() sau write buffer.

buf.put(data);
byte b = buf.get();   // Doc tu position sau write, get data khong dinh

flip() trước read:

buf.put(data);
buf.flip();
byte b = buf.get();

Nhầm 4: MappedByteBuffer cho file nhỏ.

ch.map(MapMode.READ_ONLY, 0, 1024);   // Overhead > loi ich

✅ File nhỏ: Files.readAllBytes hoặc ByteBuffer.wrap(readAllBytes).

Nhầm 5: Không force() khi cần durability.

buf.put(data);
// JVM crash -> data in OS cache, chua xuong disk

buf.force() với MappedByteBuffer; ch.force(true) với FileChannel.

Nhầm 6: ByteBuffer.allocate lặp trong loop.

while (reading) {
    ByteBuffer buf = ByteBuffer.allocate(8192);   // Allocate moi iteration
    ch.read(buf);
    // ...
}

✅ Allocate 1 lần ngoài loop, clear() reuse:

ByteBuffer buf = ByteBuffer.allocate(8192);
while (reading) {
    buf.clear();
    ch.read(buf);
    // ...
}

8. 📚 Deep Dive Oracle

📚 Deep Dive Oracle

Spec / reference chính thức:

Ghi chú: Oracle Java Magazine có series về NIO performance. Benchmark thực cho memory-mapped vs channel read: mapped nhanh hơn 2-3× với random access, tương đương với sequential scan. Nếu optimize file-heavy workload, đọc series này để tránh premature optimization. Với Java 21+, cân nhắc MemorySegment — API mới thay thế MappedByteBuffer cho file lớn.

9. Tóm tắt

  • Files.lines(path) + Stream API = declarative xử lý file text lớn, memory constant. 90% use case text processing.
  • Luôn close Files.lines qua try-with-resources — stream giữ file handle.
  • MappedByteBuffer map file vào address space process — random access nhanh, không tốn JVM heap.
  • MappedByteBuffer limit 2GB (int index) — file lớn hơn chia multiple region hoặc dùng MemorySegment (Java 21+).
  • FileChannel + ByteBuffer — NIO channel API, verbose lifecycle (allocate → write → flip → read → clear).
  • FileChannel.transferTo zero-copy ở kernel level (sendfile) — tối ưu copy file, streaming response.
  • AsynchronousFileChannel cho async I/O pre-virtual-thread; Java 21+ virtual thread giảm nhu cầu.
  • Rule chọn API: default Files.lines/newInputStream sequential; random access → mapped; copy → transferTo; non-blocking → async.
  • Pattern count/filter/group trên log file → stream + collectors — replace 2-3× code imperative.

10. Tự kiểm tra

Tự kiểm tra
Q1
Vì sao Files.lines scale được với file 10GB mà readAllLines thì không?

readAllLines load toàn bộ vào List<String> trong JVM heap. 10GB text UTF-8 → ~15-20GB heap vì:

  • String trong JVM dùng UTF-16 (2 byte/char cho BMP, 4 byte cho supplementary) — overhead gấp đôi so với UTF-8 disk.
  • Mỗi String object có header (16 byte) + length field + char array header.

Heap default vài GB → OOM ngay.

Files.lines trả lazy stream — mỗi lần pipeline cần element, đọc 1 line từ BufferedReader internal. Chỉ 1 line + buffer 8KB ở heap tại 1 thời điểm. Memory constant bất kể file size.

Processing time linear với file size — I/O bound (đọc disk), không RAM bound.

Rule: file size tiên đoán được + nhỏ (< 100MB) → readAllLines tiện. Không biết size hoặc có thể lớn → stream-based.

Q2
Khi nào dùng MappedByteBuffer thay BufferedInputStream?

Dùng MappedByteBuffer khi:

  • Random access binary: database index (B-tree), binary format cần seek nhiều lần (ELF, DEX, Lucene segment). OS page cache tự optimize — "hot region" của file cache trong RAM, access sau nhanh.
  • File > JVM heap: map tránh load vào heap. Memory nằm trong OS page cache, không trong heap.
  • Share data cross-process: nhiều process map cùng file → cùng page cache → share data không copy. Advanced IPC use case.

Không đáng với sequential scan 1 lần — BufferedInputStream đơn giản hơn, perf tương đương (cả hai đều đọc tuần tự qua page cache).

Benchmark thực: mapped thắng 2-3× với random access, tương đương hoặc hơi chậm với sequential (overhead setup mmap). Đo cụ thể với workload production trước khi commit vào mapped.

Q3
Cơ chế buffer.flip() trong NIO là gì, và vì sao cần thiết?

NIO ByteBuffer có 3 pointer: position, limit, capacity. Buffer có 2 mode:

  • Write mode: position là chỗ ghi tiếp, limit = capacity. "Còn bao nhiêu chỗ trống".
  • Read mode: position là chỗ đọc tiếp, limit là chỗ cuối data hợp lệ. "Còn bao nhiêu data chưa đọc".

Lifecycle:

  1. allocate(n) → mode write, position=0, limit=n.
  2. Write 100 byte → position=100, limit=n.
  3. flip() → switch mode read, limit=100 (lưu giá trị position cũ), position=0.
  4. Read data từ 0 đến 100.
  5. clear() → back mode write cho iteration kế.

Quên flip sau write → read từ position đang ở cuối (sau data) → không data / data invalid. Common bug người mới NIO.

Phát hiện: chạy buffer nhưng không data ra → check có flip sau write không.

Q4
Khi nào cần force/flush khi ghi file, và 2 tầng buffer là gì?

Data ghi đi qua 2 tầng buffer:

  1. JVM buffer → OS: BufferedWriter.flush(), OutputStream.flush(), MappedByteBuffer.force() push data xuống OS page cache. Nhanh.
  2. OS cache → disk: fos.getFD().sync(), FileChannel.force(true) gọi fsync syscall — buộc OS ghi page cache xuống disk thực. Chậm ~5-50ms/call.

Tầng 1 đủ khi: mất data trong buffer có thể chấp nhận (vd log debug). JVM crash → mất buffer JVM, nhưng data đã flush xuống OS vẫn persist khi OS không crash.

Tầng 2 cần khi: power-off mất data không chấp nhận được. Transaction log DB (WAL), financial audit, medical record. Mỗi write critical cần fsync.

Cost fsync: chậm. Không gọi mỗi write — batch và fsync cuối batch, hoặc fsync theo interval (1s/lần).

Compromise: SQLite mặc định fsync mỗi commit (durable), có option PRAGMA synchronous=OFF để bỏ (nhanh hơn nhưng mất durability).

Q5
Đoạn sau có vấn đề gì? Stream<String> s = Files.lines(path); s.filter(...).count();

Stream không close — file handle leak.

Intermediate op filter và terminal count không tự close stream. Stream implement AutoCloseable và javadoc Files.lines nói rõ phải close để release underlying resource (BufferedReader + FileChannel).

Fix:

try (Stream<String> s = Files.lines(path)) {
  long n = s.filter(...).count();
}

Hiệu ứng leak: service 24/7 chạy gọi hàm này hàng nghìn lần/giờ → FD leak tăng dần. Linux default limit 1024 FD/process → "Too many open files" sau vài ngày. macOS limit thấp hơn → fail sớm hơn.

Đây là một trong những leak phổ biến nhất trong code Java dùng Stream API — dễ quên vì intermediate/terminal op trên stream không gợi ý "cần close". Rule: stream có underlying resource (file, socket, DB) → luôn try-with-resources.

Q6
Vì sao FileChannel.transferTo nhanh hơn viết loop read/write?

Loop read/write truyền thống:

byte[] buf = new byte[8192];
int n;
while ((n = in.read(buf)) != -1) {
  out.write(buf, 0, n);
}

Mỗi byte đi qua:

  1. Disk → OS page cache (read).
  2. OS page cache → JVM buffer (syscall copy).
  3. JVM buffer → OS page cache (syscall copy).
  4. OS page cache → disk (write).

2 lần copy user-kernel space. Với file 1GB = 2GB data copy, ~100k syscall.

FileChannel.transferTo dùng sendfile syscall (Linux) hoặc tương đương OS khác:

src.transferTo(0, src.size(), dst);

OS copy trực tiếp page cache → page cache trong kernel, không qua user space. Zero-copy. 1 syscall thay 100k.

Tốc độ: thường nhanh hơn 2-3×, CPU usage giảm đáng kể.

Use case: HTTP file server (Apache, nginx dùng sendfile), backup tool, streaming large response body. Spring Framework StreamingResponseBody dùng transferTo khi có thể.

Bài tiếp theo: Mini-challenge: Log aggregator với NIO.2 và Stream

Bài này có giúp bạn hiểu bản chất không?