Java Internals & Concurrency/Synchronizers: Điều phối tiến độ giữa nhiều thread
13/39
Bài 13 / 39~13 phútConcurrency cơ bảnMiễn phí lượt xem

Synchronizers: Điều phối tiến độ giữa nhiều thread

Điều phối tiến độ giữa nhiều thread: CountDownLatch, CyclicBarrier, Semaphore, Exchanger, Phaser — phân biệt bảo vệ dữ liệu với điều phối tiến độ.

TL;DR: Synchronizer điều phối tiến độ giữa các thread — ai chờ ai, ai được đi — khác với lock/atomic vốn bảo vệ dữ liệu. CountDownLatch cho sự kiện một lần: đếm về 0 rồi mở vĩnh viễn, không reset. CyclicBarrier cho một nhóm cố định hẹn nhau lặp lại tại các mốc, kèm barrier action chạy giữa hai pha; một thành viên hỏng thì cả nhóm cùng nhận BrokenBarrierException. Semaphore giới hạn số lượng truy cập đồng thời bằng permit không có chủ — acquire/release phải đi cặp qua try/finally, kẻo permit rò rỉ vĩnh viễn. Phaser tổng quát hóa cả latch lẫn barrier với số party động. Pitfall lớn nhất: họ method arrive* của Phaser chỉ báo hiệu rồi return ngay, không chờ — muốn chờ phải gọi awaitAdvance.

1. Giới thiệu

Bài trước khép lại ở BlockingQueue - một component mà giá trị thật của nó nằm ở chỗ nó block. Producer gọi put rồi nằm chờ khi hàng đầy, consumer gọi take rồi nằm chờ khi hàng rỗng, và không ai phải tự tay viết một dòng wait/notify nào. Cái queue ấy làm hai việc cùng lúc: nó vừa là nơi chứa dữ liệu an toàn cho nhiều thread, vừa là nơi điều phối tiến độ - quyết định thread nào được đi tiếp, thread nào phải dừng lại đợi.

Hai việc đó thực ra là hai loại bài toán khác nhau, và BlockingQueue chỉ tình cờ gộp cả hai. Suốt từ bài Thread safety tới giờ, ta đã dành phần lớn công sức cho loại thứ nhất: làm sao để nhiều thread cùng chạm vào một mẩu dữ liệu mà không phá vỡ invariant. Đó là câu chuyện bảo vệ data - lock, atomic, immutable, concurrent collection, tất cả đều xoay quanh việc giữ cho shared mutable state luôn nhất quán.

Loại thứ hai thì khác. Đôi khi vấn đề không phải là một mẩu dữ liệu cần được canh gác, mà là nhịp đi của các thread cần được sắp xếp với nhau. Một thread phải đợi đến khi ba thread khác làm xong việc khởi tạo. Năm thread phải cùng có mặt tại một mốc trước khi bất kỳ ai được bước qua. Hệ thống chỉ cho phép mười request đi vào tài nguyên hiếm cùng lúc, request thứ mười một phải xếp hàng. Không có biến dùng chung nào ở đây bị tranh chấp theo nghĩa cổ điển; cái được điều phối là tiến độ - ai chờ ai, ai được đi, ai phải dừng.

Đó chính là việc của synchronizer. Một synchronizer là bất kỳ đối tượng nào điều tiết luồng đi của các thread dựa trên trạng thái của chính nó. BlockingQueue là một synchronizer; nhưng java.util.concurrent còn cả một họ component chuyên trách việc này, và chúng đáng được nhìn riêng ra khỏi câu chuyện bảo vệ data. Bài này đi qua năm cái: CountDownLatch, CyclicBarrier, Semaphore, ExchangerPhaser. Mỗi cái trả lời một câu hỏi điều phối khác nhau, và như mọi thứ trong java.util.concurrent từ Java 5, chúng là component có sẵn, đã được kiểm chứng - không phải thứ ta nên tự dựng lại bằng wait/notify.

2. CountDownLatch: chờ N sự kiện hoàn tất

Hình dung một buổi đua. Tất cả vận động viên đã vào vị trí, nhưng cuộc đua chưa thể bắt đầu chừng nào trọng tài chưa kiểm tra xong từng làn, từng đồng hồ bấm giờ, từng thiết bị. Mỗi hạng mục kiểm tra xong, một ngọn đèn trên bảng tắt đi. Khi ngọn đèn cuối cùng tắt, cổng mở, và tất cả vận động viên cùng xuất phát. Cái bảng đèn đó là một latch: nó giữ một cánh cổng đóng cho tới khi một số điều kiện nhất định đều đã thỏa, rồi mở vĩnh viễn.

CountDownLatch là phiên bản code của cái bảng đèn ấy. Nó được khởi tạo với một số đếm. Thread nào gọi await() sẽ block cho tới khi số đếm về 0; mỗi lần một sự kiện hoàn tất, ai đó gọi countDown() để giảm số đếm đi một. Khi số đếm chạm 0, mọi thread đang chờ ở await() được giải phóng cùng lúc, và mọi lần await() về sau trả về ngay lập tức.

Chữ quan trọng nhất ở đây là "vĩnh viễn". Latch dùng một lần: một khi đã về 0, nó không reset lại được. Nó mô hình hóa một sự kiện chỉ xảy ra một lần trong đời - "khởi tạo đã xong", "tất cả worker đã chạy xong" - chứ không phải một mốc lặp đi lặp lại.

Có hai cách dùng đối xứng nhau. Cách thứ nhất: một (hoặc nhiều) thread chờ đến khi N việc khởi tạo hoàn tất.

public class ServiceBootstrap {
    public void start() throws InterruptedException {
        CountDownLatch ready = new CountDownLatch(3);   // 3 bước warm-up

        startWarmup(ready, this::loadEvents);
        startWarmup(ready, this::primeCache);
        startWarmup(ready, this::verifyDependencies);

        ready.await();                                  // block tới khi cả 3 đếm xong
        System.out.println("Tất cả warm-up xong — mở bán.");
    }

    private void startWarmup(CountDownLatch latch, Runnable step) {
        Thread.startVirtualThread(() -> {
            try {
                step.run();
            } finally {
                latch.countDown();                      // luôn đếm, kể cả khi lỗi
            }
        });
    }
}

Việc đặt countDown() trong khối finally không phải tiểu tiết. Nếu một bước warm-up ném exception mà bỏ qua countDown(), số đếm không bao giờ về 0, và thread gọi await() treo vĩnh viễn. Latch không có khái niệm "thất bại"; nó chỉ đếm. Trách nhiệm đảm bảo nó luôn đếm đủ là của ta.

Cách thứ hai đảo ngược vai trò: dùng latch như một "súng lệnh" để N thread cùng bắt đầu tại một thời điểm. Đây là mẹo kinh điển để viết test đo contention - ta muốn tất cả worker thread đồng loạt lao vào một đoạn code, chứ không để thread đầu chạy xong trước khi thread cuối kịp khởi động.

CountDownLatch startGun = new CountDownLatch(1);
CountDownLatch finished = new CountDownLatch(workerCount);

for (int i = 0; i < workerCount; i++) {
    Thread.startVirtualThread(() -> {
        try {
            startGun.await();        // mọi worker dồn lại đây, cùng chờ phát súng
            doWork();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            finished.countDown();
        }
    });
}
startGun.countDown();                // bắn — tất cả worker xuất phát cùng lúc
finished.await();                    // chờ tất cả xong

Hai latch ở đây làm hai việc tách bạch: startGun (đếm từ 1) là cổng xuất phát, finished (đếm từ workerCount) là vạch đích. Một latch cho "bắt đầu cùng nhau", một latch cho "đợi tất cả về đích". Đó là toàn bộ vốn từ vựng của CountDownLatch, và phần lớn bài toán "chờ một nhóm sự kiện một lần" chỉ cần chừng đó.

3. CyclicBarrier: N thread hẹn gặp nhau tại một điểm

Latch có một sự bất đối xứng tinh tế: thread gọi await() và thread gọi countDown() thường là những thread khác nhau, đóng vai khác nhau. Người chờ không phải người báo tin. CyclicBarrier xóa bỏ sự bất đối xứng đó. Ở đây mọi thread đều bình đẳng: tất cả cùng chạy đến một mốc, cùng gọi await(), và cùng dừng lại đợi nhau cho tới khi người cuối cùng tới nơi. Khoảnh khắc người cuối cùng chạm mốc, rào chắn mở và tất cả cùng đi tiếp.

Hình dung một đoàn leo núi giao hẹn: cứ đến mỗi trạm nghỉ thì cả đoàn phải tụ đủ rồi mới đi tiếp, không ai được bỏ xa đồng đội. Người tới sớm ngồi chờ. Khi người cuối cùng tới, cả đoàn cùng lên đường sang chặng sau. Đến trạm kế tiếp, nghi thức ấy lặp lại y hệt.

Chữ "cyclic" nằm ở chỗ đó, và nó là khác biệt lớn nhất so với latch. Sau khi tất cả vượt qua, barrier tự reset về trạng thái ban đầu và sẵn sàng cho vòng tiếp theo. Đây là công cụ cho các thuật toán lặp theo pha: chia công việc thành nhiều phần cho nhiều thread, chạy một pha, đồng bộ tại barrier để chắc chắn mọi thread đã xong pha này, rồi cùng bước vào pha sau.

public class GridSimulation {
    private final CyclicBarrier barrier;
    private final int workers;

    public GridSimulation(int workers) {
        this.workers = workers;
        // barrier action: chạy đúng 1 lần mỗi khi tất cả tới mốc, trên thread tới cuối cùng
        this.barrier = new CyclicBarrier(workers, this::mergeAndPublishBoard);
    }

    public void run(int steps) {
        for (int w = 0; w < workers; w++) {
            int region = w;
            Thread.startVirtualThread(() -> {
                for (int step = 0; step < steps; step++) {
                    computeRegion(region);          // pha tính toán độc lập từng vùng
                    try {
                        barrier.await();            // chờ mọi vùng tính xong pha này
                    } catch (InterruptedException | BrokenBarrierException e) {
                        return;                     // ai đó hỏng/bị interrupt → rút lui
                    }
                }
            });
        }
    }

    private void computeRegion(int region) { /* ... */ }
    private void mergeAndPublishBoard()    { /* gộp kết quả của cả pha vừa rồi */ }
}

CyclicBarrier còn cho ta một thứ latch không có: barrier action. Đó là một Runnable truyền vào constructor, được chạy đúng một lần mỗi khi rào chắn mở, trên thread tới cuối cùng, và quan trọng là chạy trước khi bất kỳ thread nào được giải phóng. Nó là khe thời gian an toàn để gộp kết quả của pha vừa rồi: tại thời điểm barrier action chạy, mọi thread đã hoàn tất phần việc của pha và chưa ai bước sang pha mới, nên việc đọc gộp kết quả không có ai chen ngang. Ở ví dụ trên, mergeAndPublishBoard hợp nhất kết quả của tất cả các vùng thành trạng thái bàn cờ mới giữa hai pha.

Trong đoạn code có một cái tên trông đáng sợ: BrokenBarrierException. Nó kể đúng bản chất của barrier: một giao ước nhóm, nên nó mong manh theo nhóm. Nếu một thread đang đợi tại barrier bị interrupt, hoặc hết timeout của một await(timeout, unit), hoặc barrier action ném exception, thì barrier rơi vào trạng thái "broken": mọi thread khác đang chờ tại đó lập tức bị đánh thức bằng BrokenBarrierException. Logic ở đây tàn nhẫn nhưng hợp lý - nếu một thành viên không bao giờ tới được mốc, thì cả nhóm không thể đợi mãi; thà cho cả nhóm biết giao ước đã đổ vỡ còn hơn để họ treo vô hạn. Khi đó barrier phải được reset() mới dùng lại được.

Đặt hai vòng đời cạnh nhau, khác biệt một-lần vs lặp-lại hiện rõ:

flowchart TB
    subgraph LATCH["CountDownLatch: dem ve 0 mot chieu"]
        L3["count = 3"] -->|"countDown()"| L2["count = 2"]
        L2 -->|"countDown()"| L1["count = 1"]
        L1 -->|"countDown()"| L0["count = 0: cong mo VINH VIEN"]
        LW["await() block"] -.->|"duoc tha khi ve 0"| L0
    end
    subgraph BAR["CyclicBarrier: hen nhau, lap vong"]
        B1["N thread lan luot await()"] --> B2["nguoi cuoi cham moc"]
        B2 --> B3["chay barrier action"]
        B3 --> B4["ca nhom cung di tiep"]
        B4 -->|"tu reset, vong moi"| B1
    end

Bên trái là một đường thẳng một chiều: đếm cạn rồi đứng mở mãi, await() về sau trả về ngay. Bên phải là một vòng tròn: nhóm chạm mốc, action chạy, rào mở, rồi cả chu trình sẵn sàng cho pha kế tiếp.

Ranh giới chọn giữa hai cái đã rõ. CountDownLatch khi sự kiện xảy ra một lần và người chờ khác người báo. CyclicBarrier khi một nhóm thread cố định phải gặp nhau lặp lại tại các mốc, và ta cần làm gì đó giữa các mốc.

4. Semaphore: giới hạn số thread truy cập đồng thời

Hai synchronizer vừa rồi nói về thời điểm: chờ tới khi, gặp nhau tại. Semaphore nói về số lượng: tại một thời điểm, tối đa bao nhiêu thread được phép đi vào.

Analogy chuẩn xác nhất là một bãi đỗ xe có rào chắn và bảng "CÒN CHỖ / HẾT CHỖ". Bãi có đúng N chỗ. Mỗi xe vào lấy một vé, bảng giảm số chỗ trống đi một; hết chỗ thì rào chắn đóng, xe tới sau xếp hàng ngoài cổng. Mỗi xe ra trả vé, một chỗ được giải phóng, một xe đang chờ được vào. Cái bãi không quan tâm xe nào vào trước xe nào - nó chỉ giữ một bất biến duy nhất: số xe bên trong không bao giờ vượt N.

Semaphore quản lý một tập permit y như tập vé đỗ xe đó. acquire() lấy một permit, block nếu không còn permit nào; release() trả permit về, đánh thức một thread đang chờ. Số permit ban đầu chính là số lượng truy cập đồng thời tối đa.

public class ConnectionThrottle {
    private final Semaphore permits;

    public ConnectionThrottle(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    public <T> T withConnection(Callable<T> action) throws Exception {
        permits.acquire();                  // chờ tới khi có chỗ
        try {
            return action.call();
        } finally {
            permits.release();              // luôn trả permit, kể cả khi action ném
        }
    }
}

Cấu trúc acquire() ... try/finally ... release() không phải tùy chọn mà gần như bắt buộc. Nếu action ném exception và ta quên release() trong finally, một permit biến mất vĩnh viễn. Lặp lại lỗi đó vài lần, semaphore cạn sạch permit và mọi thread sau đều block mãi mãi - một dạng resource leak âm thầm, khó truy hơn cả deadlock vì không có vòng chờ rõ ràng nào để soi.

Semaphore có một đặc tính tinh tế dễ bị bỏ qua: nó không gắn permit với thread. Khác với intrinsic lock - vốn ghi nhớ thread nào đang giữ và mang tính reentrant - permit của semaphore không có chủ. Thread A có thể acquire() còn thread B release(); tổng số permit chỉ là một con số đếm, không phải quan hệ sở hữu. Điều đó khiến semaphore linh hoạt hơn lock (dùng được cho mô hình "một bên cấp, bên khác thu") nhưng cũng đồng nghĩa nó không bảo vệ bạn khỏi lỗi đếm sai: gọi release() nhiều hơn acquire() sẽ tăng số permit vượt giá trị khởi tạo, phá vỡ chính cái giới hạn ta dựng nó lên để bảo vệ.

Còn một lựa chọn nữa nằm ngay ở constructor: new Semaphore(permits, true) bật chế độ fair - thread chờ lâu nhất được cấp permit trước, theo đúng thứ tự xếp hàng. Mặc định (non-fair) cho phép thread mới đến "chen ngang" nếu permit vừa được trả đúng lúc, throughput cao hơn nhưng có thể bỏ đói thread chờ lâu dưới tải dồn dập. Với resource pool phục vụ user thật, fair thường đáng cái giá hiệu năng nhỏ đó.

💡 Cách nhớ

Ba synchronizer đầu đều là máy đếm, chỉ khác đếm cái gì: latch đếm sự kiện (về 0 thì mở vĩnh viễn), barrier đếm thread (đủ N thì mở rồi đếm lại), semaphore đếm slot (mượn thì trừ, trả thì cộng).

Một biến thể thường gặp ở front-end là không muốn block khi hết chỗ, mà trả lời ngay "hết slot". tryAcquire() làm đúng việc đó: nó cố lấy một permit và trả về boolean ngay lập tức, không bao giờ chờ. Với một luồng người dùng, thà nói "hết suất giữ chỗ, thử lại sau" còn hơn để trình duyệt treo vài chục giây.

Công dụng kinh điển nhất của Semaphore là biến một collection thường thành một bounded resource pool: khởi tạo semaphore với số permit bằng số tài nguyên, lấy permit trước khi mượn tài nguyên, trả permit sau khi trả tài nguyên. Trường hợp đặc biệt N = 1 cho ta một binary semaphore, dùng được như một mutex - nhưng là một mutex không reentrant và không có chủ, nên trừ khi ta thực sự cần đặc tính "release từ thread khác", một ReentrantLock thường là lựa chọn đúng hơn cho việc loại trừ lẫn nhau thuần túy.

5. Exchanger: trao đổi dữ liệu giữa đúng hai thread

Các synchronizer tới giờ điều phối nhiều thread theo số đếm. Exchanger<T> là một ca rất hẹp: điểm hẹn cho đúng hai thread, nơi hai bên đổi dữ liệu cho nhau. Nó có đúng một method đáng kể - exchange(x): thread gọi sẽ block cho tới khi một thread thứ hai cũng gọi exchange(y) trên cùng đối tượng; lúc đó bên thứ nhất nhận về y, bên thứ hai nhận về x, và cả hai cùng đi tiếp. Hình dung hai điệp viên trao vali ở ga tàu: ai tới trước đứng chờ người kia, và cuộc trao đổi chỉ hoàn tất khi cả hai cùng có mặt - một rendezvous đối xứng hoàn hảo.

Thực dụng mà nói, Exchanger là synchronizer ít gặp nhất trong bộ năm này. Đa số bài toán "đưa dữ liệu từ thread này sang thread kia" được phục vụ tốt và tổng quát hơn bởi BlockingQueuebài trước - vốn không giới hạn ở hai thread và không bắt cả hai phía phải có mặt cùng lúc. Biết nó tồn tại, nhận ra hình dạng bài toán hợp với nó, là đủ.

📚 Deep dive — double buffering với Exchanger

Mẫu kinh điển của Exchanger là producer–consumer dùng double buffering: producer đổ đầy buffer A trong khi consumer rút cạn buffer B, rồi hai bên cùng gọi exchange(buffer) để tráo - producer nhận lại buffer đã rỗng để đổ tiếp, consumer nhận buffer đầy để xử lý. So với đẩy từng phần tử qua BlockingQueue, cách này trao cả khối dữ liệu một lúc và tái dùng chính các buffer, tránh cấp phát liên tục - hợp cho pipeline thông lượng cao, nhạy cảm với áp lực GC. Exchanger chỉ thực sự thắng khi bài toán đối xứng giữa đúng hai bên và ta muốn tái sử dụng vật chứa.

6. Phaser: barrier đa pha với số party động

CountDownLatch đếm một lần rồi thôi. CyclicBarrier lặp lại nhưng số party cố định cứng từ lúc khởi tạo. Cả hai đều cứng nhắc ở một điểm: số lượng "người tham gia" phải biết trước và không đổi. Phaser (từ Java 7) ra đời để gỡ đúng hai hạn chế đó cùng lúc - nó vừa lặp lại theo pha như barrier, vừa cho phép thread gia nhậprời đi giữa chừng.

Hình dung một cuộc họp dài nhiều phiên, nơi người tham dự được phép đến muộn hoặc về sớm. Mỗi phiên chỉ kết thúc khi tất cả những ai đang có mặt đều đã phát biểu xong; nhưng danh sách "đang có mặt" thay đổi giữa các phiên. Phaser quản lý đúng kiểu nhóm co giãn đó: số party không phải hằng số mà là một đại lượng động, đăng ký bằng register() và rút lui bằng arriveAndDeregister().

public class DynamicPipeline {
    private final Phaser phaser = new Phaser(1);   // "1" = thread điều phối tự đăng ký

    public void addWorker(Runnable task, int phases) {
        phaser.register();                          // worker mới gia nhập, party + 1
        Thread.startVirtualThread(() -> {
            try {
                for (int p = 0; p < phases; p++) {
                    task.run();                     // làm phần việc của pha hiện tại
                    phaser.arriveAndAwaitAdvance(); // báo "xong pha", chờ mọi party khác
                }
            } finally {
                phaser.arriveAndDeregister();       // rút khỏi nhóm, party - 1
            }
        });
    }

    public void awaitFirstPhase() {
        // arriveAndDeregister CHI bao "da toi moc va rut lui" roi return NGAY,
        // khong cho ai het. Muon cho, phai goi them awaitAdvance.
        int phase = phaser.arriveAndDeregister();   // tra ve so thu tu pha hien tai
        phaser.awaitAdvance(phase);                 // block toi khi pha do khep lai
    }
}

Method awaitFirstPhase đáng dừng lại vài giây, vì nó phơi ra điểm dễ hiểu lầm nhất của Phaser: API của nó tách đôi thứ mà CyclicBarrier.await() gộp làm một. Họ method arrive* là nửa báo hiệu - đánh dấu "tôi đã tới mốc" rồi đi tiếp ngay, không chặn (riêng arriveAndAwaitAdvance gộp sẵn cả hai nửa). Còn awaitAdvance(phase) là nửa chờ - block cho tới khi pha mang số phase khép lại. arriveAndDeregister() thuộc nửa báo hiệu, nên một method "await" mà thân chỉ gọi nó sẽ return ngay lập tức mà không chờ gì cả - một bug ngữ nghĩa biên dịch vẫn xanh. Cặp đúng là: hứng lấy số pha mà arriveAndDeregister trả về, rồi đưa nó cho awaitAdvance. Lưu ý lời gọi này chờ pha hiện tại khép lại (mọi worker xong pha đầu) chứ không phải cả pipeline; phần còn lại nhóm worker tự đồng bộ với nhau, và phaser tự terminate khi worker cuối cùng arriveAndDeregister đưa sĩ số về 0.

Vốn từ vựng của Phaser ánh xạ khá thẳng sang những gì ta đã biết. arriveAndAwaitAdvance() đúng là CyclicBarrier.await(): báo đã tới mốc rồi chờ cả nhóm. register()arriveAndDeregister() là phần barrier không có - thay đổi sĩ số nhóm tại chỗ. Số đếm kiểu latch cũng tái hiện được, nhưng phải đủ cả hai vế: một Phaser khởi tạo với N party cho N thread arrive() mà không chờ sẽ hành xử như một CountDownLatch đếm một pha - với điều kiện bên chờ gọi awaitAdvance(phase). Phép tương đương nằm ở cặp latch.await()awaitAdvancelatch.countDown()arriveAndDeregister; thiếu vế awaitAdvance, không ai thực sự chờ và phép so sánh sụp đổ. Nói cách khác, Phaser là một thứ tổng quát hơn, gói được cả hành vi của latch lẫn của barrier, đổi lại bằng một API rậm rạp hơn.

Còn một nấc tinh chỉnh nữa: ta có thể can thiệp vào quyết định "khi nào thì pha kết thúc" bằng cách override onAdvance(int phase, int registeredParties). Trả về true từ method này sẽ kết liễu phaser, dùng để diễn đạt điều kiện dừng của một thuật toán lặp - chẳng hạn "chạy tới khi hội tụ" thay vì "chạy đúng K pha". Đó là điểm linh hoạt mà cả latch lẫn barrier đều không cho.

Khi nào nên với tay tới Phaser? Khi số thread tham gia thay đổi trong vòng đời tính toán, hoặc khi điều kiện dừng phụ thuộc vào trạng thái chứ không phải một số vòng cố định. Còn nếu nhóm là cố định và bài toán đơn giản, CountDownLatch hay CyclicBarrier vẫn dễ đọc hơn - đừng trả giá độ phức tạp của Phaser cho một bài toán không cần đến tính động của nó.

📚 Deep Dive Oracle

Spec / reference chính thức:

Ghi chú: cả CountDownLatch, Semaphore (và ReentrantLock) đều xây trên cùng một khung AbstractQueuedSynchronizer — cái khung đó đã được mổ ở bài ReadWriteLock, StampedLock & AQS.

7. Tie-in capstone: TicketFlow v2

TicketFlow v2 dùng hai trong số các synchronizer này ở đúng hai vai mà bài học vừa mô tả, và đáng nhìn lại để thấy lý thuyết khớp vào sản phẩm thế nào.

SalesGate là một CountDownLatch dùng làm cổng mở bán một chiều. Hệ thống khởi động cần làm xong một loạt việc warm-up - nạp danh sách sự kiện, làm nóng cache, kiểm tra các dependency - trước khi cho phép request đặt vé đầu tiên đi vào. Latch được khởi tạo với số task warm-up; mỗi task xong gọi warmupTaskDone() (chính là countDown()), và luồng phục vụ chờ ở awaitOpen() (chính là await()). Đúng ngữ nghĩa "chờ N sự kiện hoàn tất, một lần, rồi mở vĩnh viễn": sau khi mở, salesOpen thành true và không có đường quay lại - latch không reset, đúng như bản chất của nó.

HoldLimiterSemaphore đóng vai bounded resource pool, một semaphore per-event. Số permit ban đầu của mỗi sự kiện đặt bằng capacity của nó, nên số lượt giữ chỗ đồng thời cho một sự kiện không bao giờ vượt sức chứa. Đáng chú ý là nó dùng tryAcquire() chứ không acquire(): với luồng người dùng, hết slot thì trả về false ngay để báo "hết suất", thay vì treo trình duyệt. Và đúng như cảnh báo ở mục 4, code ghi rõ rủi ro permit leak - nếu logic quên releaseHold, permit bị giữ vĩnh viễn, nên một sweeper nền cần expire các hold-slot quá hạn. Đó chính là cái bẫy acquire-không-release mà ta đã nói, hiện hình trong một hệ thống thật.

Hai component, hai câu hỏi điều phối khác nhau: latch trả lời "đã đến lúc mở chưa?", semaphore trả lời "còn chỗ để giữ không?". Không cái nào bảo vệ một mẩu dữ liệu - việc đó đã do ConcurrentHashMap và các cơ chế ở các bài trước lo. Chúng điều phối tiến độ, đúng như luận điểm của cả bài.

8. Liên hệ các bài khác

  • Thread API và vòng đời — mọi lời gọi block của synchronizer (await, acquire, exchange, arriveAndAwaitAdvance) đều phản hồi interrupt; bài đó dạy cách xử lý InterruptedException cho đúng quanh chúng.
  • ReentrantLock & Condition — phối hợp "chờ điều kiện rồi được đánh thức" mà ta từng dựng tay bằng lock + condition; synchronizer là tầng đóng gói bên trên cho các hình dạng phối hợp hay gặp nhất.
  • ReadWriteLock, StampedLock & AQSCountDownLatchSemaphore đều là lớp vỏ mỏng quanh AbstractQueuedSynchronizer; đọc lại bài đó để thấy hàng đợi chờ chung bên dưới.
  • Blocking Queues & Producer–ConsumerBlockingQueue cũng là một synchronizer, nhưng kiêm thêm vai chứa dữ liệu; bài này tách riêng phần điều phối tiến độ thuần túy.
  • Executor & thread pool — bài kế tiếp mở Phần B: thôi tự tạo thread, bắt đầu tổ chức công việc thành task.

9. Tổng kết

Suốt nửa series vừa qua, ta tập trung vào một câu hỏi: làm sao giữ cho state luôn đúng khi nhiều thread cùng chạm vào nó. Lock, atomic, immutability, safe publication, concurrent collection, blocking queue - tất cả đều là các biến tấu của việc canh gác shared mutable state. Bài này tách ra một họ công cụ trả lời một câu hỏi khác hẳn: không phải "bảo vệ dữ liệu nào", mà "sắp xếp nhịp đi của các thread ra sao".

SynchronizerCâu hỏi nó trả lờiTái sử dụngChờ bằng gì
CountDownLatch"N sự kiện xong hết chưa?"Không — một chiều, không resetawait() tới khi đếm về 0
CyclicBarrier"Cả nhóm tới mốc chưa?"Tự reset sau mỗi vòngawait() tới khi đủ N party
Semaphore"Còn slot trống không?"Permit quay vòng liên tụcacquire() tới khi có permit
Exchanger"Bên kia tới điểm hẹn chưa?"Mỗi lần ghép một cặpexchange() tới khi đủ hai bên
Phaser"Pha hiện tại khép chưa?"Đa pha, party độngawaitAdvance(phase)

Năm synchronizer ánh xạ vào năm hình dạng bài toán điều phối, và cách phân biệt chúng gọn hơn vẻ ngoài. CountDownLatch cho sự kiện một lần, người chờ khác người báo, không reset. CyclicBarrier cho một nhóm cố định hẹn gặp nhau lặp lại tại các mốc, kèm barrier action chạy giữa các pha. Semaphore cho việc giới hạn số lượng truy cập đồng thời, permit không gắn chủ, lý tưởng cho bounded resource pool. Exchanger cho đúng hai thread tráo dữ liệu tại một rendezvous đối xứng. Phaser cho trường hợp tổng quát nhất - barrier đa pha với số party động và điều kiện dừng tùy biến - gói được cả hành vi của latch lẫn barrier, đổi lại bằng một API nặng nề hơn. Điểm chung xuyên suốt: đây đều là component đã được kiểm chứng trong java.util.concurrent, và việc của ta là chọn đúng cái cho hình dạng bài toán, không phải dựng lại chúng bằng wait/notify.

Đến đây khép lại Phần A của series về concurrency. Bốn chiến lược lớn - giữ state đúng (thread safety, lock, atomic, immutability), tận dụng các building block có sẵn (concurrent collection, blocking queue), tách nhịp sản xuất khỏi tiêu thụ (producer–consumer), và điều phối tiến độ (synchronizer) - cùng nhau trả lời câu hỏi "làm sao để code concurrent đúng".

Nhưng tới giờ ta vẫn còn tự tay tạo thread ở khắp nơi: Thread.startVirtualThread(...) rải trong gần như mọi ví dụ của bài này. Đó là một chi tiết ta sẽ sớm muốn quên đi. Câu hỏi tiếp theo không còn là "làm sao giữ state đúng" nữa, mà là "làm sao tổ chức và chạy công việc concurrent một cách có kỷ luật" - ai tạo thread, tạo bao nhiêu, tái dùng thế nào, dừng ra sao. Đó là địa hạt của Phần B, mở màn bằng Executor và thread pool: ta sẽ thôi nghĩ về thread và bắt đầu nghĩ về task.

10. Tự kiểm tra

Tự kiểm tra
Q1
CountDownLatchCyclicBarrier khác nhau thế nào về khả năng tái sử dụng, và vì sao thiết kế lại khác nhau như vậy?
Latch dùng một lần: số đếm về 0 là cổng mở vĩnh viễn, không reset — vì nó mô hình hóa một sự kiện chỉ xảy ra một lần trong đời ("khởi tạo đã xong"), mọi await() về sau phải trả về ngay mới đúng ngữ nghĩa. Barrier tự reset sau mỗi lần cả nhóm vượt qua — vì nó mô hình hóa một mốc hẹn lặp lại trong thuật toán chạy theo pha. Khác biệt thứ hai nằm ở vai: latch tách người chờ khỏi người báo (await vs countDown), barrier thì mọi thread bình đẳng, cùng gọi await().
Q2
BrokenBarrierException được ném khi nào, và vì sao một thread hỏng lại khiến tất cả thread đang chờ cùng nhận exception?
Barrier vỡ khi một thread đang đợi bị interrupt, một await(timeout, unit) hết hạn, hoặc barrier action ném exception. Khi đó mọi thread khác đang chờ lập tức bị đánh thức bằng BrokenBarrierException. Lý do: barrier là một giao ước nhóm — rào chỉ mở khi đủ N người chạm mốc. Nếu một thành viên không bao giờ tới được, những người còn lại sẽ chờ vô hạn; thà báo cho cả nhóm biết giao ước đã đổ vỡ để họ rút lui có trật tự. Barrier vỡ phải reset() mới dùng lại được.
Q3
Permit của Semaphore "không có chủ" nghĩa là gì? Đặc tính đó cho ta gì và bắt ta tự gánh gì?
Khác intrinsic lock vốn ghi nhớ thread đang giữ (và nhờ đó reentrant), semaphore chỉ giữ một con số đếm permit — không có quan hệ sở hữu nào: thread A acquire() rồi thread B release() là hoàn toàn hợp lệ. Đặc tính này cho ta sự linh hoạt: mô hình "một bên cấp, bên khác thu" như sweeper nền expire các hold quá hạn. Đổi lại, semaphore không bảo vệ ta khỏi lỗi đếm sai — release() nhiều hơn acquire() sẽ đẩy số permit vượt giá trị khởi tạo, lặng lẽ phá vỡ chính cái giới hạn ta dựng nó lên để giữ.
Q4
Vì sao countDown() nên đặt trong khối finally, và lỗi tương ứng phía Semaphore là gì?
Latch không có khái niệm "thất bại" — nó chỉ đếm. Nếu một bước warm-up ném exception mà nhảy qua countDown(), số đếm không bao giờ về 0 và mọi thread đang await() treo vĩnh viễn; finally bảo đảm luôn đếm, kể cả đường lỗi. Phía semaphore, lỗi đối xứng là quên release() trong finally: mỗi exception làm một permit biến mất vĩnh viễn, vài lần là semaphore cạn và mọi thread sau block mãi — một dạng resource leak âm thầm, khó truy hơn deadlock vì không có vòng chờ nào để soi.
Q5
arriveAndAwaitAdvance()arriveAndDeregister() khác nhau thế nào? Vì sao một method tên "await" mà thân chỉ gọi arriveAndDeregister() là bug?
API của Phaser tách đôi thứ mà CyclicBarrier.await() gộp làm một: họ arrive* là nửa báo hiệu — đánh dấu đã tới mốc rồi return ngay; awaitAdvance(phase) là nửa chờ — block tới khi pha đó khép lại. arriveAndAwaitAdvance() gộp sẵn cả hai (báo rồi chờ), còn arriveAndDeregister() chỉ báo và rút khỏi nhóm, không chờ ai. Method "await" mà chỉ gọi nó sẽ return tức thì — bug ngữ nghĩa compile vẫn xanh. Cặp đúng: int phase = phaser.arriveAndDeregister(); phaser.awaitAdvance(phase);.
Q6
Khi nào đáng trả giá độ phức tạp API của Phaser thay vì dùng CountDownLatch hay CyclicBarrier?
Khi một trong hai giả định cứng của latch/barrier bị phá: số thread tham gia thay đổi trong vòng đời tính toán (worker gia nhập bằng register(), rút lui bằng arriveAndDeregister()), hoặc điều kiện dừng phụ thuộc trạng thái thay vì số vòng cố định (override onAdvance trả về true để kết liễu phaser — "chạy tới khi hội tụ"). Nếu nhóm cố định và số pha biết trước, latch/barrier dễ đọc hơn hẳn — đừng trả giá tính động khi bài toán không cần nó.

Bài tiếp theo: Executor Framework — thread pool, lập lịch & shutdown

Bài này có giúp bạn hiểu bản chất không?

Hỏi đáp về bài này

Chưa có câu hỏi

Đặt câu hỏi

Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).

Đặt câu hỏi đầu tiên