Thuật toán Ứng dụng — DP, String, Big Data & hơn nữa/External merge sort — sắp xếp vượt RAM
21/66
Bài 21 / 66~20 phútBig data & streaming — Khi RAM không đủMiễn phí lượt xem

External merge sort — sắp xếp vượt RAM

Chia dữ liệu thành chunk vừa RAM, sort từng chunk, rồi k-way merge. Nền của ORDER BY trên database và shuffle MapReduce.

TL;DR: Khi dữ liệu vượt RAM (100 GB với 8 GB RAM), không thể sort in-memory. External merge sort chia thành hai phase: Phase 1 đọc từng chunk vừa RAM, sort in-memory, ghi ra "run" trên disk; Phase 2 dùng min-heap k-way merge gộp tất cả run thành kết quả sắp xếp mà chỉ giữ k phần tử đầu mỗi run trong RAM. Số pass đọc/ghi I/O = ceil(log_k(N/M)) — tăng k giảm số pass. Đây là nền của ORDER BY trong Postgres, shuffle phase của MapReduce, và external dedup ở mọi hệ pipeline lớn.

Bạn vừa nhận task: sort log file 100 GB từ server production. Máy chỉ có 8 GB RAM. Gọi Arrays.sort() hay sort() built-in? Chương trình crash ngay với OutOfMemoryError — vì toàn bộ dữ liệu không vừa RAM. Đây là vấn đề external sort: sort tập dữ liệu lớn hơn bộ nhớ khả dụng, phải tận dụng disk làm bộ nhớ phụ.

External sort không phải niche: mỗi lần Postgres thực thi ORDER BY trên bảng lớn (không vừa work_mem), mỗi lần MapReduce shuffle dữ liệu giữa mapper và reducer, mỗi lần Kafka compact log — đều chạy một variant của thuật toán này.

1. Mô hình bài toán và giới hạn I/O

Cho N record cần sort, RAM khả dụng M (tính theo số record). Constraint chính không phải CPU — là số lần đọc/ghi disk (I/O). Đọc/ghi tuần tự (sequential) nhanh hơn random I/O ~100 lần; thuật toán tốt tối thiểu hoá số pass qua toàn bộ dữ liệu.

Định nghĩa: một pass là đọc qua toàn bộ N record + ghi kết quả. Mục tiêu: tối thiểu số pass.

Tham sốÝ nghĩa
Ntổng số record (ví dụ 10 tỷ)
MRAM chứa được tối đa bao nhiêu record (ví dụ 800 triệu)
ksố run merge cùng lúc trong Phase 2
R = ceil(N/M)số run sinh ra sau Phase 1

2. Phase 1 — tạo sorted run

Đọc từng chunk M record vào RAM, sort in-memory (bằng bất kỳ thuật toán O(n log n)), ghi ra file "run" trên disk. Lặp cho đến hết input.

function createRuns(inputFile, M):
    runs <- []                              -- danh sách file run trên disk
    while inputFile chưa hết:
        chunk <- đọc M record từ inputFile  -- đọc vừa đủ RAM
        sort(chunk)                         -- sort in-memory, O(M log M)
        runFile <- ghi chunk ra disk
        runs.append(runFile)
    return runs
// Time: O(N log M)  Space: O(M) RAM
// I/O: 1 pass đọc + 1 pass ghi = 2N I/O

Với 100 GB dữ liệu và 8 GB RAM: sinh ra ceil(100/8) = 13 run, mỗi run 8 GB đã sorted.

3. Phase 2 — k-way merge bằng min-heap

R run đã sorted trên disk. Cần gộp thành 1 file sorted. Naive: merge từng cặp như merge sort — nhưng cần log2(R) lần duyệt lại toàn bộ dữ liệu. k-way merge gộp k run cùng lúc, giảm số pass xuống ceil(log_k(R)).

Cơ chế: dùng min-heap kích thước k. Mỗi slot heap giữ phần tử nhỏ nhất hiện tại của một run, kèm con trỏ biết lấy từ run nào. Mỗi bước: pop phần tử nhỏ nhất từ heap (là phần tử nhỏ nhất toàn bộ), ghi vào output, nạp phần tử kế của run đó vào heap.

function kWayMerge(runs[0..k-1], outputFile):
    H <- MinHeap()
    for i từ 0 đến k-1:                    -- khởi tạo heap với phần tử đầu mỗi run
        if runs[i] không rỗng:
            H.push((runs[i].readNext(), i)) -- (giá trị, chỉ số run)

    while H không rỗng:
        (val, runIdx) <- H.pop()            -- lấy phần tử nhỏ nhất toàn bộ
        outputFile.write(val)
        if runs[runIdx] không rỗng:
            H.push((runs[runIdx].readNext(), runIdx))  -- nạp phần tử kế
// Time: O(N log k)  Space: O(k) trên RAM (heap chỉ giữ k phần tử)
// I/O mỗi pass: 2N (đọc + ghi); số pass = ceil(log_k(R))

Tại sao min-heap đúng? Tại mọi thời điểm, heap chứa đúng k ứng viên — một từ mỗi run. Phần tử đầu heap nhỏ nhất trong k ứng viên, và vì mỗi run đã sorted, phần tử đó nhỏ nhất trong tất cả dữ liệu còn lại. Bằng chứng bằng induction: sau mỗi pop, invariant được duy trì.

3.1 Trace k-way merge với k=3

3 run đã sorted:

Run 0: [1, 5, 9]
Run 1: [2, 4, 8]
Run 2: [3, 6, 7]
BướcHeap (giá trị, run)PopOutputNạp vào
Khởi tạo2
12(1,0)1(5,0)
20(2,1)2(4,1)
30(3,2)3(6,2)
42(4,1)4(8,1)
51(5,0)5(9,0)
...............

Output cuối: 1 2 3 4 5 6 7 8 9 — đúng.

4. Phân tích I/O — vì sao tăng k quan trọng

Tổng I/O (đơn vị: số lần đọc/ghi N record):

Tổng pass = 1 (Phase 1) + ceil(log_k(R))   -- R = ceil(N/M)

Với N=100 GB, M=8 GB → R=13 run:

kceil(log_k(13))Tổng passTổng I/O (×200 GB)
2451000 GB
423600 GB
823600 GB
1312400 GB

Với k=13 (merge tất cả run trong 1 pass), chỉ cần 2 pass tổng — tiết kiệm ~60% I/O so với k=2. Nhưng k lớn đòi hỏi k buffer page RAM cho mỗi run; giới hạn thực tế của k phụ thuộc RAM/page-size.

Trực giác về k tối ưu

Postgres chọn k dựa trên work_mem / merge_buffer_size. Nếu work_mem = 64 MB và buffer mỗi run 4 MB thì k=16. Tăng work_mem → tăng k → ít pass → query nhanh hơn.

flowchart TD
    INPUT["Dữ liệu 100 GB\n(chưa sắp xếp)"]
    C1["Chunk 8 GB → sort → Run 1"]
    C2["Chunk 8 GB → sort → Run 2"]
    C3["... → Run 13"]
    HEAP["Min-Heap (k=13)\ngiữ 13 phần tử đầu"]
    OUT["Output sorted\n100 GB"]

    INPUT --> C1
    INPUT --> C2
    INPUT --> C3
    C1 --> HEAP
    C2 --> HEAP
    C3 --> HEAP
    HEAP --> OUT

5. Ứng dụng thực tế

Postgres tuplesort: khi ORDER BY không vừa work_mem, Postgres dùng external sort với k-way merge. Parameter work_mem điều chỉnh M; EXPLAIN ANALYZE hiện Sort Method: external merge khi bị spill to disk.

MapReduce shuffle: sau Map phase, mỗi mapper sort output của mình (Phase 1); Reducer nhận nhiều sorted partition từ nhiều mapper và k-way merge (Phase 2). Đây chính là lý do MapReduce "shuffle" tốn kém — nó là external sort toàn cluster.

External dedup: thay vì sort để tìm unique, pipeline lớn (Spark distinct(), Flink dedup) sort trước rồi quét liên tiếp loại bản sao — linear scan sau external sort rẻ hơn hash table khi data vượt RAM.

6. Pitfall

Pitfall 1 — Chọn k quá lớn, buffer mỗi run quá nhỏ

-- SAI: k = 1000 run, mỗi buffer chỉ còn 8 KB
-- → random I/O liên tục (mỗi lần readNext đọc 8 KB) → chậm hơn k nhỏ hơn
-- ĐÚNG: k = floor(M / buffer_per_run)
-- Đảm bảo mỗi run có buffer đủ lớn để sequential read
-- Ví dụ: M = 8 GB, buffer_per_run = 4 MB → k = 2048, nhưng thực tế
-- Postgres/DBMS giới hạn k ~500 vì quản lý nhiều file descriptor tốn kém

Pitfall 2 — Bỏ quên double buffering gây I/O stall

-- SAI: đọc buffer → process → đọc buffer → process (tuần tự, CPU chờ I/O)

-- ĐÚNG: double buffer — trong khi CPU xử lý buffer A,
-- I/O đọc sẵn buffer B (prefetch); xong đổi vai
-- Tăng throughput ~2x trên HDD/SSD thông thường

Pitfall 3 — Không xử lý run cuối nhỏ hơn M

-- SAI: giả định mọi run đều M record
-- → nếu N không chia hết M, run cuối nhỏ hơn
-- → heap push sai số lượng / index out of bounds

-- ĐÚNG: kiểm tra "runs[i] không rỗng" trước mỗi readNext
-- (đã có trong pseudocode kWayMerge ở trên)

7. Liên hệ các bài khác

  • Heap & heapsort: min-heap là cốt lõi của k-way merge — hiểu cách push/pop O(log k) là cách hiểu tại sao Phase 2 chạy O(N log k). Đọc bài đó trước nếu chưa quen heap.
  • Merge sort: external sort là generalisation của merge sort lên disk. Phase 1 tương đương base case; Phase 2 tương đương merge pass. Khác biệt: merge sort recursive in-memory; external sort iterative, k-way, I/O-aware.
  • Amortized analysis: phân tích "số pass × 2N I/O mỗi pass" là amortized nhìn từ góc độ I/O thay vì CPU cycle.
  • Count-Min Sketch: cùng context big-data streaming — khi sort quá đắt (data vô tận), dùng probabilistic structure thay thế; cross-reference hiểu tradeoff exact vs approximate.
  • Case study Redis & Kafka: Kafka log compaction dùng external sort để merge segment — đọc case study để thấy thuật toán này trong hệ thống production thực.

📚 Deep Dive

📚 Deep Dive — Tài liệu và triển khai thực tế

Nền lý thuyết:

  • Knuth, The Art of Computer Programming Vol. 3, §5.4 "External Sorting" — phân tích đầy đủ nhất về external sort, replacement selection, polyphase merge.
  • Ramakrishnan & Gehrke, Database Management Systems Ch. 13 — external sort trong context DBMS với cost model I/O page.

Postgres tuplesort:

  • Source: src/backend/utils/sort/tuplesort.cEXPLAIN ANALYZE với Sort Method: external merge Disk: X MB cho thấy khi nào bị spill.
  • Tăng SET work_mem = '256MB' trong session để force in-memory sort cho query cụ thể.

Replacement selection (ngoài bài):

  • Thay vì sort chunk cố định M, dùng heap size M để sinh run trung bình dài 2M (Knuth §5.4.1). Postgres dùng variant này.

MapReduce:

  • Dean & Ghemawat (2004), "MapReduce: Simplified Data Processing on Large Clusters" — Section 4 mô tả shuffle = distributed external sort.

Tóm tắt

  • External sort chia thành Phase 1 (tạo sorted run in-memory) và Phase 2 (k-way merge bằng min-heap).
  • Phase 1 I/O: 2N (1 pass đọc + 1 pass ghi); Phase 2 I/O: 2N × ceil(log_k(R)) pass.
  • Min-heap giữ chỉ k phần tử trong RAM; mỗi pop O(log k); tổng Phase 2 O(N log k).
  • Tăng k giảm số pass I/O nhưng đòi hỏi nhiều buffer RAM hơn — cân bằng thực tế phụ thuộc work_mem và kích thước buffer.
  • Postgres dùng external merge sort khi ORDER BY không vừa work_mem; MapReduce shuffle là external sort phân tán.
  • Double buffering (prefetch) tăng throughput ~2x; replacement selection tăng kích thước run trung bình lên ~2M.

Tự kiểm tra

Tự kiểm tra
Q1
Với N = 1 tỷ record, M = 100 triệu record (RAM), k = 10: Phase 1 sinh bao nhiêu run và Phase 2 cần bao nhiêu pass? Tổng I/O bằng bao nhiêu lần N?

Phase 1: R = ceil(1B / 100M) = 10 run — đọc toàn bộ dữ liệu 1 lần và ghi 10 run, tốn 2N I/O.

Phase 2: ceil(log_10(10)) = 1 pass — merge 10 run cùng lúc với k=10, đọc và ghi thêm 2N I/O.

Tổng: 2 + 2 = 4 lần N. Nếu N = 1 tỷ record × 100 byte = 100 GB thì tổng I/O ~400 GB. Với SSD 500 MB/s tuần tự ~ 13 phút.

Q2
Vì sao min-heap k-way merge đúng — tại mọi bước, phần tử pop ra từ heap thực sự là phần tử nhỏ nhất trong tất cả dữ liệu còn lại chưa ghi output?

Invariant: heap chứa đúng một phần tử từ mỗi run — phần tử nhỏ nhất còn lại của run đó (vì run đã sorted, phần tử đứng đầu run là nhỏ nhất của run).

Mọi phần tử chưa ghi output thuộc về một trong k run. Phần tử nhỏ nhất toàn bộ phải là phần tử đứng đầu của một run nào đó. Heap giữ đúng k đầu run này → min của heap = min toàn bộ.

Khi pop và nạp phần tử kế của run đó, invariant được duy trì. Bằng induction, mọi pop đều lấy đúng phần tử nhỏ nhất.

Q3
Nếu tăng k từ 10 lên 100, số pass giảm, nhưng có thể xuất hiện vấn đề gì với hiệu năng thực tế?

Hai vấn đề chính: (1) Buffer mỗi run nhỏ lại — nếu RAM tổng cố định M, mỗi trong 100 run chỉ được M/100 buffer. Buffer nhỏ nghĩa là readNext() phải đọc disk thường xuyên hơn, mỗi lần đọc ít byte → random I/O nhiều hơn → chậm hơn trên HDD.

(2) File descriptor overhead — mở 100 file đồng thời tiêu tốn OS resource; nhiều hệ thống giới hạn số file descriptor mở đồng thời (ulimit). Postgres giới hạn k thực tế ~500.

Điểm tối ưu thực nghiệm thường là k = floor(M / min_buffer_size) với min_buffer_size đủ lớn cho sequential read hiệu quả (thường 1–4 MB).

Q4
Postgres báo 'Sort Method: external merge Disk: 256 MB' trong EXPLAIN ANALYZE. Điều gì xảy ra bên dưới và bạn có thể làm gì để loại bỏ disk spill?

Postgres đã thực hiện external sort vì ORDER BY không vừa work_mem. Cụ thể: nó chia data thành các run sorted, ghi ra temp file (256 MB trên disk), rồi k-way merge khi đọc ngược lại. Query chậm hơn in-memory sort đáng kể.

Cách khắc phục: tăng work_mem cho session bằng SET work_mem = '512MB' (hoặc lớn hơn tuỳ dataset). Nếu query quan trọng, tăng work_mem trong postgresql.conf cho role cụ thể. Lưu ý: work_mem áp per sort operation per connection — đặt quá cao gây OOM nếu nhiều connection sort cùng lúc.

Q5
Tại sao shuffle phase trong MapReduce tốn kém và được xem là 'bottleneck' của nhiều job?

Shuffle = distributed external sort qua network. Sau Map phase, mỗi mapper có output chưa sorted; Hadoop/Spark phải: (1) sort output của mỗi mapper (Phase 1 local); (2) gửi các partition qua network đến reducer tương ứng; (3) reducer k-way merge các sorted stream nhận được (Phase 2).

Bước (2) là bottleneck: network bandwidth thường chậm hơn disk nhiều lần; toàn bộ intermediate data phải đi qua mạng; nếu một reducer nhận quá nhiều data (data skew), nó trở thành straggler chặn toàn job. Đây là lý do các framework mới (Spark, Flink) cố tránh shuffle bằng broadcast join hay partition-aware join khi có thể.

Q6
Replacement selection là kỹ thuật nào và nó cải thiện Phase 1 ra sao so với sort chunk cố định?

Replacement selection dùng heap size M thay vì sort chunk M cứng nhắc. Cơ chế: đọc M phần tử vào heap; khi ghi phần tử nhỏ nhất ra run hiện tại, đọc phần tử mới từ input — nếu phần tử mới lớn hơn phần tử vừa ghi, nạp vào heap tiếp tục run hiện tại; nếu nhỏ hơn, đánh dấu "chờ run tiếp theo".

Kết quả: với dữ liệu ngẫu nhiên, run trung bình dài 2M thay vì M (Knuth §5.4.1). Số run Phase 1 giảm một nửa → Phase 2 cần ít pass hơn. Postgres dùng variant này; với dữ liệu có độ sorted sẵn cao (nearly sorted), run có thể dài cả file — external sort degenerate về O(N log N) single-pass.

Q7
Nếu cần external dedup (loại bản sao) cho 50 GB log, bạn dùng external sort thế nào? Tại sao không dùng hash set?

Quy trình: external sort toàn bộ 50 GB theo key cần dedup, rồi linear scan kết quả — mỗi khi gặp record trùng với record trước, bỏ qua. Total: O(N log N) sort + O(N) scan.

Hash set O(N) average nhưng đòi O(N) RAM — với 50 GB data, bảng hash cần hàng chục GB RAM (overhead pointer, bucket), vượt khả năng. External hash (partition by hash, sort within partition) là hybrid nhưng phức tạp hơn.

External sort tận dụng sequential I/O hiệu quả và không đòi RAM O(N). Spark distinct() mặc định dùng external sort khi data spill; chỉ chuyển sang hash-based khi data vừa memory.

Bài tiếp theo: Reservoir sampling — mẫu ngẫu nhiên từ stream

Bài này có giúp bạn hiểu bản chất không?

Hỏi đáp về bài này

Chưa có câu hỏi

Đặt câu hỏi

Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).

Đặt câu hỏi đầu tiên