Java Internals & Concurrency/Blocking Queues & Producer–Consumer: Tách nhịp sản xuất khỏi tiêu thụ
12/39
Bài 12 / 39~13 phútConcurrency cơ bảnMiễn phí lượt xem

Blocking Queues & Producer–Consumer: Tách nhịp sản xuất khỏi tiêu thụ

BlockingQueue và mẫu producer–consumer: put/take, bounded vs unbounded, backpressure, SynchronousQueue và các biến thể, shutdown sạch bằng poison pill.

TL;DR: BlockingQueue là hàng đợi thread-safe biết chờ: take() chặn khi rỗng, put() chặn khi đầy — đúng hai năng lực làm nên xương sống của mẫu producer–consumer. Hàng đợi bounded biến put/offer thành van backpressure: producer nhanh hơn consumer kéo dài sẽ bị ghì lại, thay vì để hàng đợi phình tới OutOfMemoryError. API có bốn kiểu hành xử cho mỗi thao tác — ném exception, trả tín hiệu, chặn vô hạn, chặn có timeout — chọn theo điều bạn muốn khi không làm được ngay. Object đi qua hàng đợi còn được safe publication miễn phí. Shutdown sạch bằng poison pill (N consumer cần N viên) hoặc interrupt. Pitfall lớn nhất: hàng đợi unbounded không xóa giới hạn hệ thống, nó chỉ giấu giới hạn đó đi rồi đổi vài request bị từ chối lấy một cú OOM lúc tải đỉnh.

1. Giới thiệu

Bài trước khép lại ở một ý mà ta nên mang theo: cách đạt thread safety bền nhất thường là đừng tự viết đồng bộ, mà giao việc đó cho một component đã thread-safe. ConcurrentHashMap, CopyOnWriteArrayList, ConcurrentLinkedQueue đều là những building block ta thả vào và dùng, để chúng tự lo phần khó. Nhưng tất cả những collection đó có một điểm chung: khi bạn lấy mà không có gì để lấy, chúng trả về ngay. queue.poll() trên một ConcurrentLinkedQueue rỗng trả về null lập tức, map.get(k) cho một key không tồn tại cũng vậy. Chúng không bao giờ bắt thread phải chờ.

Trong rất nhiều bài toán, chính cái chờ đó mới là thứ ta cần.

Hãy nghĩ về một hệ thống mà một bên liên tục tạo ra việc còn một bên liên tục xử lý việc, hai bên chạy với nhịp khác nhau. Bên tạo việc - producer - đẩy ra request nhanh hơn lúc cao điểm; bên xử lý - consumer - chỉ nuốt được một lượng nhất định mỗi giây. Nếu hai bên gọi thẳng nhau, producer sẽ phải tự hỏi: consumer đã rảnh chưa, nếu chưa thì làm gì, đợi bao lâu, đợi thế nào cho khỏi tốn CPU? Và consumer cũng phải tự hỏi: chưa có việc thì làm gì, lặp bận - busy-wait - thì phí core, mà ngủ thì lỡ việc. Đây chính là loại phối hợp mà nếu tự viết bằng wait/notify, ta sẽ viết sai theo những cách rất khó tìm.

BlockingQueue ra đời để ta không phải tự viết phần đó. Nó là một hàng đợi thread-safe, nhưng thêm đúng một năng lực mà các concurrent collection thường không có: khi rỗng, lấy thì chờ; khi đầy, bỏ vào thì chờ. Cái chờ có quản lý đó chính là xương sống của mẫu producer–consumer, và là cách để tách rời nhịp sản xuất khỏi nhịp tiêu thụ mà vẫn tự lo backpressure.

2. Mẫu Producer–Consumer: vì sao phải tách rời

2.1 Hàng đợi ở giữa khử coupling

Mẫu producer–consumer tách một hoạt động thành hai vai: bên tạo ra công việc và bên thực hiện công việc, nối với nhau bằng một hàng đợi ở giữa. Producer bỏ việc vào hàng đợi rồi quay đi làm tiếp; consumer lấy việc ra khỏi hàng đợi rồi xử lý. Hai bên không gọi trực tiếp nhau, không cần biết nhau, thậm chí không cần tồn tại cùng số lượng.

Phép so sánh đời thường gần nhất là quầy pha chế ở một quán cà phê đông khách. Khách gọi món rồi đưa phiếu vào kẹp order - đó là producer bỏ việc vào hàng đợi. Barista lấy phiếu trên cùng, pha, rồi lấy phiếu tiếp theo - đó là consumer. Người thu ngân không đứng chờ từng ly pha xong mới nhận khách kế tiếp; barista cũng không cần biết ai vừa gọi món. Cái kẹp order ở giữa cho phép hai bên chạy với nhịp riêng. Khi khách dồn vào lúc cao điểm, kẹp order dày lên; khi vắng, nó mỏng đi và barista có lúc đứng chờ phiếu mới. Hệ thống tự cân bằng quanh cái kẹp đó mà không ai phải điều phối thủ công.

Producer
Producer
Queue
BlockingQueue
Consumer
Consumer

Bảng ánh xạ giữa quầy cà phê và pipeline:

Quán cà phêProducer–Consumer
Thu ngân nhận orderProducer tạo task
Kẹp order ở giữaBlockingQueue
Barista lấy phiếu rồi phaConsumer take() rồi xử lý
Kẹp đầy → ngừng nhận kháchQueue đầy → put() chặn (backpressure)
Kẹp rỗng → barista đứng chờQueue rỗng → take() chặn, không đốt CPU

Lợi ích của việc tách rời này không chỉ là cho gọn. Nó làm cho hai bên có thể được suy luận, kiểm thử và điều chỉnh độc lập. Bạn có thể tăng số consumer mà không động vào producer. Bạn có thể thay logic xử lý mà producer không hề hay biết. Và quan trọng nhất với một hệ thống chịu tải, hàng đợi ở giữa trở thành nơi duy nhất để áp đặt giới hạn, thay vì rải kiểm soát tải khắp nơi.

2.2 Khi nhịp hai bên lệch nhau

Điều thú vị xảy ra khi hai bên không cùng tốc độ, mà thực tế thì gần như luôn luôn vậy.

Nếu producer nhanh hơn consumer kéo dài, hàng đợi sẽ dài ra. Câu hỏi sống còn là: dài đến đâu thì dừng? Nếu hàng đợi không có trần, nó cứ phình tới khi nuốt sạch heap và ứng dụng chết vì OutOfMemoryError. Nếu hàng đợi có trần, đến lúc đầy thì producer phải bị chặn lại - và chính cái chặn đó là backpressure: hệ thống tự nói với phía thượng nguồn rằng "chậm lại, tôi chưa theo kịp".

Nếu consumer nhanh hơn producer, hàng đợi sẽ thường xuyên rỗng, và consumer phải chờ việc mới. Cái chờ này cũng cần được làm đúng: consumer không nên quay vòng kiểm tra hàng đợi liên tục để đốt CPU, mà nên ngủ cho tới khi có việc rồi được đánh thức.

Hai tình huống đối xứng đó - chờ khi đầy, chờ khi rỗng - chính là hai năng lực mà BlockingQueue đóng gói sẵn. Tự viết chúng bằng một List cộng wait/notify là một bài tập kinh điển, và cũng là một cái bẫy kinh điển: quên kiểm tra điều kiện trong vòng while, dùng notify thay vì notifyAll, để lọt missed signal. Đến JDK 25, không có lý do gì để tự viết lại phần đó.

3. BlockingQueue API: bốn cặp hành vi

BlockingQueue<E> nằm trong java.util.concurrent, có từ Java 5, mở rộng Queue nhưng thêm các thao tác biết chờ. Điều đáng nhớ về API của nó là mỗi thao tác cơ bản - thêm vào, lấy ra, xem trộm - đều có nhiều biến thể tùy theo bạn muốn nó hành xử thế nào khi không làm được ngay.

Với thao tác thêm vào khi hàng đợi đầy, có bốn lựa chọn. add(e) ném IllegalStateException. offer(e) trả về false để báo thất bại. put(e) chặn cho tới khi có chỗ. offer(e, timeout, unit) chặn nhưng chỉ tới hết thời gian chờ, rồi trả về false. Với thao tác lấy ra khi hàng đợi rỗng, đối xứng tương tự: remove() ném exception, poll() trả về null, take() chặn tới khi có phần tử, poll(timeout, unit) chặn có thời hạn.

Khi không làm được ngayThêm vào (queue đầy)Lấy ra (queue rỗng)
Ném exceptionadd(e)remove()
Trả tín hiệuoffer(e) trả falsepoll() trả null
Chặn vô hạnput(e)take()
Chặn có timeoutoffer(e, timeout, unit)poll(timeout, unit)

Hai cặp đáng quan tâm nhất trong mẫu producer–consumer là put/takeoffer/poll có timeout.

BlockingQueue<Task> queue = new ArrayBlockingQueue<>(1000);

// Producer: chặn nếu queue đầy → backpressure tự nhiên
queue.put(task);

// Consumer: chặn nếu queue rỗng → không busy-wait, ngủ tới khi có việc
Task task = queue.take();

puttake là cặp đơn giản và mạnh nhất. Producer gọi put, và nếu hàng đợi đầy, nó nằm chờ - không tốn CPU - cho tới khi một consumer lấy bớt ra. Consumer gọi take, và nếu hàng đợi rỗng, nó nằm chờ cho tới khi một producer bỏ vào. Toàn bộ phần phối hợp "đầy thì chặn, rỗng thì chặn, có biến động thì đánh thức đúng bên" nằm gọn bên trong hai lời gọi đó. Cả hai đều có thể ném InterruptedException, vì khi đang chặn chúng phản hồi interrupt - một điểm ta sẽ quay lại ở phần shutdown.

Cơ chế bên dưới của cái chờ này không có gì huyền bí - nó chính là pattern hai Condition trên một ReentrantLock mà bài ReentrantLock & Condition đã dựng tay. Mở source ArrayBlockingQueue, bạn sẽ thấy đúng bộ ba quen thuộc: một ReentrantLock canh mảng, một condition notEmpty cho consumer nằm chờ, một condition notFull cho producer nằm chờ. put vào hàng đầy thì notFull.await(); take rút bớt một phần tử thì notFull.signal() đánh thức đúng phía producer, và đối xứng cho chiều ngược lại. BlockingQueue không thay thế kiến thức về lock và condition - nó là kiến thức đó được đóng gói, kiểm chứng và phát hành sẵn để ta khỏi tự viết lại.

Khi chặn vô hạn là quá rủi ro - chẳng hạn một servlet thread không nên nằm chờ mãi vì còn phải trả response cho client - thì dùng biến thể có timeout. offer(task, 100, MILLISECONDS) sẽ cố bỏ vào trong tối đa 100ms; nếu vẫn không có chỗ, nó trả về false và ta chủ động fail-fast thay vì treo. Đây chính là lựa chọn mà capstone dùng ở phần sau.

Một chi tiết hay bị bỏ sót: đặt một object vào BlockingQueue còn lo luôn việc safe publication cho object đó. Bỏ một object vào một thread-safe collection thiết lập quan hệ happens-before giữa lúc producer bỏ vào và lúc consumer lấy ra — chính cái safe publication mà bài Immutability đã dựng nền. Nghĩa là một object được dựng đúng cách, sau khi đi qua hàng đợi, sẽ hiển thị đầy đủ và lành lặn với consumer mà ta không cần đồng bộ thêm gì. Hàng đợi vừa truyền dữ liệu, vừa truyền cả memory visibility.

4. Bounded vs unbounded: nơi backpressure sống hoặc chết

4.1 ArrayBlockingQueueLinkedBlockingQueue

Hai cài đặt thông dụng nhất là ArrayBlockingQueueLinkedBlockingQueue, và sự khác nhau giữa chúng không chỉ là cấu trúc dữ liệu bên trong.

ArrayBlockingQueue được nâng đỡ bởi một mảng có kích thước cố định, quyết định ngay lúc khởi tạo và không bao giờ đổi. Nó luôn bounded.

BlockingQueue<Task> q = new ArrayBlockingQueue<>(1000);   // trần cứng 1000

LinkedBlockingQueue được nâng đỡ bởi các node liên kết. Nó có thể bounded nếu ta truyền capacity, hoặc unbounded nếu ta không truyền - khi đó trần mặc định là Integer.MAX_VALUE, một con số lớn tới mức thực tế coi như vô hạn.

BlockingQueue<Task> bounded   = new LinkedBlockingQueue<>(1000);   // có trần
BlockingQueue<Task> unbounded = new LinkedBlockingQueue<>();       // "vô hạn" — coi chừng

Về cấu trúc, ArrayBlockingQueue cấp phát toàn bộ mảng một lần và dùng một khóa duy nhất cho cả hai đầu, nên đơn giản và có dấu chân bộ nhớ ổn định. LinkedBlockingQueue cấp phát node theo nhu cầu và tách khóa của đầu lấy ra khỏi khóa của đầu bỏ vào, nên dưới tải cao với nhiều producer và nhiều consumer, throughput thường nhỉnh hơn vì hai đầu ít tranh khóa với nhau. Nhưng đó là tối ưu hiệu năng; quyết định quan trọng hơn nằm ở chữ "bounded".

4.2 Bounded nghĩa là có backpressure

Hàng đợi bounded là cái biến put thành một van an toàn. Khi hàng đợi đầy, producer gọi put sẽ bị chặn lại. Producer không thể chạy nhanh hơn consumer kéo dài, vì đến một điểm, chính hàng đợi sẽ ghì nó lại. Đây là backpressure ở dạng tinh khiết nhất: ta không phải viết một dòng logic kiểm soát tải nào, chỉ cần chọn một capacity hợp lý, và cơ chế chặn tự điều tiết toàn hệ thống.

Quay lại quầy cà phê: cái kẹp order chỉ kẹp được một số phiếu nhất định. Khi nó đầy, thu ngân buộc phải ngừng nhận order mới - khách phải đứng đợi ngoài quầy. Khó chịu, nhưng đó chính là điều cứu cả hệ thống: nó ngăn cảnh hàng nghìn phiếu chất đống tới mức barista không bao giờ đuổi kịp và mọi món đều nguội. Trần của cái kẹp là backpressure vật lý.

Nhịp chặn-rồi-đánh-thức đó nhìn theo dòng thời gian như sau:

sequenceDiagram
    participant P as Producer
    participant Q as BlockingQueue cap 3
    participant C as Consumer
    P->>Q: put(task1)
    P->>Q: put(task2)
    P->>Q: put(task3)
    Note over Q: queue DAY 3/3
    P--xQ: put(task4) bi CHAN
    Note over P: producer ngu, khong don CPU
    C->>Q: take() lay task1
    Q-->>P: co cho trong, danh thuc
    P->>Q: put(task4) hoan tat

Producer không cần dòng code nào hỏi "consumer theo kịp chưa". Chính lời gọi put bị chặn là câu trả lời - và khoảnh khắc consumer rút bớt một phần tử, producer được đánh thức để đi tiếp. Toàn bộ vòng điều tiết tải nằm trong ngữ nghĩa của hàng đợi.

4.3 Cái bẫy unbounded: backpressure biến mất, thay bằng OOM

Một LinkedBlockingQueue không capacity nghe có vẻ tiện - không bao giờ phải lo put bị chặn, không bao giờ phải xử lý chuyện hàng đợi đầy. Nhưng chính sự tiện đó là cái bẫy. Khi hàng đợi không có trần, put không bao giờ chặn, nghĩa là backpressure biến mất hoàn toàn. Nếu producer nhanh hơn consumer kéo dài, hàng đợi cứ phình ra mãi.

Hàng đợi đó sống trên heap. Mỗi phần tử chưa xử lý là một object còn reachable, garbage collector không thu được. Hàng đợi phình tới đâu, heap căng tới đó, cho tới khoảnh khắc OutOfMemoryError và toàn bộ ứng dụng sụp. Điều khiến lỗi này độc ác là nó im lặng trong suốt giai đoạn lành mạnh: lúc tải thấp, producer và consumer cân bằng, hàng đợi luôn ngắn, mọi test pass. Lỗi chỉ lộ ra đúng vào lúc tải đạt đỉnh - đúng lúc ta cần hệ thống đứng vững nhất. Một hàng đợi unbounded không loại bỏ giới hạn của hệ thống; nó chỉ giấu giới hạn đó đi, rồi đổi một sự cố nhẹ - vài request bị từ chối - lấy một sự cố nặng - cả tiến trình chết.

Vì vậy, gần như mọi hàng đợi đứng trên đường đi của tải sản xuất nên là bounded. Hỏi "khi đầy thì sao?" ngay từ lúc thiết kế bao giờ cũng tốt hơn để câu hỏi đó tự trả lời bằng một cú crash lúc 2 giờ sáng.

5. Các biến thể: chọn đúng hàng đợi cho đúng bài toán

ArrayBlockingQueueLinkedBlockingQueue là hai con ngựa thồ, nhưng java.util.concurrent còn vài biến thể giải đúng những bài toán riêng. Biết chúng tồn tại để khỏi tự dựng lại.

PriorityBlockingQueue là một hàng đợi không theo thứ tự đến trước phục vụ trước, mà theo độ ưu tiên do Comparator hoặc Comparable quyết định. take luôn trả về phần tử "nhỏ nhất" theo thứ tự đó. Nó dùng khi việc không bình đẳng - một request VIP cần được xử lý trước request thường dù đến sau. Lưu ý nó unbounded về mặt logic (phình theo nhu cầu), nên vẫn phải canh chừng OOM như mọi hàng đợi unbounded.

SynchronousQueue là một trường hợp cực đoan đẹp: nó là hàng đợi không có dung lượng nào cả, kể cả một chỗ. Mỗi put phải chờ cho tới khi có một take tương ứng, và ngược lại. Nó không lưu trữ phần tử mà chỉ làm điểm hẹn để trao tay trực tiếp - direct handoff - từ producer sang consumer. Hình dung như chuyền một món đồ nóng tay này sang tay kia: không có mặt bàn để đặt tạm, hai tay phải gặp nhau. SynchronousQueue chính là cơ chế bên trong của Executors.newCachedThreadPool(): một task chỉ vào được pool khi có ngay một thread sẵn sàng nhận, nếu không pool tạo thread mới.

DelayQueue giữ các phần tử có một thời điểm "chín" - mỗi phần tử cài đặt Delayed để nói khi nào nó sẵn sàng. take chỉ trả về phần tử khi delay của nó đã hết; trước đó, dù hàng đợi không rỗng, take vẫn chờ. Nó hợp cho lập lịch theo thời gian: cache với TTL, retry có backoff, task hẹn giờ.

LinkedTransferQueue (Java 7) là một dạng lai tinh vi hơn, bổ sung method transfer(e): producer có thể chọn chờ cho tới khi một consumer thực sự nhận phần tử của mình, giống SynchronousQueue, nhưng vẫn có thể put không chờ như hàng đợi thường khi không cần đảm bảo đó. Nó là một cài đặt hiệu năng cao dùng thuật toán lock-free ở fast path, hợp khi producer đôi lúc cần biết chắc việc của mình đã có người nhận.

Cuối cùng, BlockingDeque (với cài đặt LinkedBlockingDeque, cả hai có từ Java 6) là hàng đợi chặn hai đầu: bỏ vào và lấy ra được ở cả đầu lẫn cuối. Nó là nền cho mẫu work stealing, nơi mỗi worker lấy việc từ một đầu deque của mình còn worker rảnh thì "trộm" việc từ đầu kia của deque người khác.

Phần lớn thời gian, một ArrayBlockingQueue hay LinkedBlockingQueue bounded là đủ. Các biến thể trên là để khi bài toán có thêm một ràng buộc đặc thù - ưu tiên, handoff, thời gian, hai đầu - ta nhận ra ngay rằng đã có sẵn công cụ, thay vì cố nhồi ràng buộc đó vào một hàng đợi FIFO thường.

6. Shutdown sạch: poison pill và interrupt

Một pipeline producer–consumer chạy được thì dễ; cho nó dừng sạch mới là phần lộ ra tay nghề. Consumer điển hình là một vòng lặp while quanh take, mà take thì chặn vô hạn khi hàng đợi rỗng. Vậy làm sao bảo một consumer đang nằm chờ ở take rằng "hết việc rồi, về đi"?

6.1 Poison pill

Cách kinh điển và dễ suy luận nhất là poison pill: một phần tử đặc biệt, không phải việc thật, mà là tín hiệu "dừng lại". Producer bỏ poison pill vào hàng đợi như một phần tử cuối cùng; consumer khi take ra mà thấy nó thì thoát vòng lặp.

final Task POISON = new Task();   // sentinel, không mang dữ liệu thật

// Consumer
public void run() {
    try {
        while (true) {
            Task t = queue.take();      // chặn tới khi có phần tử
            if (t == POISON) break;     // gặp pill → dừng sạch
            process(t);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

// Producer khi muốn dừng
queue.put(POISON);

Cái hay của poison pill là nó tôn trọng thứ tự hàng đợi: consumer xử lý hết mọi việc thật đã nằm trong hàng đợi trước pill rồi mới dừng, không bỏ sót việc nào đang chờ. Nếu có N consumer, ta cần bỏ vào N viên pill, vì mỗi viên chỉ dừng được đúng một consumer lấy được nó; và producer phải ngừng bỏ việc thật trước khi rải pill, nếu không một viên pill có thể bị một consumer lấy trong khi việc thật vẫn còn phía sau. Poison pill chỉ gọn khi số producer và số consumer đều biết trước; khi số lượng động, nó rối lên nhanh, và lúc đó interrupt là hướng tự nhiên hơn.

6.2 Interrupt khi take đang chặn

Cách thứ hai dùng chính cơ chế interrupt của Java. Vì takeput ném InterruptedException khi thread bị interrupt lúc đang chặn, ta có thể dừng consumer bằng cách interrupt thread của nó: lời gọi take đang nằm chờ sẽ bật dậy bằng một InterruptedException, và consumer dùng đó làm tín hiệu thoát.

Điều quan trọng là xử lý InterruptedException cho đúng, theo đúng nguyên tắc cooperative cancellation đã học ở bài Thread API và vòng đời: hoặc ném tiếp lên trên, hoặc khôi phục cờ interrupt bằng Thread.currentThread().interrupt() trước khi thoát, để tầng trên còn biết rằng thread đã bị yêu cầu dừng. Nuốt im lặng InterruptedException là một trong những lỗi concurrency tệ nhất vì nó làm mất tín hiệu hủy.

Capstone TicketFlow dùng đúng pattern này cho consumer của pipeline:

// BookingWorker — consumer side, capstone v2
@Override
public void run() {
    while (running) {
        FutureTask<BookingResult> task;
        try {
            task = queue.take();                       // chặn tới khi có task
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();        // khôi phục cờ rồi thoát sạch
            return;
        }
        task.run();
    }
}

Worker chạy tới khi cờ running (một volatile boolean) bị tắt, hoặc tới khi thread bị interrupt giữa lúc đang take. Cả hai con đường đều dẫn ra khỏi vòng lặp một cách có trật tự. Lưu ý cờ runningvolatile để thay đổi của thread gọi stop() hiển thị kịp với worker - đúng cái visibility mà bài volatile & synchronized đã nhấn mạnh.

7. Capstone: pipeline đặt vé qua BlockingQueue

Đến đây ta ghép mọi mảnh lại trong capstone. Ở các version trước, một servlet thread gọi thẳng BookingService.book và nằm chờ business logic chạy xong. Cách đó trộn lẫn việc nhận request với việc xử lý request, và không có chỗ nào để áp giới hạn tải. Version v2 tách hai vai đó ra bằng một BlockingQueue bounded ở giữa.

Phía producer là BookingGateway. Servlet thread gọi submit, gateway gói business logic vào một FutureTask, đẩy task vào hàng đợi, rồi trả Future về cho caller. Caller chờ kết quả qua future.get(timeout) - tách hẳn việc gửi request khỏi việc xử lý nó.

💡 Future và FutureTask — định nghĩa nhanh

Future là "phiếu hẹn kết quả": đại diện cho một kết quả sẽ có trong tương lai — caller giữ phiếu, lúc cần thì gọi get() để chờ tới khi xong, hoặc get(timeout, unit) để chờ có thời hạn. FutureTask là một Runnable bọc quanh Callable: worker gọi task.run() để chạy logic, còn kết quả (hoặc exception) được giữ lại bên trong và trao cho bất kỳ ai đang cầm Future. Bài Future & CompletableFuture sẽ mổ kỹ cơ chế này — ở đây chỉ cần hiểu nó là cách tách "gửi việc" khỏi "nhận kết quả".

// BookingGateway — producer side, capstone v2
public Future<BookingResult> submit(BookingRequest req) {
    FutureTask<BookingResult> task = new FutureTask<>(() -> {
        try {
            Booking b = service.book(req.eventId(), req.userId());
            return BookingResult.ok(b);
        } catch (SoldOutException e) {
            return BookingResult.fail("Sold out: " + req.eventId());
        }
    });

    boolean accepted;
    try {
        accepted = queue.offer(task, offerTimeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedException("Interrupted while submitting");
    }
    if (!accepted) {
        throw new RejectedException("Queue full, try again");   // backpressure thành tín hiệu
    }
    return task;
}

Điểm thiết kế đáng chú ý nhất nằm ở chỗ gateway dùng offer có timeout chứ không phải put. Một servlet thread không nên nằm chờ vô hạn vì còn nợ client một response. Nên thay vì put chặn mãi tới khi có chỗ, gateway cho hàng đợi tối đa offerTimeoutMillis để nhận task; nếu hết thời gian mà vẫn đầy, offer trả về false, và gateway biến đúng cái sự cố "đầy" đó thành một RejectedException rõ ràng - một câu trả lời "thử lại sau" gửi ngược về client. Backpressure ở đây không còn là một khái niệm trừu tượng, mà là một status code fail-fast mà phía thượng nguồn nhận được ngay.

Đó chính là khác biệt giữa bounded và unbounded thể hiện thành code thật. Vì hàng đợi có trần, đầy là một trạng thái có thật mà gateway phải xử lý - và nó xử lý bằng cách từ chối lịch sự thay vì để task chất đống. Nếu hàng đợi là unbounded, offer sẽ luôn thành công, RejectedException không bao giờ ném, và hệ thống lặng lẽ trôi về phía OOM dưới tải đỉnh.

Phía consumer là BookingWorker ở mục 6.2: một hoặc nhiều worker thread cùng take từ hàng đợi và chạy FutureTask. Vì nhiều worker cùng rút từ một BlockingQueue thread-safe, ta tăng số worker để nâng throughput mà không động gì tới gateway. BookingService bên trong vẫn giữ invariant "không bán vượt capacity" như các bài trước đã dựng; hàng đợi không thay đổi tính đúng đắn của business logic, nó chỉ định hình lại nhịp mà logic đó được gọi. Và vì FutureTask tự hấp thụ mọi exception của business logic rồi lan ra qua future.get(), một task hỏng không bao giờ giết chết worker - worker cứ lấy task kế tiếp và chạy tiếp.

Toàn cảnh v2 vì thế là một mẫu producer–consumer sách giáo khoa: gateway sản xuất, worker tiêu thụ, một BlockingQueue bounded ở giữa khử coupling và tự lo backpressure, poison pill hoặc interrupt lo phần shutdown. Phần đồng bộ hóa khó nhất - đầy thì chặn, rỗng thì chờ, trao tay an toàn về memory - nằm trọn trong hàng đợi mà ta không viết một dòng wait/notify nào.

📚 Deep Dive Oracle

Spec / reference chính thức:

Ghi chú: chính javadoc của BlockingQueue khuyến nghị bounded queue cho production — đọc phần "Usage example" để thấy pattern producer–consumer chuẩn của Oracle.

8. Liên hệ các bài khác

  • Thread API và vòng đời — interrupt và InterruptedException là cơ chế shutdown thứ hai của pipeline; bài đó giải thích vì sao phải khôi phục cờ interrupt thay vì nuốt im lặng.
  • Immutability — safe publication mà hàng đợi cho "miễn phí" được dựng nền ở đó; phần tử immutable đi qua queue là tổ hợp an toàn nhất.
  • ReentrantLock & ConditionArrayBlockingQueue chính là pattern một lock + hai condition (notFull/notEmpty) của bài đó, đóng gói và phát hành sẵn.
  • Synchronizers — họ công cụ điều phối tiến độ không kèm dữ liệu; BlockingQueue thực chất cũng là một synchronizer kiêm vai chứa dữ liệu.
  • Executor & thread pool — thread pool là một producer–consumer hoàn chỉnh: bạn submit (producer), worker thread (consumer), work queue ở giữa — đúng kiến trúc capstone vừa dựng tay.

9. Tổng kết

BlockingQueue là xương sống của mẫu producer–consumer. Nó tách rời nhịp sản xuất khỏi nhịp tiêu thụ bằng một hàng đợi thread-safe biết chờ: lấy khi rỗng thì block, bỏ vào khi đầy thì block. Nhờ đó hai bên chạy với nhịp riêng, được suy luận và mở rộng độc lập, và hệ thống tự cân bằng quanh cái hàng đợi ở giữa.

Những điểm cần mang theo:

  • API có bốn kiểu hành xử cho mỗi thao tác: ném exception (add/remove), trả tín hiệu (offer/poll), chặn vô hạn (put/take), chặn có timeout (offer/poll có thời hạn). Chọn kiểu theo việc bạn muốn làm gì khi không thực hiện được ngay.
  • Bounded là nơi backpressure sống. Một hàng đợi có trần biến put thành van an toàn tự điều tiết tải. Một hàng đợi unbounded không xóa giới hạn của hệ thống, nó chỉ giấu giới hạn đó đi rồi đổi một sự cố nhẹ lấy một cú OOM lúc tải đỉnh. Mặc định nên bounded.
  • Các biến thể giải bài toán riêng: PriorityBlockingQueue cho việc không bình đẳng, SynchronousQueue cho handoff trực tiếp, DelayQueue cho lập lịch theo thời gian, LinkedTransferQueue khi cần biết chắc có người nhận, BlockingDeque cho work stealing.
  • Shutdown sạch dùng poison pill (tôn trọng thứ tự, gọn khi số bên cố định) hoặc interrupt (tự nhiên khi số bên động). Dù cách nào, xử lý InterruptedException cho đúng - ném tiếp hoặc khôi phục cờ, đừng nuốt im lặng.
  • Bỏ object vào hàng đợi cũng lo luôn safe publication: object đi qua hàng đợi sẽ hiển thị lành lặn với consumer mà không cần đồng bộ thêm.

Tới đây ta đã thấy BlockingQueue điều phối dữ liệu giữa producer và consumer. Nhưng còn một họ công cụ khác trong java.util.concurrent không bận tâm tới việc truyền dữ liệu, mà chỉ lo điều phối tiến độ giữa các thread: cho chúng chờ nhau tới khi đủ điều kiện, giới hạn số thread cùng vào một vùng, hay hẹn gặp tất cả tại một điểm chung. Đó là các synchronizer - CountDownLatch, CyclicBarrier, Semaphore và bạn bè - chủ đề của bài kế tiếp. Cũng như blocking queue, chúng là building block có sẵn để ta dùng, không phải thứ ta tự dựng.

10. Tự kiểm tra

Tự kiểm tra
Q1
offer(e, timeout, unit) khác put(e) thế nào, và vì sao gateway phục vụ servlet thread nên chọn offer có timeout?
put chặn vô hạn cho tới khi hàng đợi có chỗ; offer có timeout chỉ chờ tối đa khoảng thời gian cho trước rồi trả về false nếu vẫn đầy. Một servlet thread còn nợ client một response nên không được phép treo vô hạn — nó cần biến trạng thái "hệ thống quá tải" thành một câu trả lời fail-fast ("thử lại sau") trong thời gian có kiểm soát. Với put, lúc tải đỉnh mọi servlet thread sẽ lần lượt nằm chết ở hàng đợi và client chỉ thấy timeout; với offer có timeout, backpressure trở thành một tín hiệu tường minh mà tầng trên xử lý được.
Q2
Vì sao gần như mọi hàng đợi đứng trên đường đi của tải sản xuất nên là bounded? Chuyện gì xảy ra với hàng đợi unbounded khi producer nhanh hơn consumer kéo dài?
Bounded là nơi backpressure sống: khi đầy, put chặn producer lại, hệ thống tự điều tiết quanh cái trần đã chọn. Hàng đợi unbounded làm put không bao giờ chặn, nghĩa là backpressure biến mất — producer nhanh hơn kéo dài thì hàng đợi phình mãi. Mỗi phần tử chưa xử lý là một object còn reachable trên heap, GC không thu được, và đến lúc tải đỉnh thì OutOfMemoryError giết cả tiến trình. Lỗi này độc ác vì im lặng lúc tải thấp: mọi test pass, chỉ production lúc cao điểm mới lộ. Unbounded không xóa giới hạn của hệ thống, nó chỉ giấu giới hạn đó đi.
Q3
Pipeline có 4 consumer cùng rút từ một queue. Vì sao muốn dừng sạch bằng poison pill phải bỏ vào đúng 4 viên, và vì sao producer phải ngừng bỏ việc thật trước khi rải pill?
Mỗi viên pill chỉ bị đúng một consumer take ra — consumer đó thoát vòng lặp và viên pill biến mất khỏi queue, ba consumer còn lại không bao giờ nhìn thấy nó và vẫn nằm chờ ở take() vô hạn. Nên N consumer cần N viên, mỗi người một viên. Còn nếu producer rải pill xong vẫn tiếp tục put việc thật, việc đó sẽ nằm sau pill trong queue: consumer gặp pill là dừng, việc thật phía sau không bao giờ được xử lý. Poison pill chỉ đúng khi nó thực sự là phần tử cuối cùng.
Q4
SynchronousQueue khác gì một ArrayBlockingQueue capacity 1? Nó được dùng ở đâu trong JDK?
ArrayBlockingQueue(1) vẫn có một chỗ chứa: producer put xong là đi tiếp ngay, phần tử nằm chờ trong queue cho tới khi consumer ghé lấy — hai bên không cần gặp nhau. SynchronousQueue không có chỗ chứa nào: mỗi put phải chờ đúng một take tương ứng và ngược lại — một cú trao tay trực tiếp, hai bên buộc phải có mặt cùng lúc. Đây chính là cơ chế bên trong Executors.newCachedThreadPool(): task chỉ vào được khi có ngay một thread rảnh nhận, không thì pool tạo thread mới.
Q5
Producer dựng một object rồi bỏ vào BlockingQueue, consumer lấy ra dùng. Vì sao consumer thấy object lành lặn đầy đủ mà không cần synchronized hay volatile nào thêm?
BlockingQueue (như mọi thread-safe collection chuẩn) bảo đảm safe publication: thao tác bỏ vào và thao tác lấy ra được nối với nhau bằng quan hệ happens-before — mọi ghi mà producer thực hiện trước khi put (bao gồm toàn bộ việc khởi tạo object) đều hiển thị với consumer sau khi take. Không có bảo đảm này, consumer có thể nhìn thấy một object "dựng dở" với field còn giá trị mặc định. Hàng đợi vì thế truyền cả dữ liệu lẫn memory visibility — đúng nền tảng safe publication mà bài Immutability đã dựng.
Q6
Consumer đang nằm chặn ở take(). Làm sao dừng nó không cần poison pill, và vì sao trong catch phải gọi Thread.currentThread().interrupt() trước khi thoát?
Interrupt thread của consumer: take đang chặn sẽ bật dậy bằng InterruptedException, và consumer dùng đó làm tín hiệu thoát vòng lặp — đúng cơ chế cooperative cancellation. Khi bắt exception đó mà không ném tiếp được (vì run() không khai báo throws), ta phải khôi phục cờ interrupt bằng Thread.currentThread().interrupt(), vì JVM đã xóa cờ ngay khi ném exception. Nuốt im lặng đồng nghĩa tầng trên (pool, framework) không bao giờ biết thread đã được yêu cầu dừng — tín hiệu hủy mất hút giữa đường.

Bài tiếp theo: Synchronizers — điều phối tiến độ giữa nhiều thread

Bài này có giúp bạn hiểu bản chất không?

Hỏi đáp về bài này

Chưa có câu hỏi

Đặt câu hỏi

Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).

Đặt câu hỏi đầu tiên