Java — Từ Zero đến Senior/Concurrent collections — ConcurrentHashMap, CopyOnWrite, BlockingQueue
~22 phútConcurrency cơ bảnMiễn phí

Concurrent collections — ConcurrentHashMap, CopyOnWrite, BlockingQueue

Vì sao HashMap có thể infinite loop dưới multithread. Lock striping của ConcurrentHashMap, copy-on-write cho read-heavy list, BlockingQueue cho producer-consumer. Back-pressure pattern.

Tóm tắt lịch sử một bug kinh điển: Java 7, 2 thread cùng put vào HashMap → bảng hash phải resize (gấp đôi capacity, rehash entry). Resize không atomic — 2 thread cùng resize song song có thể tạo cycle trong linked list bên trong bucket → get sau đó loop forever, CPU 100%.

Bug này làm nhiều production service crash — tìm ra cực khó vì thread đang loop vô tận, không exception, không log. Chỉ có CPU nóng.

Java 8 redesign HashMap (dùng tree khi collision cao) giảm cơ hội cycle, nhưng vẫn không thread-safe — value vẫn có thể corrupt silent. Bài học: HashMap, ArrayList, LinkedList không được dùng từ nhiều thread.

Giải pháp cũ (Java 1.2): Collections.synchronizedMap(map) — wrap mọi method trong synchronized. An toàn nhưng mọi op lock toàn map — nhiều thread chờ nhau, contention cao, throughput thấp.

Giải pháp modern (Java 5+): java.util.concurrentConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue. Mỗi class thiết kế cho use case cụ thể với optimization khác nhau.

Bài này giải thích 3 concurrent collection phổ biến nhất: cơ chế bên dưới, khi nào dùng cái nào, và các pitfall thường gặp.

1. Vì sao HashMap không safe — chi tiết

Hiểu failure mode để biết vì sao cần ConcurrentHashMap, không chỉ "thêm synchronized".

Cấu trúc HashMap

HashMap lưu entry trong bucket array. Hash của key quyết định bucket. Collision (nhiều key cùng bucket) được xử lý bằng linked list (Java 7) hoặc tree (Java 8+).

Khi size > threshold (default capacity * 0.75), resize: tạo array gấp đôi, rehash mọi entry vào bucket mới.

Race condition trong resize

Java 7, 2 thread cùng trigger resize:

  1. Thread A bắt đầu rehash bucket 3. Đi qua linked list e1 -> e2 -> e3, rehash từng entry, build list mới trong bucket mới.
  2. Thread B bắt đầu rehash cùng bucket 3. Đồng thời thao tác trên cùng linked list.
  3. Thread A và B interleave, kết thúc với linked list có cycle: e1 -> e2 -> e1 (do A set e1.next = e2, B set e2.next = e1).

Sau đó bất kỳ get nào hash vào bucket đó → while (e != null) { if (e.key == k) return e.value; e = e.next; } → loop vô tận.

Java 8 dùng algorithm khác (bucket transfer theo high/low bit) giảm risk cycle, nhưng vẫn có scenario value corrupt.

Kết luận

HashMap không bao giờ dùng từ nhiều thread. Nếu tình cờ OK 1 triệu lần, lần thứ 1,000,001 có thể corrupt. Bug hiếm gặp nhưng catastrophic.

2. Collections.synchronizedMap — giải pháp cũ, không tối ưu

Map<String, Integer> map = Collections.synchronizedMap(new HashMap<>());
map.put("a", 1);

Bên dưới: wrap HashMap với mỗi method synchronized trên mutex chung:

class SynchronizedMap<K, V> implements Map<K, V> {
    private final Map<K, V> m;
    private final Object mutex;   // this hoac custom

    public V put(K k, V v) { synchronized (mutex) { return m.put(k, v); } }
    public V get(Object k) { synchronized (mutex) { return m.get(k); } }
    // ...
}

An toàn vì mỗi op atomic dưới 1 lock duy nhất.

Không tối ưu: nhiều thread đọc cùng lúc — tất cả chờ nhau dù đọc không conflict logic. Write block hết reader. Contention cao với high concurrency.

Thêm: iterator không lock — nếu iterate trong khi thread khác modify → ConcurrentModificationException (fail-fast) hoặc behavior không xác định.

synchronized (map) {   // Phai lock thu cong khi iterate
    for (var e : map.entrySet()) { ... }
}

Verbose, dễ quên. Đó là lý do ConcurrentHashMap được thiết kế.

3. ConcurrentHashMap — lock striping

Cơ chế

Java 7 design: chia map thành 16 segment, mỗi segment là 1 sub-map với lock riêng. Thread update segment 1 không block thread update segment 2 — parallel thực sự.

Java 8+ redesign: bỏ segment, thay bằng lock per bucket. Mỗi bucket (đầu linked list) có thể lock độc lập. Khi bucket trống, put dùng CAS — không cần lock.

Kết quả: 2 thread update bucket khác nhau → 0 contention. 2 thread update cùng bucket → chỉ lock bucket đó, không toàn map.

API cơ bản

import java.util.concurrent.ConcurrentHashMap;

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("a", 1);
int v = map.get("a");

// Iterator - weakly consistent, khong throw ConcurrentModificationException
for (var e : map.entrySet()) {
    System.out.println(e.getKey() + "=" + e.getValue());
}

Method atomic đặc biệt

ConcurrentHashMap có method atomic cho pattern phổ biến — không cần lock thủ công:

// putIfAbsent: chi put khi key chua co
map.putIfAbsent("key", 42);

// computeIfAbsent: lazy init value
map.computeIfAbsent("key", k -> loadFromDB(k));

// computeIfPresent: update chi khi key co
map.computeIfPresent("key", (k, v) -> v * 2);

// compute: general - k, current value (co the null) -> new value (co the null de remove)
map.compute("counter", (k, v) -> v == null ? 1 : v + 1);

// merge: combine voi value cu (pattern word count)
map.merge("word", 1, Integer::sum);

Tất cả atomic trong scope 1 bucket. Pattern "if not exists then put" trong 1 op — không có khoảng hở cho race condition.

Anti-pattern: check-then-act không safe

// BAD
if (!map.containsKey(key)) {
    map.put(key, value);
}

Hai thread cùng check false → cả hai put. Update thứ hai overwrite thứ nhất. Race condition.

// GOOD
map.putIfAbsent(key, value);

putIfAbsentmột op atomic — kiểm tra + put trong cùng lock bucket. Return value là giá trị hiện tại (null nếu put thành công, value cũ nếu bỏ qua).

Null không cho phép

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("x", null);   // NullPointerException
map.put(null, 1);     // NullPointerException

Lý do: trong HashMap, map.get(k) == null ambiguous — không biết key không tồn tại hay value là null. Phải containsKey check — 2 op không atomic trong concurrent.

ConcurrentHashMap tránh hẳn: null not allowed → get trả null chỉ khi key không có, đơn giản và atomic.

Workaround: dùng sentinel value (Optional.empty(), "NULL"), hoặc wrap value với Optional<V>.

Bulk operation parallel

// forEach parallel
map.forEach(1000,   // threshold: > 1000 entries moi parallel
    (k, v) -> System.out.println(k + "=" + v));

// Search parallel
String found = map.search(1000, (k, v) -> v > 100 ? k : null);

// Reduce parallel
int total = map.reduceValues(1000, Integer::sum);

Parameter đầu là threshold — map nhỏ hơn thì chạy sequential (tránh overhead). Lớn hơn thì chia task cho commonPool.

4. CopyOnWriteArrayList — read-heavy, write-rare

Cơ chế

Mỗi lần add, remove, set:

  1. Lock.
  2. Copy toàn bộ array gốc vào array mới.
  3. Modify array mới.
  4. Thay reference volatile trỏ tới array mới.
  5. Unlock.

Read (get, iterator) không lock — đọc thẳng array hiện tại. Iterator giữ snapshot array lúc tạo — immune với mọi modification sau.

OpChi phí
get(i)O(1), no lock
iterator().next()O(1), no lock, stable snapshot
add(e), remove(i), set(i, e)O(n) copy + lock

Use case

Design này phù hợp khi:

  • Read rất nhiều, write rất ít: listener list, event subscriber, observer registry, config cache.
  • Iterator không cần thấy modification mới: acceptable nếu iterator chạy dài thấy snapshot cũ.

Ví dụ điển hình — listener list:

class EventBus {
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    public void subscribe(Listener l) {
        listeners.add(l);   // Hiem, 1 lan lúc boot
    }

    public void publish(Event e) {
        for (Listener l : listeners) {   // Chay thuong xuyen, hang nghin lan
            l.onEvent(e);
        }
    }
}

Publish chạy hàng nghìn lần mỗi giây — zero lock, zero contention. Subscribe chạy vài lần lúc khởi động — copy toàn array tốn O(n) nhưng không quan trọng.

Khi không dùng

  • Write nhiều: mỗi add copy toàn array. List 10k element, add 1k lần = 10M copy. Tệ hại.
  • Cần thấy update mới trong iterator: iterator chạy trên snapshot cũ. Nếu iterate mất 1 phút và trong đó có modification, iterator miss update.

Thay thế cho write-heavy: Collections.synchronizedList(new ArrayList<>()) — lock mỗi op nhưng không copy. Hoặc refactor: giữ data trong ConcurrentHashMap và dùng values() nếu không cần thứ tự.

5. BlockingQueue — producer-consumer

Cơ chế

Queue thread-safe có 2 đặc tính đặc biệt:

  • Block khi full: put block đến khi có chỗ.
  • Block khi empty: take block đến khi có element.

Perfect cho pattern producer-consumer.

Ví dụ cơ bản

BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

// Producer
new Thread(() -> {
    try {
        queue.put("item");   // Block neu queue day
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

// Consumer
new Thread(() -> {
    try {
        String item = queue.take();   // Block neu queue rong
        System.out.println(item);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

4 dạng method cho insert/remove

Mỗi op có 4 variant khác nhau về behavior khi queue full/empty:

OpThrow exceptionReturn specialBlockTimeout
Insertadd(e)offer(e) (false)put(e)offer(e, timeout, unit)
Removeremove()poll() (null)take()poll(timeout, unit)
Examineelement()peek() (null)

Chọn theo semantic:

  • Critical insert không được mất → put (block).
  • Best-effort insert, bỏ nếu queue đầy → offer.
  • Polling với timeout → offer(e, 1s) hay poll(1s).

Implementation variants

// Array-backed, bounded
BlockingQueue<T> q1 = new ArrayBlockingQueue<>(100);

// Linked, bounded hoac unbounded
BlockingQueue<T> q2 = new LinkedBlockingQueue<>(1000);   // bounded
BlockingQueue<T> q3 = new LinkedBlockingQueue<>();       // UNBOUNDED - Integer.MAX_VALUE

// Priority queue, unbounded
BlockingQueue<T> q4 = new PriorityBlockingQueue<>();

// Capacity 0 - synchronous handoff
BlockingQueue<T> q5 = new SynchronousQueue<>();

// Element co delay, chi take duoc khi expired
BlockingQueue<T> q6 = new DelayQueue<>();

Chú ý LinkedBlockingQueue default unbounded — producer nhanh hơn consumer → queue grows vô hạn → OOM. Production luôn pass capacity explicit.

Pattern producer-consumer đầy đủ

BlockingQueue<Task> queue = new ArrayBlockingQueue<>(100);
ExecutorService exec = Executors.newFixedThreadPool(8);

// 3 producer
for (int i = 0; i < 3; i++) {
    exec.submit(() -> {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Task t = generateTask();
                queue.put(t);   // Block khi queue day -> back-pressure
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    });
}

// 5 consumer
for (int i = 0; i < 5; i++) {
    exec.submit(() -> {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Task t = queue.take();   // Block khi queue rong
                process(t);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    });
}

Back-pressure là tính chất quan trọng: nếu consumer chậm, queue đầy → producer block tại put → ngầm slow producer down → load balance tự động. Không cần logic rate-limit riêng.

6. Các lớp concurrent khác

Không phải ai cũng dùng, nhưng tốt để biết có:

ClassUse case
ConcurrentSkipListMapSorted map thread-safe. O(log n) op.
ConcurrentSkipListSetSorted set thread-safe.
CopyOnWriteArraySetSet version của CopyOnWriteArrayList.
ConcurrentLinkedQueueUnbounded non-blocking queue (dùng CAS).
LinkedTransferQueueQueue + transfer(e) — producer chờ consumer nhận.
ConcurrentHashMap.newKeySet()Set xây trên ConcurrentHashMap.

7. Khi nào chọn cái nào

flowchart TD
    A[Can concurrent collection?] --> B{Kieu gi?}
    B -->|Map| C{Can sorted?}
    C -->|Khong| D[ConcurrentHashMap]
    C -->|Co| E[ConcurrentSkipListMap]
    B -->|List| F{Read/Write ratio?}
    F -->|Read >>> Write<br/>Write hiem| G[CopyOnWriteArrayList]
    F -->|Write thuong xuyen| H[Collections.synchronizedList<br/>hoac refactor]
    B -->|Queue| I{Block semantics?}
    I -->|Co| J[ArrayBlockingQueue<br/>LinkedBlockingQueue]
    I -->|Khong| K[ConcurrentLinkedQueue]
    I -->|Priority| L[PriorityBlockingQueue]

8. Pattern thực tế — word count concurrent

Đếm số lần xuất hiện của mỗi từ trong data stream từ N thread:

public class WordCounter {
    private final ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();

    public void count(String text) {
        for (String word : text.split("\\s+")) {
            counts.merge(word, 1, Integer::sum);
        }
    }

    public int getCount(String word) {
        return counts.getOrDefault(word, 0);
    }
}

counts.merge(word, 1, Integer::sum) atomic:

  • Nếu word chưa có → put 1.
  • Nếu có → lấy current value + 1, put lại.

N thread gọi count — không corrupt, không lock thủ công. Mỗi word là 1 bucket, 2 thread ghi cùng word → chờ nhau ngắn; 2 thread ghi word khác → parallel.

Alternative cho contention cao cùng key: dùng AtomicInteger làm value:

private final ConcurrentHashMap<String, AtomicInteger> counts = new ConcurrentHashMap<>();

public void count(String word) {
    counts.computeIfAbsent(word, k -> new AtomicInteger()).incrementAndGet();
}

AtomicInteger.incrementAndGet dùng CAS — nhanh hơn merge lock bucket khi nhiều thread ghi cùng key hot.

9. Pitfall tổng hợp

Nhầm 1: Check-then-act trên ConcurrentHashMap.

if (!map.containsKey(k)) map.put(k, v);   // Race

map.putIfAbsent(k, v) hoặc computeIfAbsent.

Nhầm 2: CopyOnWriteArrayList với write thường xuyên.

CopyOnWriteArrayList<Event> events = new CopyOnWriteArrayList<>();
for (int i = 0; i < 1_000_000; i++) events.add(new Event());
// Copy mang 1M lan!

ConcurrentLinkedQueue hoặc synchronizedList cho write-heavy. Hoặc đổi thiết kế.

Nhầm 3: ConcurrentHashMap.put(k, null).

map.put("key", null);   // NullPointerException

✅ Không có null — dùng sentinel value hoặc Optional.

Nhầm 4: LinkedBlockingQueue không capacity → unbounded → OOM.

BlockingQueue<T> q = new LinkedBlockingQueue<>();   // Default Integer.MAX_VALUE

new LinkedBlockingQueue<>(1000) — bounded, có back-pressure.

Nhầm 5: Synchronize iterator của synchronizedMap không lock thủ công.

for (var e : synchronizedMap.entrySet()) { ... }
// ConcurrentModificationException neu thread khac modify

✅ Lock map trong iterate:

synchronized (synchronizedMap) {
    for (var e : synchronizedMap.entrySet()) { ... }
}

Nhầm 6: Iterate ConcurrentHashMap assume consistent snapshot.

for (var e : map.entrySet()) {
    // Weakly consistent - co the thay update xay ra mid-iterate
}

Iterator không throw CME, nhưng có thể thấy (hoặc không thấy) update xảy ra trong lúc iterate. OK cho aggregate không strict; không OK cho invariant phải precise.

10. 📚 Deep Dive Oracle

📚 Deep Dive Oracle

Spec / reference chính thức:

Ghi chú: Javadoc ConcurrentHashMap mô tả memory consistency: "Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread". Đây là guarantee ngầm cho pattern producer-consumer với BlockingQueue — producer ghi data vào object, put vào queue; consumer take, đọc object — không cần thêm sync thủ công.

11. Tóm tắt

  • HashMap không thread-safe — Java 7 có bug cycle trong resize, Java 8+ giảm risk nhưng vẫn corrupt value.
  • Collections.synchronizedMap — an toàn nhưng lock toàn map → contention cao, throughput thấp.
  • ConcurrentHashMap — lock per bucket (Java 8+), atomic method (putIfAbsent, compute, merge). Không cho null key/value.
  • Atomic method trong ConcurrentHashMap thay thế check-then-act pattern — 1 op tổng atomic.
  • CopyOnWriteArrayList — write copy toàn array, read no-lock. Dùng cho read-heavy, write hiếm (listener, observer).
  • BlockingQueue — producer-consumer pattern với block/timeout semantic. 4 variant cho mỗi op (throw, special value, block, timeout).
  • LinkedBlockingQueue không capacity → unbounded → OOM risk. Production luôn pass capacity.
  • Bounded queue = back-pressure: producer chậm lại khi consumer chậm, load balance tự động.
  • Chọn collection theo use case: map → ConcurrentHashMap; list read-heavy → CopyOnWriteArrayList; producer-consumer → BlockingQueue.

12. Tự kiểm tra

Tự kiểm tra
Q1
Vì sao ConcurrentHashMap không cho null value?

Trong HashMap, map.get(k) == null ambiguous: không biết key không tồn tại hay value là null. Phải containsKey check — 2 op.

Trong concurrent context, containsKey + get không atomic với nhau. Thread khác có thể xoá key giữa 2 op → check ra true, get ra null → ngộ nhận "key tồn tại với value null".

ConcurrentHashMap tránh hẳn: null not allowed → get trả null chỉ khi key không có. Đơn giản, unambiguous, atomic.

Workaround nếu thực sự cần "không có value": dùng sentinel (Optional.empty(), constant "NULL"), hoặc wrap Optional<V>.

Q2
Khi nào CopyOnWriteArrayList phù hợp?

Khi write hiếm, read nhiều, và iterator không cần thấy update mới. Typical use case:

  • Listener / observer list: subscribe vài lần lúc boot, fire event hàng nghìn lần mỗi giây. Copy O(n) mỗi subscribe không quan trọng vì rare; read zero-lock quan trọng vì hot.
  • Config cache: reload 1 giờ 1 lần, read nhiều. Reader thấy snapshot ổn định trong 1 giờ, không cần thấy update ngay.
  • Plugin registry: đăng ký lúc boot, iterate mọi request.

Không dùng khi write thường xuyên (log buffer, hit counter) — mỗi add copy toàn array → tệ hại với size lớn. Write 1000 lần vào list 10k element = 10M copy.

Dấu hiệu wrong tool: code có CopyOnWriteArrayList và performance profiler cho thấy hot path là Arrays.copyOf — chuyển sang `Collections.synchronizedList` hoặc refactor design.

Q3
Khác biệt giữa putputIfAbsent?
  • put(k, v): luôn ghi đè. Return value cũ (hoặc null nếu chưa có).
  • putIfAbsent(k, v): chỉ ghi nếu key chưa có. Return value hiện tạinull nếu insert thành công, non-null nếu key đã có (value cũ giữ nguyên).

Return value của putIfAbsent là điểm dễ nhầm: null = "đã insert mới", non-null = "skip, giữ value cũ".

Integer existing = map.putIfAbsent("k", 1);
if (existing == null) {
  // insert moi thanh cong
} else {
  // key da co, value hien tai = existing
}

Dùng trong pattern "create if not exist" atomic:

User user = map.computeIfAbsent(userId, id -> loadUserFromDb(id));
// Neu key ton tai -> return; khong -> goi function va cache ket qua
// Atomic - dam bao function chi goi 1 lan cho moi key
Q4
Đoạn sau có race condition không? if (!chm.containsKey(k)) chm.put(k, v);

Có race condition, mặc dù ConcurrentHashMap thread-safe.

Thread A check containsKey → false. Thread A chuẩn bị put (chưa put).

Thread B check containsKey → false. Thread B put v2.

Thread A put v1 → overwrite v2.

Mỗi op trong ConcurrentHashMap atomic, nhưng compose 2 op vẫn không atomic — giữa 2 op có khoảng hở.

Fix: dùng atomic method compose sẵn:

// Insert chi neu chua co, atomic
chm.putIfAbsent(k, v);

// Hoac voi value tinh dong
chm.computeIfAbsent(k, key -> compute(key));

Quy tắc: nếu logic cần "check X rồi làm Y" mà 2 bước phải atomic, tìm atomic method trong ConcurrentHashMap API (putIfAbsent, computeIfAbsent, computeIfPresent, compute, merge, replace). Đừng compose thủ công.

Q5
Vì sao new LinkedBlockingQueue<>() không capacity có thể gây OOM?

Constructor không arg của LinkedBlockingQueue dùng Integer.MAX_VALUE làm capacity — effectively unbounded.

Nếu producer nhanh hơn consumer (network burst, spike traffic), queue grows vô hạn trong memory. Mỗi element = object + linked list node. Với 10 triệu element, heap có thể tăng vài GB → OOM, JVM crash.

Đây là bug production kinh điển: app chạy OK nhiều ngày, đột nhiên OOM khi traffic spike. Root cause: queue unbounded, log aggregator không kịp process.

Production fix:

new LinkedBlockingQueue<>(1000);   // Bounded capacity
// hoac
new ArrayBlockingQueue<>(1000);    // Array-backed, fixed capacity

Queue đầy → producer block tại put → ngầm rate-limit producer → load balance tự nhiên. Pattern bounded queue = back-pressure — core concept của reactive stream.

Q6
Khi nào dùng merge và khi nào dùng computeIfAbsent + AtomicInteger cho counter theo key?
  • merge(key, 1, Integer::sum): lock bucket, đọc, cộng, ghi lại. Mỗi update lock bucket ngắn.
  • computeIfAbsent + AtomicInteger: get-or-create `AtomicInteger`, sau đó `incrementAndGet` dùng CAS — không lock bucket.

So sánh performance:

  • Nhiều key, mỗi key update vài lần → merge đủ. Code ngắn, dễ đọc.
  • Ít key hot (1-10 key) với contention cực cao (100+ thread cùng update 1 key) → AtomicInteger nhanh hơn. CAS không lock OS, ít block hơn lock bucket.

Với contention extreme, dùng LongAdder:

ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

public void count(String word) {
  counts.computeIfAbsent(word, k -> new LongAdder()).increment();
}

LongAdder chia counter thành nhiều cell, mỗi thread update cell riêng. Sum cuối. Throughput cực cao dưới contention cao.

Bài tiếp theo: Virtual threads — Java 21 lightweight concurrency

Bài này có giúp bạn hiểu bản chất không?