Concurrent collections — ConcurrentHashMap, CopyOnWrite, BlockingQueue
Vì sao HashMap có thể infinite loop dưới multithread. Lock striping của ConcurrentHashMap, copy-on-write cho read-heavy list, BlockingQueue cho producer-consumer. Back-pressure pattern.
Tóm tắt lịch sử một bug kinh điển: Java 7, 2 thread cùng put vào HashMap → bảng hash phải resize (gấp đôi capacity, rehash entry). Resize không atomic — 2 thread cùng resize song song có thể tạo cycle trong linked list bên trong bucket → get sau đó loop forever, CPU 100%.
Bug này làm nhiều production service crash — tìm ra cực khó vì thread đang loop vô tận, không exception, không log. Chỉ có CPU nóng.
Java 8 redesign HashMap (dùng tree khi collision cao) giảm cơ hội cycle, nhưng vẫn không thread-safe — value vẫn có thể corrupt silent. Bài học: HashMap, ArrayList, LinkedList không được dùng từ nhiều thread.
Giải pháp cũ (Java 1.2): Collections.synchronizedMap(map) — wrap mọi method trong synchronized. An toàn nhưng mọi op lock toàn map — nhiều thread chờ nhau, contention cao, throughput thấp.
Giải pháp modern (Java 5+): java.util.concurrent — ConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue. Mỗi class thiết kế cho use case cụ thể với optimization khác nhau.
Bài này giải thích 3 concurrent collection phổ biến nhất: cơ chế bên dưới, khi nào dùng cái nào, và các pitfall thường gặp.
1. Vì sao HashMap không safe — chi tiết
Hiểu failure mode để biết vì sao cần ConcurrentHashMap, không chỉ "thêm synchronized".
Cấu trúc HashMap
HashMap lưu entry trong bucket array. Hash của key quyết định bucket. Collision (nhiều key cùng bucket) được xử lý bằng linked list (Java 7) hoặc tree (Java 8+).
Khi size > threshold (default capacity * 0.75), resize: tạo array gấp đôi, rehash mọi entry vào bucket mới.
Race condition trong resize
Java 7, 2 thread cùng trigger resize:
- Thread A bắt đầu rehash bucket 3. Đi qua linked list
e1 -> e2 -> e3, rehash từng entry, build list mới trong bucket mới. - Thread B bắt đầu rehash cùng bucket 3. Đồng thời thao tác trên cùng linked list.
- Thread A và B interleave, kết thúc với linked list có cycle:
e1 -> e2 -> e1(do A sete1.next = e2, B sete2.next = e1).
Sau đó bất kỳ get nào hash vào bucket đó → while (e != null) { if (e.key == k) return e.value; e = e.next; } → loop vô tận.
Java 8 dùng algorithm khác (bucket transfer theo high/low bit) giảm risk cycle, nhưng vẫn có scenario value corrupt.
Kết luận
HashMap không bao giờ dùng từ nhiều thread. Nếu tình cờ OK 1 triệu lần, lần thứ 1,000,001 có thể corrupt. Bug hiếm gặp nhưng catastrophic.
2. Collections.synchronizedMap — giải pháp cũ, không tối ưu
Map<String, Integer> map = Collections.synchronizedMap(new HashMap<>());
map.put("a", 1);
Bên dưới: wrap HashMap với mỗi method synchronized trên mutex chung:
class SynchronizedMap<K, V> implements Map<K, V> {
private final Map<K, V> m;
private final Object mutex; // this hoac custom
public V put(K k, V v) { synchronized (mutex) { return m.put(k, v); } }
public V get(Object k) { synchronized (mutex) { return m.get(k); } }
// ...
}
An toàn vì mỗi op atomic dưới 1 lock duy nhất.
Không tối ưu: nhiều thread đọc cùng lúc — tất cả chờ nhau dù đọc không conflict logic. Write block hết reader. Contention cao với high concurrency.
Thêm: iterator không lock — nếu iterate trong khi thread khác modify → ConcurrentModificationException (fail-fast) hoặc behavior không xác định.
synchronized (map) { // Phai lock thu cong khi iterate
for (var e : map.entrySet()) { ... }
}
Verbose, dễ quên. Đó là lý do ConcurrentHashMap được thiết kế.
3. ConcurrentHashMap — lock striping
Cơ chế
Java 7 design: chia map thành 16 segment, mỗi segment là 1 sub-map với lock riêng. Thread update segment 1 không block thread update segment 2 — parallel thực sự.
Java 8+ redesign: bỏ segment, thay bằng lock per bucket. Mỗi bucket (đầu linked list) có thể lock độc lập. Khi bucket trống, put dùng CAS — không cần lock.
Kết quả: 2 thread update bucket khác nhau → 0 contention. 2 thread update cùng bucket → chỉ lock bucket đó, không toàn map.
API cơ bản
import java.util.concurrent.ConcurrentHashMap;
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("a", 1);
int v = map.get("a");
// Iterator - weakly consistent, khong throw ConcurrentModificationException
for (var e : map.entrySet()) {
System.out.println(e.getKey() + "=" + e.getValue());
}
Method atomic đặc biệt
ConcurrentHashMap có method atomic cho pattern phổ biến — không cần lock thủ công:
// putIfAbsent: chi put khi key chua co
map.putIfAbsent("key", 42);
// computeIfAbsent: lazy init value
map.computeIfAbsent("key", k -> loadFromDB(k));
// computeIfPresent: update chi khi key co
map.computeIfPresent("key", (k, v) -> v * 2);
// compute: general - k, current value (co the null) -> new value (co the null de remove)
map.compute("counter", (k, v) -> v == null ? 1 : v + 1);
// merge: combine voi value cu (pattern word count)
map.merge("word", 1, Integer::sum);
Tất cả atomic trong scope 1 bucket. Pattern "if not exists then put" trong 1 op — không có khoảng hở cho race condition.
Anti-pattern: check-then-act không safe
// BAD
if (!map.containsKey(key)) {
map.put(key, value);
}
Hai thread cùng check false → cả hai put. Update thứ hai overwrite thứ nhất. Race condition.
// GOOD
map.putIfAbsent(key, value);
putIfAbsent là một op atomic — kiểm tra + put trong cùng lock bucket. Return value là giá trị hiện tại (null nếu put thành công, value cũ nếu bỏ qua).
Null không cho phép
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("x", null); // NullPointerException
map.put(null, 1); // NullPointerException
Lý do: trong HashMap, map.get(k) == null ambiguous — không biết key không tồn tại hay value là null. Phải containsKey check — 2 op không atomic trong concurrent.
ConcurrentHashMap tránh hẳn: null not allowed → get trả null chỉ khi key không có, đơn giản và atomic.
Workaround: dùng sentinel value (Optional.empty(), "NULL"), hoặc wrap value với Optional<V>.
Bulk operation parallel
// forEach parallel
map.forEach(1000, // threshold: > 1000 entries moi parallel
(k, v) -> System.out.println(k + "=" + v));
// Search parallel
String found = map.search(1000, (k, v) -> v > 100 ? k : null);
// Reduce parallel
int total = map.reduceValues(1000, Integer::sum);
Parameter đầu là threshold — map nhỏ hơn thì chạy sequential (tránh overhead). Lớn hơn thì chia task cho commonPool.
4. CopyOnWriteArrayList — read-heavy, write-rare
Cơ chế
Mỗi lần add, remove, set:
- Lock.
- Copy toàn bộ array gốc vào array mới.
- Modify array mới.
- Thay reference volatile trỏ tới array mới.
- Unlock.
Read (get, iterator) không lock — đọc thẳng array hiện tại. Iterator giữ snapshot array lúc tạo — immune với mọi modification sau.
| Op | Chi phí |
|---|---|
get(i) | O(1), no lock |
iterator().next() | O(1), no lock, stable snapshot |
add(e), remove(i), set(i, e) | O(n) copy + lock |
Use case
Design này phù hợp khi:
- Read rất nhiều, write rất ít: listener list, event subscriber, observer registry, config cache.
- Iterator không cần thấy modification mới: acceptable nếu iterator chạy dài thấy snapshot cũ.
Ví dụ điển hình — listener list:
class EventBus {
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
public void subscribe(Listener l) {
listeners.add(l); // Hiem, 1 lan lúc boot
}
public void publish(Event e) {
for (Listener l : listeners) { // Chay thuong xuyen, hang nghin lan
l.onEvent(e);
}
}
}
Publish chạy hàng nghìn lần mỗi giây — zero lock, zero contention. Subscribe chạy vài lần lúc khởi động — copy toàn array tốn O(n) nhưng không quan trọng.
Khi không dùng
- Write nhiều: mỗi add copy toàn array. List 10k element, add 1k lần = 10M copy. Tệ hại.
- Cần thấy update mới trong iterator: iterator chạy trên snapshot cũ. Nếu iterate mất 1 phút và trong đó có modification, iterator miss update.
Thay thế cho write-heavy: Collections.synchronizedList(new ArrayList<>()) — lock mỗi op nhưng không copy. Hoặc refactor: giữ data trong ConcurrentHashMap và dùng values() nếu không cần thứ tự.
5. BlockingQueue — producer-consumer
Cơ chế
Queue thread-safe có 2 đặc tính đặc biệt:
- Block khi full:
putblock đến khi có chỗ. - Block khi empty:
takeblock đến khi có element.
Perfect cho pattern producer-consumer.
Ví dụ cơ bản
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// Producer
new Thread(() -> {
try {
queue.put("item"); // Block neu queue day
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// Consumer
new Thread(() -> {
try {
String item = queue.take(); // Block neu queue rong
System.out.println(item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
4 dạng method cho insert/remove
Mỗi op có 4 variant khác nhau về behavior khi queue full/empty:
| Op | Throw exception | Return special | Block | Timeout |
|---|---|---|---|---|
| Insert | add(e) | offer(e) (false) | put(e) | offer(e, timeout, unit) |
| Remove | remove() | poll() (null) | take() | poll(timeout, unit) |
| Examine | element() | peek() (null) | – | – |
Chọn theo semantic:
- Critical insert không được mất →
put(block). - Best-effort insert, bỏ nếu queue đầy →
offer. - Polling với timeout →
offer(e, 1s)haypoll(1s).
Implementation variants
// Array-backed, bounded
BlockingQueue<T> q1 = new ArrayBlockingQueue<>(100);
// Linked, bounded hoac unbounded
BlockingQueue<T> q2 = new LinkedBlockingQueue<>(1000); // bounded
BlockingQueue<T> q3 = new LinkedBlockingQueue<>(); // UNBOUNDED - Integer.MAX_VALUE
// Priority queue, unbounded
BlockingQueue<T> q4 = new PriorityBlockingQueue<>();
// Capacity 0 - synchronous handoff
BlockingQueue<T> q5 = new SynchronousQueue<>();
// Element co delay, chi take duoc khi expired
BlockingQueue<T> q6 = new DelayQueue<>();
Chú ý LinkedBlockingQueue default unbounded — producer nhanh hơn consumer → queue grows vô hạn → OOM. Production luôn pass capacity explicit.
Pattern producer-consumer đầy đủ
BlockingQueue<Task> queue = new ArrayBlockingQueue<>(100);
ExecutorService exec = Executors.newFixedThreadPool(8);
// 3 producer
for (int i = 0; i < 3; i++) {
exec.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Task t = generateTask();
queue.put(t); // Block khi queue day -> back-pressure
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
// 5 consumer
for (int i = 0; i < 5; i++) {
exec.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Task t = queue.take(); // Block khi queue rong
process(t);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
Back-pressure là tính chất quan trọng: nếu consumer chậm, queue đầy → producer block tại put → ngầm slow producer down → load balance tự động. Không cần logic rate-limit riêng.
6. Các lớp concurrent khác
Không phải ai cũng dùng, nhưng tốt để biết có:
| Class | Use case |
|---|---|
ConcurrentSkipListMap | Sorted map thread-safe. O(log n) op. |
ConcurrentSkipListSet | Sorted set thread-safe. |
CopyOnWriteArraySet | Set version của CopyOnWriteArrayList. |
ConcurrentLinkedQueue | Unbounded non-blocking queue (dùng CAS). |
LinkedTransferQueue | Queue + transfer(e) — producer chờ consumer nhận. |
ConcurrentHashMap.newKeySet() | Set xây trên ConcurrentHashMap. |
7. Khi nào chọn cái nào
flowchart TD
A[Can concurrent collection?] --> B{Kieu gi?}
B -->|Map| C{Can sorted?}
C -->|Khong| D[ConcurrentHashMap]
C -->|Co| E[ConcurrentSkipListMap]
B -->|List| F{Read/Write ratio?}
F -->|Read >>> Write<br/>Write hiem| G[CopyOnWriteArrayList]
F -->|Write thuong xuyen| H[Collections.synchronizedList<br/>hoac refactor]
B -->|Queue| I{Block semantics?}
I -->|Co| J[ArrayBlockingQueue<br/>LinkedBlockingQueue]
I -->|Khong| K[ConcurrentLinkedQueue]
I -->|Priority| L[PriorityBlockingQueue]8. Pattern thực tế — word count concurrent
Đếm số lần xuất hiện của mỗi từ trong data stream từ N thread:
public class WordCounter {
private final ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
public void count(String text) {
for (String word : text.split("\\s+")) {
counts.merge(word, 1, Integer::sum);
}
}
public int getCount(String word) {
return counts.getOrDefault(word, 0);
}
}
counts.merge(word, 1, Integer::sum) atomic:
- Nếu
wordchưa có → put1. - Nếu có → lấy current value + 1, put lại.
N thread gọi count — không corrupt, không lock thủ công. Mỗi word là 1 bucket, 2 thread ghi cùng word → chờ nhau ngắn; 2 thread ghi word khác → parallel.
Alternative cho contention cao cùng key: dùng AtomicInteger làm value:
private final ConcurrentHashMap<String, AtomicInteger> counts = new ConcurrentHashMap<>();
public void count(String word) {
counts.computeIfAbsent(word, k -> new AtomicInteger()).incrementAndGet();
}
AtomicInteger.incrementAndGet dùng CAS — nhanh hơn merge lock bucket khi nhiều thread ghi cùng key hot.
9. Pitfall tổng hợp
❌ Nhầm 1: Check-then-act trên ConcurrentHashMap.
if (!map.containsKey(k)) map.put(k, v); // Race
✅ map.putIfAbsent(k, v) hoặc computeIfAbsent.
❌ Nhầm 2: CopyOnWriteArrayList với write thường xuyên.
CopyOnWriteArrayList<Event> events = new CopyOnWriteArrayList<>();
for (int i = 0; i < 1_000_000; i++) events.add(new Event());
// Copy mang 1M lan!
✅ ConcurrentLinkedQueue hoặc synchronizedList cho write-heavy. Hoặc đổi thiết kế.
❌ Nhầm 3: ConcurrentHashMap.put(k, null).
map.put("key", null); // NullPointerException
✅ Không có null — dùng sentinel value hoặc Optional.
❌ Nhầm 4: LinkedBlockingQueue không capacity → unbounded → OOM.
BlockingQueue<T> q = new LinkedBlockingQueue<>(); // Default Integer.MAX_VALUE
✅ new LinkedBlockingQueue<>(1000) — bounded, có back-pressure.
❌ Nhầm 5: Synchronize iterator của synchronizedMap không lock thủ công.
for (var e : synchronizedMap.entrySet()) { ... }
// ConcurrentModificationException neu thread khac modify
✅ Lock map trong iterate:
synchronized (synchronizedMap) {
for (var e : synchronizedMap.entrySet()) { ... }
}
❌ Nhầm 6: Iterate ConcurrentHashMap assume consistent snapshot.
for (var e : map.entrySet()) {
// Weakly consistent - co the thay update xay ra mid-iterate
}
Iterator không throw CME, nhưng có thể thấy (hoặc không thấy) update xảy ra trong lúc iterate. OK cho aggregate không strict; không OK cho invariant phải precise.
10. 📚 Deep Dive Oracle
Spec / reference chính thức:
- java.util.concurrent package — overview toàn bộ concurrent collection.
- ConcurrentHashMap — Java 8 redesign, lock striping, bulk operation.
- CopyOnWriteArrayList — semantics iterator snapshot.
- BlockingQueue — 4-method table, memory consistency.
- "Java Concurrency in Practice" Ch 5 — chapter "Building Blocks" về các collection.
Ghi chú: Javadoc ConcurrentHashMap mô tả memory consistency: "Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread". Đây là guarantee ngầm cho pattern producer-consumer với BlockingQueue — producer ghi data vào object, put vào queue; consumer take, đọc object — không cần thêm sync thủ công.
11. Tóm tắt
HashMapkhông thread-safe — Java 7 có bug cycle trong resize, Java 8+ giảm risk nhưng vẫn corrupt value.Collections.synchronizedMap— an toàn nhưng lock toàn map → contention cao, throughput thấp.ConcurrentHashMap— lock per bucket (Java 8+), atomic method (putIfAbsent,compute,merge). Không cho null key/value.- Atomic method trong
ConcurrentHashMapthay thế check-then-act pattern — 1 op tổng atomic. CopyOnWriteArrayList— write copy toàn array, read no-lock. Dùng cho read-heavy, write hiếm (listener, observer).BlockingQueue— producer-consumer pattern với block/timeout semantic. 4 variant cho mỗi op (throw, special value, block, timeout).LinkedBlockingQueuekhông capacity → unbounded → OOM risk. Production luôn pass capacity.- Bounded queue = back-pressure: producer chậm lại khi consumer chậm, load balance tự động.
- Chọn collection theo use case: map →
ConcurrentHashMap; list read-heavy →CopyOnWriteArrayList; producer-consumer →BlockingQueue.
12. Tự kiểm tra
Q1Vì sao ConcurrentHashMap không cho null value?▸
Trong HashMap, map.get(k) == null ambiguous: không biết key không tồn tại hay value là null. Phải containsKey check — 2 op.
Trong concurrent context, containsKey + get không atomic với nhau. Thread khác có thể xoá key giữa 2 op → check ra true, get ra null → ngộ nhận "key tồn tại với value null".
ConcurrentHashMap tránh hẳn: null not allowed → get trả null chỉ khi key không có. Đơn giản, unambiguous, atomic.
Workaround nếu thực sự cần "không có value": dùng sentinel (Optional.empty(), constant "NULL"), hoặc wrap Optional<V>.
Q2Khi nào CopyOnWriteArrayList phù hợp?▸
Khi write hiếm, read nhiều, và iterator không cần thấy update mới. Typical use case:
- Listener / observer list: subscribe vài lần lúc boot, fire event hàng nghìn lần mỗi giây. Copy O(n) mỗi subscribe không quan trọng vì rare; read zero-lock quan trọng vì hot.
- Config cache: reload 1 giờ 1 lần, read nhiều. Reader thấy snapshot ổn định trong 1 giờ, không cần thấy update ngay.
- Plugin registry: đăng ký lúc boot, iterate mọi request.
Không dùng khi write thường xuyên (log buffer, hit counter) — mỗi add copy toàn array → tệ hại với size lớn. Write 1000 lần vào list 10k element = 10M copy.
Dấu hiệu wrong tool: code có CopyOnWriteArrayList và performance profiler cho thấy hot path là Arrays.copyOf — chuyển sang `Collections.synchronizedList` hoặc refactor design.
Q3Khác biệt giữa put và putIfAbsent?▸
put và putIfAbsent?- put(k, v): luôn ghi đè. Return value cũ (hoặc null nếu chưa có).
- putIfAbsent(k, v): chỉ ghi nếu key chưa có. Return value hiện tại —
nullnếu insert thành công,non-nullnếu key đã có (value cũ giữ nguyên).
Return value của putIfAbsent là điểm dễ nhầm: null = "đã insert mới", non-null = "skip, giữ value cũ".
Integer existing = map.putIfAbsent("k", 1);
if (existing == null) {
// insert moi thanh cong
} else {
// key da co, value hien tai = existing
}Dùng trong pattern "create if not exist" atomic:
User user = map.computeIfAbsent(userId, id -> loadUserFromDb(id));
// Neu key ton tai -> return; khong -> goi function va cache ket qua
// Atomic - dam bao function chi goi 1 lan cho moi keyQ4Đoạn sau có race condition không? if (!chm.containsKey(k)) chm.put(k, v);▸
if (!chm.containsKey(k)) chm.put(k, v);Có race condition, mặc dù ConcurrentHashMap thread-safe.
Thread A check containsKey → false. Thread A chuẩn bị put (chưa put).
Thread B check containsKey → false. Thread B put v2.
Thread A put v1 → overwrite v2.
Mỗi op trong ConcurrentHashMap atomic, nhưng compose 2 op vẫn không atomic — giữa 2 op có khoảng hở.
Fix: dùng atomic method compose sẵn:
// Insert chi neu chua co, atomic
chm.putIfAbsent(k, v);
// Hoac voi value tinh dong
chm.computeIfAbsent(k, key -> compute(key));Quy tắc: nếu logic cần "check X rồi làm Y" mà 2 bước phải atomic, tìm atomic method trong ConcurrentHashMap API (putIfAbsent, computeIfAbsent, computeIfPresent, compute, merge, replace). Đừng compose thủ công.
Q5Vì sao new LinkedBlockingQueue<>() không capacity có thể gây OOM?▸
new LinkedBlockingQueue<>() không capacity có thể gây OOM?Constructor không arg của LinkedBlockingQueue dùng Integer.MAX_VALUE làm capacity — effectively unbounded.
Nếu producer nhanh hơn consumer (network burst, spike traffic), queue grows vô hạn trong memory. Mỗi element = object + linked list node. Với 10 triệu element, heap có thể tăng vài GB → OOM, JVM crash.
Đây là bug production kinh điển: app chạy OK nhiều ngày, đột nhiên OOM khi traffic spike. Root cause: queue unbounded, log aggregator không kịp process.
Production fix:
new LinkedBlockingQueue<>(1000); // Bounded capacity
// hoac
new ArrayBlockingQueue<>(1000); // Array-backed, fixed capacityQueue đầy → producer block tại put → ngầm rate-limit producer → load balance tự nhiên. Pattern bounded queue = back-pressure — core concept của reactive stream.
Q6Khi nào dùng merge và khi nào dùng computeIfAbsent + AtomicInteger cho counter theo key?▸
merge và khi nào dùng computeIfAbsent + AtomicInteger cho counter theo key?- merge(key, 1, Integer::sum): lock bucket, đọc, cộng, ghi lại. Mỗi update lock bucket ngắn.
- computeIfAbsent + AtomicInteger: get-or-create `AtomicInteger`, sau đó `incrementAndGet` dùng CAS — không lock bucket.
So sánh performance:
- Nhiều key, mỗi key update vài lần →
mergeđủ. Code ngắn, dễ đọc. - Ít key hot (1-10 key) với contention cực cao (100+ thread cùng update 1 key) →
AtomicIntegernhanh hơn. CAS không lock OS, ít block hơn lock bucket.
Với contention extreme, dùng LongAdder:
ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();
public void count(String word) {
counts.computeIfAbsent(word, k -> new LongAdder()).increment();
}LongAdder chia counter thành nhiều cell, mỗi thread update cell riêng. Sum cuối. Throughput cực cao dưới contention cao.
Bài tiếp theo: Virtual threads — Java 21 lightweight concurrency
Bài này có giúp bạn hiểu bản chất không?