Mini-challenge — top-K phần tử trên stream
Lab: tìm K phần tử xuất hiện nhiều nhất trên stream lớn bằng Count-Min Sketch + min-heap, so với đếm chính xác.
Hệ thống streaming của một nền tảng video cần báo cáo top-10 video được xem nhiều nhất trong giờ vừa qua. Stream đầu vào là hàng chục triệu sự kiện (videoId, timestamp) mỗi giờ — không vừa RAM nếu đếm chính xác toàn bộ. Bài toán xuất hiện ở nhiều dạng thực tế:
- Network monitoring — top-K IP gửi traffic nhiều nhất trong 1 phút (phát hiện DDoS).
- Search engine — top-K query được gõ nhiều nhất trong ngày (autocomplete ranking).
- E-commerce — top-K sản phẩm được click nhiều nhất trong giờ cao điểm (homepage widget).
Dành 20–25 phút tự implement trước khi xem gợi ý.
🎯 Đề bài
Viết ba hàm/cấu trúc:
Cấu trúc CountMinSketch(d, w)
- Input:
dhàng (số hàm hash),wcột (độ rộng mỗi hàng). - Bất biến: với mọi phần tử
x,estimate(x)luôn lớn hơn hoặc bằng tần suất thực củax.
Hàm update(cms, x)
- Tăng counter của
xtrong tất cảdhàng của CMS.
Hàm topK(stream, K, d, w)
- Input: stream các phần tử (đọc tuần tự một lần), số nguyên
K, kích thước CMS(d, w). - Output: danh sách
Kphần tử có tần suất ước lượng cao nhất, theo thứ tự giảm dần. - Ràng buộc bộ nhớ: O(d×w + K) — không được lưu toàn bộ stream vào mảng.
Ví dụ: stream = [A, B, A, C, A, B, D, A, B], K = 2 → output [(A, 4), (B, 3)].
🔍 I-P-O
| Chi tiết | |
|---|---|
| Input | Stream phần tử (bất kỳ loại), K, kích thước CMS (d, w) |
| Process | Với mỗi phần tử: cập nhật CMS; duy trì min-heap K phần tử có ước lượng lớn nhất |
| Output | Danh sách K phần tử và tần suất ước lượng, thứ tự giảm dần |
📦 Concept mapping
| Bước thực hiện | Learning outcome |
|---|---|
Implement update(cms, x) với d hàm hash | Implement Count-Min Sketch — cấu trúc d×w matrix |
Implement estimate(cms, x) trả về min | Compare CMS vs đếm chính xác — CMS luôn over-estimate |
| Dùng min-heap K để lọc K lớn nhất | Choose sketch kết hợp heap cho bài top-K tần suất |
| Phân tích bộ nhớ O(d×w + K) vs O(n) | Explain vì sao cần thuật toán streaming khi RAM không đủ |
📋 Test cases
| Stream | K | d | w | Expected (xấp xỉ) | Ghi chú |
|---|---|---|---|---|---|
[A,B,A,C,A,B,D,A,B] | 2 | 3 | 100 | [(A,4),(B,3)] | Cơ bản, w lớn → ít collision |
[A,B,A,C,A,B,D,A,B] | 2 | 3 | 5 | [(A,≥4),(B,≥3)] | w nhỏ → over-estimate có thể xảy ra |
| 1 triệu phần tử, K=10 | 10 | 5 | 10000 | Top-10 đúng phần tử, tần suất over-estimate | Stress test streaming |
| Tất cả phần tử khác nhau | K | 3 | 100 | K phần tử bất kỳ (mỗi cái 1 lần) | Không có heavy hitter thật sự |
| Stream một phần tử lặp lại n lần | 1 | 3 | 100 | [(X, n)] | Phần tử duy nhất, ước lượng chính xác |
Lưu ý: với CMS, output là xấp xỉ — estimate(x) >= freq(x) luôn đúng nhưng có thể lớn hơn. Test case chính xác khi w đủ lớn (ít collision).
▶️ Starter pseudocode
structure CountMinSketch:
d <- số hàng (số hàm hash)
w <- số cột (độ rộng)
table[d][w] <- 0 -- tất cả counter = 0
-- TODO: chọn d hàm hash độc lập h_1, ..., h_d
function update(cms, x):
-- TODO: với mỗi hàng i từ 0 đến d-1:
-- tăng cms.table[i][h_i(x) mod w]
function estimate(cms, x):
-- TODO: trả về min của cms.table[i][h_i(x) mod w] với mọi i
function topK(stream, K, d, w):
cms <- CountMinSketch(d, w)
heap <- MinHeap kích thước K -- (tần_suất, phần_tử)
seen <- Set -- các phần tử đã thêm vào heap
-- TODO: với mỗi phần tử x trong stream:
-- update(cms, x)
-- freq <- estimate(cms, x)
-- cập nhật heap nếu x nên ở trong top-K
return heap theo thứ tự giảm dần
💡 Gợi ý
Count-Min Sketch là ma trận d × w với d hàm hash độc lập. Mỗi hàm hash h_i ánh xạ phần tử x sang một cột trong [0, w-1].
Update: với mỗi hàng i, tăng table[i][h_i(x) mod w] lên 1.
Estimate: lấy min của table[i][h_i(x) mod w] với tất cả i. Min luôn lớn hơn hoặc bằng tần suất thực (vì counter chỉ tăng, không giảm — collision chỉ làm tăng thêm, không bao giờ giảm).
Để có d hàm hash độc lập trong pseudocode, dùng dạng h_i(x) = (a_i * hash(x) + b_i) mod p mod w với p là số nguyên tố lớn và a_i, b_i khác nhau cho mỗi hàng.
Kiểm tra: sau update(cms, "A") ba lần với d=2, w=10, estimate(cms, "A") phải trả về 3.
Dùng min-heap kích thước K để duy trì K phần tử có tần suất ước lượng cao nhất. Tại mỗi bước đọc phần tử x:
- Gọi
update(cms, x)để cập nhật counter. - Lấy
freq <- estimate(cms, x). - Nếu
xđã có trong heap: cập nhật tần suất củaxtrong heap (xóa rồi thêm lại). - Nếu
xchưa trong heap:- Nếu heap chưa đủ K: thêm
(freq, x)vào heap. - Nếu heap đủ K và
freq > heap.min: xóa phần tử nhỏ nhất, thêm(freq, x).
- Nếu heap chưa đủ K: thêm
Min-heap cho phép kiểm tra "phần tử nhỏ nhất trong top-K" trong O(1) và thay thế trong O(log K) — hiệu quả hơn nhiều so với sort lại toàn bộ.
Cạm bẫy: nếu không cập nhật tần suất của phần tử đã có trong heap, heap sẽ giữ tần suất cũ — có thể bỏ sót heavy hitter khi tần suất tăng dần.
Error bound của Count-Min Sketch (khớp công thức ở bài 04): với xác suất ít nhất 1 - e^(-d), sai số ước lượng của mọi phần tử không vượt quá epsilon × N, trong đó:
N= tổng số phần tử đã thêm vào CMS.epsilon = e / w(vớie ≈ 2.718), tức chọnw = ⌈e/epsilon⌉.dhàng cho xác suất thất bạie^(-d).
Ví dụ: w = 2719, d = 5 → sai số tối đa N/1000, xác suất thất bại e^(-5) ≈ 0.67%. Với N = 1,000,000, sai số tối đa là 1,000 — chấp nhận được cho top-K nếu K nằm xa top.
✅ Lời giải
Count-Min Sketch
structure CountMinSketch(d, w):
table[d][w] <- 0
-- d cặp hệ số (a_i, b_i) cho hàm hash pairwise independent
for i from 0 to d-1:
a[i] <- random odd number
b[i] <- random number
function hashRow(cms, i, x):
-- hash pairwise independent: (a * hash(x) + b) mod p mod w
-- p là số nguyên tố lớn hơn kích thước universe
return (cms.a[i] * hash(x) + cms.b[i]) mod PRIME mod cms.w
function update(cms, x):
for i from 0 to cms.d - 1:
col <- hashRow(cms, i, x)
cms.table[i][col] <- cms.table[i][col] + 1
// Time: O(d) Space: O(d × w)
function estimate(cms, x):
minVal <- +infinity
for i from 0 to cms.d - 1:
col <- hashRow(cms, i, x)
minVal <- min(minVal, cms.table[i][col])
return minVal
// Time: O(d)
Top-K với min-heap
function topK(stream, K, d, w):
cms <- CountMinSketch(d, w)
heap <- MinHeap() -- heap (freq, element), so sánh theo freq
inHeap <- Map() -- element -> freq đang trong heap
for each element x in stream:
update(cms, x)
freq <- estimate(cms, x)
if x in inHeap:
-- cập nhật: xóa entry cũ, thêm lại với freq mới
heap.remove((inHeap[x], x))
heap.push((freq, x))
inHeap[x] <- freq
else if heap.size < K:
heap.push((freq, x))
inHeap[x] <- freq
else if freq > heap.min():
-- loại phần tử nhỏ nhất, thêm x vào
(minFreq, minElem) <- heap.pop()
delete inHeap[minElem]
heap.push((freq, x))
inHeap[x] <- freq
-- trả về K phần tử theo thứ tự giảm dần
result <- []
while heap không rỗng:
result.prepend(heap.pop())
return result
// Time: O(n × d) + chi phí cập nhật heap Space: O(d × w + K)
Lưu ý độ phức tạp: thao tác heap.remove(phần tử bất kỳ) trên binary heap thường là O(K) (phải tìm phần tử). Để mỗi cập nhật đạt O(log K) cần indexed-heap (giữ map vị trí phần tử trong heap). Với indexed-heap, tổng là O(n × (d + log K)).
Tổng bộ nhớ: d × w cho CMS + K cho heap + K cho map inHeap. Tất cả hằng số — không tăng theo n.
Trace stream [A,B,A,C,A,B,D,A,B], K=2, d=3, w=100 (không collision):
| Bước | x | CMS estimate | Heap (min-heap) | inHeap |
|---|---|---|---|---|
| 1 | A | 1 | [(1,A)] | {A:1} |
| 2 | B | 1 | [(1,A),(1,B)] | {A:1,B:1} |
| 3 | A | 2 | [(1,B),(2,A)] | {A:2,B:1} |
| 4 | C | 1 | heap đủ K=2, freq(C)=1 không vượt min=1, bỏ qua | {A:2,B:1} |
| 5 | A | 3 | [(1,B),(3,A)] | {A:3,B:1} |
| 6 | B | 2 | [(2,B),(3,A)] | {A:3,B:2} |
| 7 | D | 1 | freq(D)=1 không vượt min=2, bỏ qua | {A:3,B:2} |
| 8 | A | 4 | [(2,B),(4,A)] | {A:4,B:2} |
| 9 | B | 3 | [(3,B),(4,A)] | {A:4,B:3} |
Kết quả: [(A,4),(B,3)].
🎓 Mở rộng
So sánh với đếm chính xác — bộ nhớ:
| Phương pháp | Bộ nhớ | Sai số | Khi nào dùng |
|---|---|---|---|
| Hash map đếm chính xác | O(n unique) — có thể GB | 0% | RAM đủ, cần chính xác tuyệt đối |
| Count-Min Sketch + heap | O(d×w + K) — vài trăm KB | Over-estimate tối đa ε×N | RAM không đủ, chấp nhận xấp xỉ |
| Space-Saving (Metwally 2005) | O(K) | Tương đương CMS | Tối ưu bộ nhớ cho top-K |
Lossy counting (Manku & Motwani 2002): thuật toán khác cho heavy hitters với bảo đảm lý thuyết khác — thay vì over-estimate, nó có thể bỏ sót phần tử dưới ngưỡng tần suất thấp. Phù hợp khi muốn giới hạn false positive thay vì false negative.
Streaming top-K trong thực tế: Apache Flink dùng CMS + heap cho topN operator trên bounded stream. Twitter Algebird cung cấp TopK monoid có thể merge từ nhiều partition song song — phù hợp với kiến trúc phân tán.
✨ Điều bạn vừa làm được
- Implement Count-Min Sketch
d × wvớidhàm hash độc lập và bất biến over-estimate. - Kết hợp CMS với min-heap K phần tử để giải bài top-K heavy hitters trong O(n × d × log K) và O(d×w + K) bộ nhớ — hằng số theo n.
- Hiểu trade-off: chính xác tuyệt đối cần O(n unique) bộ nhớ, xấp xỉ với sai số có giới hạn chỉ cần O(d×w).
- Nhận ra khi nào CMS không phù hợp: khi K lớn so với tổng phần tử, hoặc khi cần biết chính xác tần suất mỗi phần tử.
Bài tiếp theo: Case study — Redis HLL & Kafka offset
Bài này có giúp bạn hiểu bản chất không?
Hỏi đáp về bài này
Chưa có câu hỏi
Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).
Đặt câu hỏi đầu tiên