Thuật toán & Cấu trúc dữ liệu — Thực chiến/Mini-challenge — External merge sort: sort 10GB file với 1GB RAM
~35 phútSắp xếp & thứ tựMiễn phí lượt xem

Mini-challenge — External merge sort: sort 10GB file với 1GB RAM

Implement ExternalSorter: chia file thành sorted chunk, k-way merge bằng min-heap — pattern Postgres ORDER BY, Lucene segment merge, Hadoop shuffle đều dùng khi data vượt RAM.

Production gặp OOM: bạn nhận task sort một file text 10GB chứa 1 integer mỗi dòng. Máy chỉ có 1GB RAM. Gọi Arrays.sort trực tiếp sau khi load toàn bộ file? OOM tức thì.

Cùng vấn đề xảy ra trong database: ORDER BY trên result set vượt work_mem trong PostgreSQL sẽ spill xuống disk thay vì crash — nhưng logic bên dưới là y chang: chia dữ liệu thành chunk vừa RAM, sort từng chunk, rồi k-way merge.

Bài này luyện pattern external merge sort: split → sort chunks → k-way merge. Sau khi xong, bạn implement được sort algorithm Postgres, Lucene, và Hadoop dùng.

🎯 Yêu cầu bài toán

Viết class ExternalSorter với API sau:

  • void sort(Path inputFile, Path outputFile, int chunkSize) — sort toàn bộ file thành output sorted ascending.
  • Constraint: heap memory chỉ giữ tối đa chunkSize integer cùng lúc (Phase 1) + buffer nhỏ cho merge (Phase 2, tối đa k integer với k = số chunk).
  • Output: sorted ascending, mỗi integer một dòng.

Test scenario

Generate input: 1 triệu integer, 1 per line, file khoảng 10MB.

Sort với chunkSize = 100_000 (~100 nghìn integer mỗi chunk, khoảng 1MB).

Verify: output sorted, count giống input, sum giống input.

📦 Starter code

import java.io.*;
import java.nio.file.*;
import java.util.*;

public class ExternalSorter {

    public void sort(Path input, Path output, int chunkSize) throws IOException {
        // Phase 1: split input into sorted chunks (each fits in chunkSize)
        List<Path> chunks = splitAndSort(input, chunkSize);

        // Phase 2: k-way merge chunks into output
        kWayMerge(chunks, output);

        // Cleanup temp files
        for (Path c : chunks) Files.delete(c);
    }

    private List<Path> splitAndSort(Path input, int chunkSize) throws IOException {
        // TODO
        return List.of();
    }

    private void kWayMerge(List<Path> chunks, Path output) throws IOException {
        // TODO
    }
}

Dành 25–30 phút tự làm trước khi xem gợi ý.

🧪 Test harness

import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.stream.*;

public class ExternalSorterTest {

    public static void main(String[] args) throws IOException {
        // Generate test input: 1M integers, 1 per line
        Path input  = Files.createTempFile("sort-input-",  ".txt");
        Path output = Files.createTempFile("sort-output-", ".txt");

        Random rng = new Random(42);
        long expectedSum = 0;
        try (BufferedWriter w = Files.newBufferedWriter(input)) {
            for (int i = 0; i < 1_000_000; i++) {
                int v = rng.nextInt(10_000_000);
                expectedSum += v;
                w.write(Integer.toString(v));
                w.newLine();
            }
        }

        long start = System.nanoTime();
        new ExternalSorter().sort(input, output, 100_000);
        long ms = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("Sort done in %d ms%n", ms);

        // Verify: sorted + same count + same sum
        long count = 0, actualSum = 0, prev = Long.MIN_VALUE;
        try (BufferedReader r = Files.newBufferedReader(output)) {
            String line;
            while ((line = r.readLine()) != null) {
                long v = Long.parseLong(line.trim());
                if (v < prev) throw new AssertionError("NOT SORTED at line " + count);
                prev = v;
                actualSum += v;
                count++;
            }
        }
        assert count == 1_000_000 : "Count mismatch: " + count;
        assert actualSum == expectedSum : "Sum mismatch";
        System.out.println("PASS: sorted, count=1000000, sum verified");

        Files.deleteIfExists(input);
        Files.deleteIfExists(output);
    }
}

💡 Gợi ý

💡 Stage 1 — Phase 1: split và sort từng chunk (đọc khi bắt đầu bí)

Đọc input line by line với BufferedReader, tích lũy vào int[chunkSize].

Khi mảng đầy (idx == chunkSize): gọi Arrays.sort(), ghi ra file tạm chunk_N.tmp, reset idx = 0.

Edge case: sau khi đọc hết file, nếu idx > 0 còn chunk chưa đầy — cũng cần sort và ghi ra file tạm.

// Skeleton hint
private List<Path> splitAndSort(Path input, int chunkSize) throws IOException {
    List<Path> chunks = new ArrayList<>();
    int[] buffer = new int[chunkSize];
    int idx = 0;
    try (BufferedReader reader = Files.newBufferedReader(input)) {
        String line;
        while ((line = reader.readLine()) != null) {
            buffer[idx++] = Integer.parseInt(line.trim());
            if (idx == chunkSize) {
                chunks.add(writeSortedChunk(buffer, idx));
                idx = 0;
            }
        }
        if (idx > 0) chunks.add(writeSortedChunk(buffer, idx));
    }
    return chunks;
}

writeSortedChunk nhận bufferlen, Arrays.copyOf đúng len, sort, ghi từng số ra dòng.

Kết quả: list các file tạm, mỗi file đã sorted ascending.

💡 Stage 2 — Phase 2: k-way merge với min-heap (đọc khi đã xong Phase 1)

Mở tất cả N file tạm (N BufferedReader).

Đọc số đầu tiên từ mỗi file, push vào PriorityQueue dạng (value, fileIndex).

Loop chính:

  1. poll() min entry khỏi heap.
  2. Write value ra output.
  3. Đọc dòng tiếp theo từ file có index fileIndex.
  4. Nếu còn dòng: push (nextValue, fileIndex) vào heap.
  5. Lặp đến khi heap rỗng.

Insight quan trọng: heap chỉ giữ tối đa N integer (N = số chunk) — không phải toàn bộ data. Memory bounded bởi số file, không phải kích thước file.

// record để giữ trong heap
private record HeapEntry(int value, int fileIndex) {}

Dùng Comparator.comparingInt(HeapEntry::value) khi tạo PriorityQueue.

💡 Stage 3 — Edge cases cần xử lý (đọc khi code chạy nhưng có lỗi lạ)

BufferedWriter flush: try-with-resources tự close()flush() — dùng pattern này để tránh thiếu data ở cuối file.

File handle limit OS: N file mở đồng thời. Với 1M integers và chunkSize 100k thì N = 10 — không vấn đề. Nhưng nếu file lớn hơn hoặc chunkSize nhỏ hơn, N có thể vượt nghìn. Linux default ulimit -n = 1024. Fix: multi-pass merge — merge 500 file thành 1, rồi merge cấp cao.

Charset: Files.newBufferedReader(path) mặc định UTF-8 — nhất quán giữa write và read là đủ. Chỉ cần chú ý khi file input đến từ nguồn bên ngoài (Windows-1252, UTF-16).

✅ Lời giải

✅ Lời giải — xem sau khi đã tự làm
import java.io.*;
import java.nio.file.*;
import java.util.*;

public class ExternalSorter {

    public void sort(Path input, Path output, int chunkSize) throws IOException {
        List<Path> chunks = splitAndSort(input, chunkSize);
        kWayMerge(chunks, output);
        for (Path c : chunks) Files.deleteIfExists(c);
    }

    // Phase 1: read input, split into chunkSize chunks, sort each, write to temp files.
    private List<Path> splitAndSort(Path input, int chunkSize) throws IOException {
        List<Path> chunks = new ArrayList<>();
        int[] buffer = new int[chunkSize];
        int idx = 0;

        try (BufferedReader reader = Files.newBufferedReader(input)) {
            String line;
            while ((line = reader.readLine()) != null) {
                buffer[idx++] = Integer.parseInt(line.trim());
                if (idx == chunkSize) {
                    chunks.add(writeSortedChunk(buffer, idx));
                    idx = 0;
                }
            }
            // Handle last partial chunk
            if (idx > 0) chunks.add(writeSortedChunk(buffer, idx));
        }
        return chunks;
    }

    // Sort buffer[0..len-1] and write to a temp file. Return the temp file path.
    private Path writeSortedChunk(int[] buffer, int len) throws IOException {
        int[] sorted = Arrays.copyOf(buffer, len);
        Arrays.sort(sorted);
        Path chunk = Files.createTempFile("ext-sort-", ".tmp");
        try (BufferedWriter w = Files.newBufferedWriter(chunk)) {
            for (int v : sorted) {
                w.write(Integer.toString(v));
                w.newLine();
            }
        }
        return chunk;
    }

    // Entry in the min-heap: (value, fileIndex)
    private record HeapEntry(int value, int fileIndex) {}

    // Phase 2: open all chunk files simultaneously, k-way merge into output.
    private void kWayMerge(List<Path> chunks, Path output) throws IOException {
        BufferedReader[] readers = new BufferedReader[chunks.size()];
        PriorityQueue<HeapEntry> heap = new PriorityQueue<>(
            Comparator.comparingInt(HeapEntry::value)
        );

        try {
            // Seed heap with first integer from each chunk file
            for (int i = 0; i < chunks.size(); i++) {
                readers[i] = Files.newBufferedReader(chunks.get(i));
                String line = readers[i].readLine();
                if (line != null) {
                    heap.offer(new HeapEntry(Integer.parseInt(line.trim()), i));
                }
            }

            // Merge loop: always write the global minimum
            try (BufferedWriter w = Files.newBufferedWriter(output)) {
                while (!heap.isEmpty()) {
                    HeapEntry e = heap.poll();
                    w.write(Integer.toString(e.value()));
                    w.newLine();
                    // Advance the reader that just contributed the minimum
                    String next = readers[e.fileIndex()].readLine();
                    if (next != null) {
                        heap.offer(new HeapEntry(Integer.parseInt(next.trim()), e.fileIndex()));
                    }
                }
            }
        } finally {
            // Always close all readers even if exception thrown
            for (BufferedReader r : readers) {
                if (r != null) r.close();
            }
        }
    }
}

Key insight — tại sao memory bounded:

Phase 1: heap chỉ giữ int[chunkSize] — ví dụ 100k integer = khoảng 400KB.

Phase 2: heap giữ tối đa N integer (N = số chunk). Với 1M integers và chunkSize 100k, N = 10. PriorityQueue chỉ có 10 phần tử bất kể file lớn đến đâu.

Time complexity: O(N × log K) với N là tổng số records, K là số chunk. Phase 1 O(N) đọc + O(chunk_size × log chunk_size) sort × số_chunk = O(N log N). Phase 2 O(N log K).

⚠️ Pitfall

Pitfall 1 — File handle leak: quên close readers trong finally.

// WRONG: exception in kWayMerge body skips close() calls
private void kWayMerge(List<Path> chunks, Path output) throws IOException {
    BufferedReader[] readers = new BufferedReader[chunks.size()];
    for (int i = 0; i < chunks.size(); i++) {
        readers[i] = Files.newBufferedReader(chunks.get(i)); // open files
    }
    // ... merge logic that may throw ...
    for (BufferedReader r : readers) r.close(); // NEVER REACHED on exception
}

Với N lớn (nghìn chunk), mỗi BufferedReader mở 1 file descriptor. Khi exception xảy ra và close không được gọi, file descriptor leak tích lũy → "Too many open files" OS error. Fix: wrap trong try { ... } finally { close all readers } hoặc dùng try-with-resources.

Pitfall 2 — Charset encoding: giả định sai encoding của input file.

// WRONG: may fail silently if input is UTF-16 or Windows-1252
Files.newBufferedReader(path) // defaults to UTF-8

// CORRECT: explicit charset if input source is known
Files.newBufferedReader(path, StandardCharsets.UTF_8)
// or read bytes and detect BOM for unknown sources

File từ Windows editors có thể UTF-16 với BOM, hoặc Windows-1252. Integer.parseInt ném NumberFormatException với ký tự lạ thay vì lỗi encoding rõ ràng — khó debug. Validate charset tường minh từ đầu.

Pitfall 3 — Quên flush BufferedWriter: data mất ở cuối file.

// RISKY: manual close may forget flush
BufferedWriter w = Files.newBufferedWriter(output);
// ... write ...
w.close(); // close() calls flush() -- OK if reached

// SAFER: try-with-resources guarantees flush + close even on exception
try (BufferedWriter w = Files.newBufferedWriter(output)) {
    // ... write ...
} // auto flush + close

close() gọi flush() trong Java standard library — nhưng nếu bạn close thủ công và có exception xảy ra trước đó, close() bị bỏ qua. try-with-resources đảm bảo close luôn được gọi kể cả khi exception.

📊 Benchmark tham khảo

Sort 10GB file (1 ty integer), chunkSize 100M:
  Naive Arrays.sort: OOM immediately
  External merge (100 chunks x 100M each):
    Phase 1 split+sort: ~5 min  (CPU-bound: sort 100M x 100 times)
    Phase 2 k-way merge: ~3 min (IO-bound: read+write 10GB sequentially)
    Total: ~8 min
  With SSD vs HDD:
    HDD sequential write/read: ~150 MB/s -> IO-bound
    SSD sequential: ~500 MB/s  -> 2-3x faster Phase 2

Bottleneck Phase 1 là CPU (sort). Bottleneck Phase 2 là IO bandwidth — không phải CPU, không phải memory. Tăng RAM để tăng chunkSize → ít chunk hơn → Phase 2 ít file hơn → nhanh hơn chút. Tăng số thread sort chunk song song → Phase 1 nhanh hơn đáng kể.

🎓 Variant để tự thử

Variant 1 — Thay Arrays.sort trong Phase 1 bằng Counting sort: Nếu biết range (ví dụ 0 đến 9_999_999), Counting sort O(n + range) thay O(n log n). Với n = 100k và range = 10M, trade-off không rõ ràng — thử đo benchmark thực tế.

Variant 2 — Multi-pass merge khi N vượt 1000: Nếu chunkSize nhỏ và file lớn, N chunk có thể vượt ulimit -n. Fix: merge tối đa 500 file thành 1 trong mỗi pass, lặp lại đến khi còn 1 file. Đây là cách sort trên Unix xử lý file khổng lồ.

Variant 3 — Compression chunk file: Wrap BufferedWriter bên trong GZIPOutputStream khi ghi chunk. Trade IO bandwidth lấy CPU: chunk file nhỏ 3–5x, Phase 2 đọc ít bytes hơn. Hiệu quả khi disk IO là bottleneck (HDD).

Variant 4 — Parallel split phase: Phase 1 sort từng chunk độc lập nhau — hoàn toàn parallelizable. Dùng ExecutorService với N threads sort N chunk song song. Phase 2 vẫn sequential (single pass merge). Speedup Phase 1 gần tuyến tính với số core.

🔗 LeetCode liên quan

🏭 Applied note

PostgreSQL ORDER BY với work_mem: khi sort vượt work_mem (mặc định 4MB), Postgres dùng tuplesort — chia thành sorted run trên disk, k-way merge cuối. Cùng pattern bài này, ở production scale.

Hadoop MapReduce shuffle phase: sau map, records sort theo key trước khi gửi đến reducer. Mỗi mapper sort partition của mình (Phase 1), reducer fetch và k-way merge các sorted partition (Phase 2) — distributed external sort.

Apache Lucene segment merge: khi index flush memory buffer ra disk, tạo segment (sorted term list). Background merger dùng k-way merge ghép các segment nhỏ thành segment lớn — cùng cấu trúc external merge sort.

Unix sort -S 1G huge_file.txt: flag -S chỉ định buffer size, tương đương chunkSize. Khi buffer đầy, sort ghi chunk ra temp file, sau đó k-way merge. GNU sort còn hỗ trợ --parallel cho Phase 1.

Cross-link: Module 4 lesson 03 (Merge sort — k-way merge concept), Module 8 (Big data & streaming — external algorithms ở distributed scale).

✨ Điều bạn vừa làm được

  • Implement external merge sort hai phase: split+sort chunk trong RAM, k-way merge bằng min-heap.
  • Hiểu tại sao PriorityQueue trong Phase 2 chỉ giữ K integer (K = số chunk) — memory bounded bất kể kích thước file.
  • Biết ba pitfall: file handle leak khi không dùng finally, charset mismatch, quên flush BufferedWriter.
  • Nhận ra pattern này là nền tảng của PostgreSQL tuplesort, Hadoop shuffle, Lucene segment merge, và Unix sort.
  • Thấy hướng optimize thực tế: tăng chunkSize (ít chunk, ít merge pass), parallel Phase 1, compress chunk, multi-pass khi N chunk lớn.

Bài tiếp theo: Case Study: TimSort + Redis ZSET

Bài này có giúp bạn hiểu bản chất không?