Java Internals & Concurrency/Executor Framework: Thread pool, lập lịch & shutdown
14/39
Bài 14 / 39~13 phútConcurrency cơ bảnMiễn phí lượt xem

Executor Framework: Thread pool, lập lịch & shutdown

Tách mô tả task khỏi cơ chế chạy task: ExecutorService, ThreadPoolExecutor tuning, scheduled execution, RejectedExecutionHandler, cancellation và shutdown.

TL;DR: Ngừng new Thread() — đóng gói công việc thành task (Runnable/Callable) rồi giao cho ExecutorService, để executor lo thread pool, lập lịch và vòng đời. Bên dưới gần như mọi pool là ThreadPoolExecutor với thuật toán nhận task phản trực giác: tạo core thread → xếp queue → tạo thêm thread → từ chối. Hệ quả: queue unbounded làm maximumPoolSize vô nghĩa. Production cần giới hạn hữu hạn ở mọi chiều — bounded queue cộng RejectedExecutionHandler, trong đó CallerRunsPolicy là backpressure tự nhiên. Task định kỳ dùng ScheduledExecutorService, nhớ bắt mọi exception trong thân task kẻo lịch chết lặng lẽ. Và shutdown theo pattern graceful-trước-forceful-sau, không bao giờ nuốt InterruptedException.

1. Giới thiệu

Suốt phần vừa rồi của series, mọi câu hỏi đều xoay quanh một chữ: state. Làm sao để nhiều thread cùng chạm vào một dữ liệu mà không phá vỡ invariant — bằng confinement, immutability, khóa, concurrent collections, rồi đến synchronizers để điều phối tiến độ giữa các thread thay vì bảo vệ dữ liệu. Bài Synchronizers khép lại bốn chiến lược ấy bằng một nhận xét: ta đã biết cách giữ cho state đúng đắn, nhưng vẫn chưa nói gì về việc ai chạy công việc, trên thread nào, và bao nhiêu thread. Đó là một trục hoàn toàn khác, và nó mở ra phần mới của series — task execution.

Câu hỏi của phần này gọn một cách phản trực giác: một khi đã biết viết code thread-safe, ta nên tổ chức việc chạy nó như thế nào? Và câu trả lời đầu tiên, cũng là luận điểm trung tâm của bài, là ngừng tự tạo thread. Hãy mô tả công việc thành một task, giao nó cho một executor, rồi để executor lo phần thread pool, lập lịch và vòng đời.

Vì sao phải ngừng new Thread()? Ở bài Process và Thread ta đã thấy platform thread không hề miễn phí: mỗi thread tốn bộ nhớ stack, tốn chi phí context switching, gây áp lực lên scheduler của hệ điều hành. Một server đặt vé mà cứ mỗi request lại new Thread(task).start() sẽ chạy ngon trong demo và sụp đổ dưới tải thật. Khi mười nghìn request cùng đến, ta có mười nghìn thread tranh nhau vài chục core; phần lớn thời gian CPU trôi vào việc chuyển ngữ cảnh thay vì làm việc hữu ích, và đến một ngưỡng nào đó JVM ném OutOfMemoryError chỉ vì không cấp nổi stack. Tạo thread thủ công còn trộn lẫn hai thứ vốn nên tách rời: mô tả công việc cần làmchính sách chạy công việc đó. Khi hai mối quan tâm này quấn vào nhau, muốn đổi từ "mỗi task một thread" sang "một pool cố định" là phải sửa khắp nơi.

Executor Framework — có mặt từ Java 5 trong gói java.util.concurrent — chính là cú tách đôi đó. Một bên là task, biểu diễn bằng Runnable hoặc Callable. Bên kia là chính sách thực thi, đóng gói trong một Executor. Ta sẽ đi từ abstraction nhỏ nhất ấy, xuống tới cỗ máy thật bên dưới là ThreadPoolExecutor, qua các factory tiện dụng và những cạm bẫy của chúng, sang lập lịch định kỳ, rồi khép lại bằng phần khó nhất mà phần lớn người viết code bỏ quên: hủy task và tắt pool cho sạch.

2. ExecutorExecutorService

Trái tim của framework là một interface nhỏ đến bất ngờ:

public interface Executor {
    void execute(Runnable command);
}

Chỉ một method. Nó không nói gì về thread, pool hay hàng đợi — nó chỉ nói "đây là một công việc, hãy chạy nó hộ". Sự nghèo nàn ấy là cố ý. Executor tách việc gửi task khỏi việc chạy task. Người gửi không cần biết task sẽ chạy trên thread mới, trên một thread của pool, hay thậm chí ngay trên thread gọi. Mọi quyết định đó nằm bên trong cài đặt cụ thể của Executor.

Phép so sánh đời thường gần nhất là một nhà bếp nhà hàng. Người phục vụ — code gọi — không tự nấu. Họ gắp phiếu order lên kẹp, và đầu bếp nào rảnh sẽ nhận. Người phục vụ không quan tâm món này do đầu bếp nào nấu, hay bếp đang có ba người hay ba mươi người; họ chỉ cần đặt phiếu lên đúng chỗ. execute chính là động tác gắp phiếu lên kẹp.

Executor đủ cho việc "bắn rồi quên", nhưng đời thực cần nhiều hơn: ta muốn biết task đã xong chưa, muốn lấy kết quả nó trả về, muốn hủy nó, và muốn tắt cả hệ thống cho gọn. ExecutorService mở rộng Executor để lo những việc đó. Nó thêm submit, vốn nhận cả Callable<T> lẫn Runnable và trả về một Future<T> — một tay cầm tới kết quả tương lai. Nó thêm invokeAll/invokeAny để gửi cả một mẻ task. Và nó thêm cả một bộ vòng đời: shutdown, shutdownNow, awaitTermination.

ExecutorService pool = Executors.newFixedThreadPool(4);

pool.execute(() -> log.info("fire-and-forget"));          // Runnable, không kết quả

Future<Booking> handle = pool.submit(() ->                // Callable, có kết quả
        bookingService.book("concert-01", "user-42"));

Khác biệt giữa executesubmit không chỉ ở giá trị trả về. Có một cái bẫy tinh vi về exception. Khi task ném exception, execute để nó nổi lên UncaughtExceptionHandler của thread — bạn sẽ thấy stack trace ngoài log. Còn submit nuốt exception vào trong Future; nó chỉ phát lại khi bạn gọi future.get(), gói trong một ExecutionException. Nếu bạn submit một task rồi không bao giờ get, một exception chết người có thể biến mất không dấu vết. Đây là một trong những cách phổ biến nhất khiến lỗi concurrency ẩn mình, và ta sẽ gặp lại nó ở bài Future & CompletableFuture khi mổ xẻ Future.

Từ Java 19 trở đi ExecutorService còn là AutoCloseable, nên có thể dùng trong try-with-resources; lúc đó close sẽ tự gọi shutdown rồi chờ task chạy nốt. Tiện cho code ngắn, nhưng với một pool sống suốt vòng đời ứng dụng, ta vẫn quản lý shutdown thủ công như phần 7 sẽ bàn.

Cái thay đổi ở mục này không phải danh sách method, mà là sự dịch chuyển tư duy: từ giờ ta không nghĩ "tạo thread để làm X" nữa, mà nghĩ "đóng gói X thành task rồi giao cho một service". Còn service đó tổ chức thread thế nào, ta xét tiếp.

3. ThreadPoolExecutor: cỗ máy bên dưới

Hầu hết các ExecutorService ta dùng hằng ngày, dù tạo qua factory nào, đều là một thể hiện của cùng một class: ThreadPoolExecutor. Hiểu nó nghĩa là hiểu gần như mọi thread pool trong JVM. Và cách tốt nhất để hiểu là nhìn vào constructor đầy đủ của nó, vì mỗi tham số là một nút vặn của chính sách thực thi:

new ThreadPoolExecutor(
        int corePoolSize,                 // số thread giữ thường trực
        int maximumPoolSize,              // trần số thread
        long keepAliveTime,               // thời gian sống của thread vượt core khi rảnh
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,// nơi task chờ khi không có thread rảnh
        ThreadFactory threadFactory,      // cách tạo thread
        RejectedExecutionHandler handler);// làm gì khi không nhận nổi task

Bảy tham số ấy quy về một thuật toán quyết định duy nhất, chạy mỗi lần có task mới được submit.

Khi một task đến, pool không hỏi "có thread nào rảnh không". Nó hỏi theo đúng thứ tự sau. Thứ nhất: số thread đang có còn nhỏ hơn corePoolSize? Nếu còn, pool tạo một thread mới để chạy task này — ngay cả khi đang có thread rảnh. Thứ hai, nếu đã đủ core thread: xếp task vào workQueue. Thứ ba, chỉ khi queue đã đầy không nhét thêm được: pool mới tạo thêm thread, cho tới khi chạm maximumPoolSize. Thứ tư, nếu cả queue lẫn maxPool đều kịch trần: task bị từ chối, và RejectedExecutionHandler vào cuộc.

flowchart TD
    A["submit(task)"] --> B{"so thread dang co < corePoolSize?"}
    B -- "chua du core" --> C["tao CORE thread moi chay task<br/>(ke ca khi co thread ranh)"]
    B -- "du core roi" --> D{"workQueue con cho?"}
    D -- "con cho" --> E["xep task vao queue, cho thread ranh"]
    D -- "queue day" --> F{"so thread < maximumPoolSize?"}
    F -- "chua cham tran" --> G["tao them thread (non-core) chay task"]
    F -- "da kich tran" --> H["TU CHOI task<br/>RejectedExecutionHandler vao cuoc"]

Thứ tự "tạo core thread → xếp hàng → tạo thêm thread → từ chối" này phản trực giác đến mức là nguồn của vô số hiểu nhầm. Nhiều người tưởng pool sẽ bung tới maximumPoolSize rồi mới xếp hàng. Thực tế ngược lại: queue được ưu tiên hơn việc tạo thread vượt core. Hệ quả trực tiếp là nếu queue của bạn unbounded, maximumPoolSize trở nên vô nghĩa — queue không bao giờ đầy, nên pool không bao giờ tạo quá corePoolSize thread. Ta sẽ thấy đây chính là cái bẫy của newFixedThreadPool ở mục sau.

keepAliveTime lo phần co lại. Những thread vượt quá corePoolSize mà ngồi không quá keepAliveTime sẽ tự kết thúc để trả tài nguyên về hệ thống, đưa pool về kích thước core khi tải hạ. Hình dung pool như một quán cà phê: corePoolSize là số nhân viên ca cố định luôn có mặt; workQueue là hàng khách đứng chờ; maximumPoolSize là số nhân viên tối đa khi gọi thêm người làm thời vụ giờ cao điểm; và keepAliveTime là khoảng vắng khách đủ lâu để cho người thời vụ về.

ThreadPoolExecutor đáng để dựng tường minh khi ta cần kiểm soát thật sự. Trong capstone TicketFlow v3, booking pool được dựng thẳng từ constructor thay vì qua factory, đúng để chọn từng nút vặn một cách có chủ đích:

new ThreadPoolExecutor(
        workerCount,                          // core = max: pool kích thước cố định
        workerCount,
        0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(queueCapacity),       // queue BOUNDED — chủ ý
        new NamedThreadFactory("booking-worker"),      // tên thread cho thread dump
        new ThreadPoolExecutor.CallerRunsPolicy());    // backpressure khi quá tải

Vì sao queue bounded, vì sao đặt tên thread, vì sao CallerRunsPolicy — đó là nội dung hai mục kế tiếp. Trước hết, hãy xem các shortcut mà phần lớn người ta với tới trước, và vì sao chúng nguy hiểm trong production.

4. Executors factory và những cạm bẫy

Class tiện ích Executors cung cấp một loạt factory method tạo sẵn các cấu hình ThreadPoolExecutor thông dụng. Chúng gọn, đọc dễ, và chính sự tiện lợi đó khiến chúng bị lạm dụng ở những nơi không nên dùng.

4.1 Ba factory thông dụng

newFixedThreadPool(n) tạo một pool có đúng n thread thường trực và một queue unbounded. Mọi task vượt quá n thread đang bận sẽ xếp hàng vô tận. newSingleThreadExecutor là trường hợp n = 1: đúng một thread xử lý mọi task theo thứ tự gửi vào — hữu ích khi bạn muốn serial execution mà vẫn hưởng cơ chế task/Future, một dạng confinement bằng cách gom mọi việc về một thread duy nhất. newCachedThreadPool thì ngược cực: corePoolSize bằng 0, maximumPoolSize bằng Integer.MAX_VALUE, và một SynchronousQueue không chứa được phần tử nào. Vì queue không bao giờ "nhận" task, mỗi task đến mà không gặp thread rảnh sẽ làm pool tạo thread mới ngay; thread rảnh quá 60 giây thì bị thu hồi.

Trên giấy, cached pool tự co giãn theo tải nghe rất hấp dẫn. Vấn đề lộ ra dưới tải thật.

4.2 Vì sao tránh newCachedThreadPool và unbounded queue trong production

Hai factory này hỏng theo hai cách đối xứng nhau, và cả hai đều bắt nguồn từ việc không có trần.

newCachedThreadPool không giới hạn số thread. Khi request đến nhanh hơn tốc độ xử lý — đúng kịch bản tải đỉnh mà ta cần phòng — pool cứ tạo thêm thread, mỗi thread một stack. Không có gì chặn nó tạo tới hàng nghìn, hàng vạn thread, cho tới khi JVM kiệt bộ nhớ và sập. Nói cách khác, cached pool biến một cơn tăng tải thành một sự cố OutOfMemoryError.

newFixedThreadPool lại hỏng âm thầm hơn, nên nguy hiểm hơn. Số thread bị chặn cứng, tốt; nhưng queue thì unbounded. Khi tải vượt năng lực xử lý, task không bị từ chối — chúng chất đống vô hạn trong queue. Bộ nhớ phình lên theo từng task chờ, latency của những task cuối hàng kéo dài tới mức vô nghĩa, và hệ thống không hề báo cho bạn biết nó đang quá tải. Nó cứ nhận thêm việc cho tới khi đổ. Một unbounded queue về bản chất là một cơ chế từ chối thừa nhận giới hạn của chính mình.

Bài học chung là: production cần một giới hạn hữu hạn ở mọi chiều. Hoặc chặn số thread, hoặc chặn độ dài queue, lý tưởng là cả hai — rồi quyết định một cách có ý thức điều gì xảy ra khi chạm trần, thay vì để hệ thống tự âm thầm tích nợ tới lúc vỡ. Đó chính là lý do TicketFlow dựng ThreadPoolExecutor tay với ArrayBlockingQueue bounded: khi cả thread lẫn queue đều đầy, pool từ chối task, và việc từ chối ấy là một tín hiệu backpressure trung thực gửi ngược về phía gọi.

4.3 Sizing pool: CPU-bound khác I/O-bound

Còn một câu hỏi chưa trả lời: n nên là bao nhiêu? Không có con số thần kỳ, nhưng có một nguyên lý, và nó phụ thuộc vào bản chất của task.

Với task CPU-bound — mã hóa, nén, tính toán thuần, gần như không chờ gì bên ngoài — mỗi thread chiếm trọn một core trong lúc chạy. Thêm thread vượt quá số core không làm việc nhanh hơn; nó chỉ tăng chi phí context switching. Điểm xuất phát hợp lý là số thread xấp xỉ số core, thường là Runtime.getRuntime().availableProcessors() cộng một để bù những khoảng khựng hiếm hoi.

Với task I/O-bound — gọi database, gọi remote API, đọc file — câu chuyện đảo ngược. Phần lớn thời gian thread chỉ ngồi chờ, không dùng CPU. Nếu chỉ chạy số thread bằng số core, các core sẽ rảnh rỗi trong lúc tất cả thread đang treo trên một lời gọi mạng. Để giữ CPU bận, ta cần nhiều thread hơn số core, tỉ lệ thuận với phần thời gian thread dành để chờ. Công thức tham khảo của Brian Goetz trong Java Concurrency in Practice (§8.2): N = Ncpu × Ucpu × (1 + W/C) — số thread tối ưu xấp xỉ số core (Ncpu) nhân mức sử dụng CPU mục tiêu (Ucpu) nhân với (1 cộng tỉ số thời-gian-chờ trên thời-gian-tính, W/C). Đừng nhầm với Little's Law (L = λ × W) — định luật lý thuyết hàng đợi nói số phần tử trung bình trong hệ bằng thông lượng nhân thời gian lưu trú; nó hữu ích để ước lượng độ dài queue hay số request đồng thời, nhưng không phải công thức sizing thread này. Một task dành 90% thời gian chờ I/O có thể cần gấp mười lần số core mới bão hòa được CPU.

Chính cái căng thẳng "I/O-bound cần rất nhiều thread, nhưng platform thread thì đắt" là động lực dẫn tới virtual thread đã nhắc ở bài đầu — Executors.newVirtualThreadPerTaskExecutor() cho phép một thread mỗi task mà không trả giá đắt. Nhưng đó là chủ đề riêng của bài Virtual Threads gần cuối series; ở đây ta vẫn ở trong thế giới platform thread, nơi sizing là một quyết định thực sự. Một hệ quả thực dụng rút ra từ phần này: đừng nhét task blocking I/O và task CPU-bound vào cùng một pool. Một mẻ lời gọi mạng chậm có thể chiếm hết thread và bỏ đói các tính toán ngắn. Tách pool theo loại workload — đúng như TicketFlow tách booking pool khỏi notify pool — để một loại tải không bóp nghẹt loại kia.

5. Lập lịch với ScheduledExecutorService

Tới giờ mọi task đều chạy ngay khi có thể. Nhiều hệ thống cần thứ khác: chạy sau một khoảng trễ, hoặc lặp lại đều đặn — dọn cache hết hạn, gửi heartbeat, hết hạn các chỗ giữ vé. Phản xạ cũ là TimerTimerTask, nhưng Timer có hai khuyết tật chí mạng: nó chỉ dùng một thread cho mọi task nên một task chạy lâu làm trễ tất cả task sau, và một exception không bắt trong một TimerTask sẽ giết luôn thread Timer, khiến mọi lịch còn lại im lặng ngừng chạy. ScheduledExecutorService thay thế nó hoàn toàn.

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

scheduler.schedule(() -> sendReminder(),
        10, TimeUnit.MINUTES);                       // chạy MỘT lần sau 10 phút

scheduler.scheduleAtFixedRate(() -> emitHeartbeat(),
        0, 30, TimeUnit.SECONDS);                    // mỗi 30s tính từ lúc BẮT ĐẦU lần trước

scheduler.scheduleWithFixedDelay(() -> sweepExpired(),
        1, 1, TimeUnit.MINUTES);                     // 1 phút SAU KHI lần trước KẾT THÚC

Hai biến thể lặp này dễ chọn nhầm cho nhau. scheduleAtFixedRate đo nhịp từ thời điểm bắt đầu của mỗi lần chạy: lần thứ k được hẹn vào mốc initialDelay + k*period bất kể lần trước chạy bao lâu. scheduleWithFixedDelay đo từ thời điểm kết thúc: nó chờ đúng delay sau khi lần trước xong mới khởi động lần kế.

Sự khác biệt vô hại khi task luôn chạy nhanh hơn period, nhưng hóa thành tai họa khi task chạy chậm hơn. Với fixedRate, nếu mỗi lần sweep mất 90 giây mà period chỉ 30 giây, các lần chạy dồn cục: scheduler muốn giữ nhịp nên xếp các lần trễ chạy nối đuôi không nghỉ, biến công việc nền nhẹ nhàng thành một chuỗi chạy liên miên đè lên pool. Với fixedDelay, dù task chậm bao nhiêu, luôn có đúng delay giây nghỉ giữa hai lần — nhịp giãn ra theo, nhưng không bao giờ dồn. Quy tắc thực dụng: cần tần suất đều (như lấy mẫu metric theo mốc thời gian) thì dùng fixedRate; cần khoảng nghỉ đều để tránh chồng lấn (như dọn dẹp định kỳ) thì dùng fixedDelay.

TicketFlow v3 chọn fixedDelay cho sweeper hết hạn chỗ giữ, đúng vì lý do tránh dồn:

handle = scheduler.scheduleWithFixedDelay(
        this::sweepOnce, ms, ms, TimeUnit.MILLISECONDS);

private void sweepOnce() {
    try {
        int expired = limiter.expireOldHolds();
        // ...
    } catch (Exception e) {            // BẮT MỌI exception ngay trong task body
        sweepErrors.incrementAndGet();
    }
}

Cái try/catch ôm trọn thân task không phải để cho đẹp. ScheduledExecutorService thừa hưởng một hành vi khắc nghiệt: nếu một task định kỳ ném exception mà không bị bắt, lần chạy đó không chỉ thất bại — toàn bộ lịch của task ấy bị hủy vĩnh viễn, lặng lẽ, không log, không lỗi nổi lên. Một lần expireOldHolds ném NullPointerException vì dữ liệu lạ là đủ để sweeper ngừng chạy mãi mãi, và bạn chỉ phát hiện khi vé hết hạn chất đống nhiều ngày sau. Vì thế luật cứng với mọi periodic task là: bắt mọi exception ngay bên trong thân task để lần chạy lỗi không kéo đổ cả lịch.

Một lưu ý phiên bản nhỏ về cài đặt: bản thân ScheduledThreadPoolExecutor luôn dùng một hàng đợi unbounded có thứ tự thời gian và bỏ qua maximumPoolSize — nó chỉ co giãn theo corePoolSize. Điều đó hợp lý cho task định kỳ thưa, nhưng nghĩa là đừng trông cậy nó tự bung thread khi task dồn; hãy giữ task lập lịch ngắn gọn.

6. ThreadFactoryRejectedExecutionHandler

Hai tham số cuối của constructor ThreadPoolExecutor thường bị bỏ mặc về mặc định, nhưng trong hệ thống production chúng là khác biệt giữa một pool dễ vận hành và một pool mù mịt khi sự cố.

ThreadFactory quyết định cách pool tạo thread. Mặc định, các thread của pool mang tên kiểu pool-3-thread-1 — vô nghĩa khi bạn nhìn một thread dump lúc 3 giờ sáng và cố hiểu thread nào đang treo. Một ThreadFactory tự viết cho phép đặt tên có ý nghĩa, đặt daemon flag, gắn UncaughtExceptionHandler, hay set priority. Lợi ích lớn nhất, rẻ nhất, là cái tên:

public class NamedThreadFactory implements ThreadFactory {
    private final String namePrefix;
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + "-" + counter.incrementAndGet());
        t.setDaemon(false);
        return t;
    }
}

Khi TicketFlow đặt prefix "booking-worker", thread dump hiện ra booking-worker-1, booking-worker-2, và việc đọc một sự cố trở nên khả thi: bạn biết ngay đống thread đang chờ database là của booking pool chứ không phải report pool. AtomicInteger cho bộ đếm là vì newThread có thể bị gọi đồng thời — chính class quản lý thread cũng phải thread-safe.

RejectedExecutionHandler lo điều xảy ra khi pool không nhận nổi task nữa — khi cả maxPool lẫn bounded queue đều đầy, hoặc khi pool đã shutdown. Đây chính là tình huống mà unbounded queue cố tình lảng tránh; một khi bạn đã chọn bounded để có giới hạn trung thực, bạn phải quyết định từ chối nghĩa là gì. JDK cung cấp bốn chính sách sẵn. AbortPolicy — mặc định — ném RejectedExecutionException, để phía gọi tự xử. DiscardPolicy lặng lẽ vứt task mới, nguy hiểm vì mất việc không dấu vết. DiscardOldestPolicy vứt task chờ lâu nhất trong queue để nhường chỗ cho task mới. Và CallerRunsPolicy — đáng chú ý nhất — bắt chính thread gọi submit chạy luôn task đó.

CallerRunsPolicy là một cơ chế backpressure thanh lịch. Khi pool quá tải, thread gọi (thường là thread nhận request) bị giữ lại để tự chạy task, nên trong khoảng thời gian đó nó không thể nhận thêm request mới. Áp lực tự nhiên truyền ngược lên tầng trên — tới mức acceptor của server, rồi tới client — làm chậm tốc độ đổ task vào đúng bằng tốc độ pool tiêu thụ, thay vì tích nợ hay ném lỗi tới tấp. TicketFlow chọn CallerRunsPolicy cho booking pool chính vì lẽ đó: dưới tải đỉnh, hệ thống chậm lại một cách có kiểm soát thay vì sụp hay đánh rơi vé. Khi cần một thông điệp từ chối tường minh hơn cho nghiệp vụ, ta có thể viết handler riêng ném một exception miền như RejectedException để tầng trên dịch thành phản hồi "hệ thống đang bận, thử lại sau".

7. Cancellation và shutdown

Tạo pool thì dễ; dừng nó cho sạch mới là phần phân biệt code demo với code production. Một pool tắt cẩu thả có thể đánh rơi task đang dở, treo JVM vì thread không chịu chết, hoặc làm hỏng dữ liệu vì cắt ngang một thao tác giữa chừng. Phần này là phần nhiều người bỏ quên nhất, và cũng là phần outline bài này dành trọn vẹn capstone tie-in.

7.1 Hủy một task: interruption và Future.cancel

Java không có cách nào ép một thread dừng ngay lập tức một cách an toàn — Thread.stop đã bị khai tử từ lâu vì nó có thể bỏ object ở trạng thái hỏng. Cơ chế hủy của Java là hợp tác: ta yêu cầu một thread dừng, và thread phải chủ động kiểm tra yêu cầu đó rồi tự thu xếp dừng. Phương tiện của yêu cầu ấy là interruption.

Khi bạn gọi future.cancel(true), executor sẽ interrupt() thread đang chạy task. Một interrupt không tự dừng task; nó chỉ bật một cờ trên thread và làm các method blocking như sleep, wait, take ném InterruptedException. Task có tôn trọng yêu cầu hay không là tùy ở cách nó được viết. Một task tính toán dài cần định kỳ tự hỏi Thread.currentThread().isInterrupted() và thoát khi thấy cờ bật. Một task chặn trên I/O cần bắt InterruptedException và dừng cho gọn.

Có một luật về interruption mà vi phạm sẽ phá hỏng khả năng hủy của cả hệ thống: đừng bao giờ nuốt InterruptedException. Khi bắt nó mà không thể truyền tiếp, bạn phải khôi phục cờ interrupt bằng Thread.currentThread().interrupt(), để tầng code phía trên còn biết rằng đã có yêu cầu hủy. Nuốt im lặng nó nghĩa là một yêu cầu dừng bị xóa sổ, và pool có thể không bao giờ tắt được. TicketFlow tuân đúng luật này trong code shutdown ở dưới: nhánh catch (InterruptedException e) gọi lại Thread.currentThread().interrupt() để phục hồi cờ.

future.cancel(false) là biến thể nhẹ hơn: nó ngăn task chưa chạy khởi động, nhưng không interrupt task đang chạy. Dùng nó khi bạn muốn task đang dở chạy nốt cho trọn. Sweeper của TicketFlow gọi handle.cancel(false) khi đóng — nếu một lần sweep đang chạy, để nó hoàn tất rồi mới thôi, thay vì cắt ngang giữa lúc giải phóng chỗ giữ.

7.2 shutdownshutdownNow

Tắt cả một ExecutorService có hai mức, phản ánh đúng hai mức quyết liệt khác nhau.

shutdown() là cuộc chia tay lịch sự. Pool ngừng nhận task mới — mọi submit sau đó bị từ chối — nhưng mọi task đã nhận, kể cả task còn nằm trong queue, vẫn được chạy cho xong. Đây là cách tắt mặc định cho một lần dừng có trật tự: không ai bị bỏ rơi, không việc nào dở dang.

shutdownNow() là cuộc chia tay gấp gáp. Nó cố dừng càng nhanh càng tốt: interrupt() mọi thread đang chạy, bỏ qua các task còn chờ trong queue, và trả về danh sách những task chưa kịp khởi động đó để bạn tự định đoạt. Vì nó dựa trên interruption, nó chỉ thực sự dừng được những task biết tôn trọng interrupt — một task lờ cờ interrupt vẫn sẽ chạy bất chấp. Cả hai method đều trả về ngay, không chờ; muốn biết pool đã thực sự dừng hẳn chưa, phải hỏi tiếp bằng awaitTermination.

7.3 Pattern shutdown chuẩn: graceful trước, forceful sau

Trong thực tế, cách tắt đúng đắn là phối hợp cả hai: xin lịch sự trước, hết kiên nhẫn thì ép. TicketFlow đóng gói chính xác pattern này trong LifecycleManager:

static void shutdownAndAwait(String name, ExecutorService executor,
                             Duration grace, Duration force) {
    executor.shutdown();                                   // 1. ngừng nhận, để task chạy nốt
    try {
        if (!executor.awaitTermination(grace.toMillis(), TimeUnit.MILLISECONDS)) {
            executor.shutdownNow();                        // 2. quá hạn ân huệ → ép dừng
            if (!executor.awaitTermination(force.toMillis(), TimeUnit.MILLISECONDS)) {
                // log: pool không phản hồi interrupt
            }
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();                // 3. khôi phục cờ interrupt
    }
}

Trình tự ấy đọc như một câu chuyện. Trước hết shutdown() cho task đang dở một cửa sổ ân huệ để hoàn tất tử tế. awaitTermination(grace) chặn cho tới khi hoặc mọi task xong, hoặc hết thời gian chờ. Nếu hết giờ mà vẫn còn task ngoan cố, shutdownNow() interrupt chúng và một awaitTermination thứ hai cho chúng cơ hội cuối để phản ứng với interrupt. Còn nếu bản thân thread đang chạy shutdown bị interrupt giữa chừng, ta ép dừng ngay rồi khôi phục cờ — đúng luật mục 7.1.

Còn một chi tiết về thứ tự đóng nhiều pool dễ bị xem nhẹ. LifecycleManager đóng các pool theo đúng thứ tự trong danh sách, và thứ tự ấy có chủ đích: đóng nguồn sinh task trước, nơi tiêu thụ task sau. Với TicketFlow nghĩa là dừng sweeper trước — để nó thôi đẩy việc giải phóng chỗ vào hệ thống — rồi mới đóng booking pool. Đảo ngược thứ tự sẽ dẫn tới cảnh một pool đã đóng còn bị một pool khác cố submit việc vào, sinh RejectedExecutionException ngay trong lúc đang tắt. Đăng ký toàn bộ chuỗi này vào một JVM shutdown hook để hệ thống tự dọn mình khi nhận tín hiệu dừng, và vòng đời của pool khép lại sạch sẽ thay vì bị JVM giật dây điện đột ngột.

8. 📚 Deep Dive Oracle

📚 Deep Dive Oracle

Spec / reference chính thức:

  • ThreadPoolExecutor javadoc (Java 21) — phần đầu javadoc mô tả chính xác thuật toán core/queue/max và từng nút vặn; đây là nguồn gốc của sơ đồ mục 3.
  • ScheduledThreadPoolExecutor javadoc — ghi rõ hành vi "task ném exception thì các lần chạy sau bị suppress" của lịch định kỳ.
  • Java Concurrency in Practice (Goetz et al.), chương 6 (Task Execution) và §8.2 (Sizing thread pools) — nguồn của công thức N = Ncpu × Ucpu × (1 + W/C).

Ghi chú: javadoc của ThreadPoolExecutor là một trong những javadoc đáng đọc trọn nhất trong JDK — gần như mọi câu hỏi tuning ("vì sao pool không bung tới max", "prestartCoreThread làm gì") đều có câu trả lời trực tiếp ở đó.

9. Tổng kết

Bài này chỉ xoay quanh một sự dịch chuyển trách nhiệm. Trước đây ta tự new Thread(), tự quyết bao nhiêu thread, tự lo lúc nào dừng. Executor Framework rút toàn bộ phần "chính sách thực thi" ấy ra khỏi code nghiệp vụ và gói vào một abstraction: ta mô tả công việc thành task, giao cho ExecutorService, và nó lo phần còn lại.

Phần còn lại, hóa ra, có chiều sâu đáng kể. Dưới mọi factory tiện dụng là một ThreadPoolExecutor với thuật toán quyết định "tạo core thread → xếp hàng → tạo thêm thread → từ chối" mà hiểu sai sẽ dẫn tới những cấu hình sập dưới tải. Các shortcut của Executors đánh đổi an toàn lấy gọn gàng: newCachedThreadPool không chặn số thread, newFixedThreadPool không chặn độ dài queue, và production cần một giới hạn hữu hạn ở mọi chiều. Sizing pool tách bạch CPU-bound với I/O-bound. ScheduledExecutorService lo task định kỳ, với cặp fixedRate/fixedDelay cần chọn đúng và luật cứng phải bắt mọi exception trong thân task. ThreadFactory đặt tên cho thread để vận hành được, RejectedExecutionHandler biến giới hạn thành backpressure trung thực. Và shutdown — phần hay bị bỏ quên nhất — đòi một pattern graceful-trước-forceful-sau, dựa trên cơ chế hủy hợp tác qua interruption mà ta không được phép nuốt.

Nhưng để ý xuyên suốt bài, có một câu hỏi ta liên tục né. Khi submit một Callable, ta nhận về một Future — rồi sao nữa? Làm sao thực sự lấy kết quả task trả về, mà không chặn cứng một thread chỉ để ngồi chờ? Và nếu một booking thành công cần kéo theo một loạt bước phụ thuộc nhau — xác nhận chỗ, rồi mới gửi thông báo, rồi mới ghi nhận — làm sao ghép chúng thành một chuỗi mà không lồng get vào get rối như tơ vò? Future.get là blocking và không compose được; đó chính là giới hạn mà bài kế tiếp gỡ, khi ta nâng từ Future lên CompletableFuture và biến những task rời rạc thành một pipeline async thật sự.

10. Tự kiểm tra

Tự kiểm tra
Q1
Vì sao khi workQueue là unbounded thì maximumPoolSize trở nên vô nghĩa?

Thuật toán nhận task của ThreadPoolExecutor chỉ tạo thread vượt quá corePoolSize khi queue đã đầy — thứ tự là: tạo core thread, rồi xếp queue, rồi mới tạo thêm thread. Một queue unbounded (như LinkedBlockingQueue mặc định của newFixedThreadPool) không bao giờ đầy, nên nhánh "tạo thêm thread" không bao giờ được chạm tới.

Hệ quả là pool mãi mãi chỉ có corePoolSize thread, còn task thừa chất đống vô hạn trong queue — bộ nhớ phình, latency của task cuối hàng kéo dài, và hệ thống không phát tín hiệu quá tải nào cho tới khi đổ.

Q2
scheduleAtFixedRate và scheduleWithFixedDelay khác nhau thế nào, và điều gì xảy ra khi task chạy lâu hơn period?

scheduleAtFixedRate đo nhịp từ thời điểm bắt đầu mỗi lần chạy: lần thứ k được hẹn vào mốc initialDelay + k*period, bất kể lần trước chạy bao lâu. scheduleWithFixedDelay đo từ thời điểm kết thúc: chờ đúng delay sau khi lần trước xong mới khởi động lần kế.

Khi task chậm hơn period, fixedRate dồn cục: scheduler cố giữ nhịp nên các lần trễ chạy nối đuôi không nghỉ, đè lên pool. fixedDelay luôn giữ khoảng nghỉ cố định nên không bao giờ dồn — nhịp chỉ giãn ra. Cần tần suất đều (lấy mẫu metric) thì dùng fixedRate; cần khoảng nghỉ để tránh chồng lấn (dọn dẹp định kỳ) thì dùng fixedDelay.

Q3
Vì sao CallerRunsPolicy được coi là cơ chế backpressure tự nhiên?

Khi pool kịch trần, CallerRunsPolicy bắt chính thread gọi submit — thường là thread nhận request — phải tự chạy task bị từ chối. Trong lúc bận chạy task đó, thread này không thể nhận thêm request mới.

Áp lực vì thế tự truyền ngược lên tầng trên: acceptor của server chậm lại, rồi tới client. Tốc độ đổ task vào tự co về đúng bằng tốc độ pool tiêu thụ — hệ thống chậm lại có kiểm soát thay vì tích nợ trong queue (unbounded) hay ném lỗi hàng loạt (AbortPolicy).

Q4
Một task định kỳ trên ScheduledExecutorService ném exception không bị bắt — chuyện gì xảy ra với các lần chạy sau, và phòng thế nào?

Toàn bộ lịch của task ấy bị hủy vĩnh viễn, lặng lẽ — không log, không lỗi nổi lên. Đây là contract của scheduleAtFixedRate/scheduleWithFixedDelay: một lần chạy ném exception là chuỗi lặp chấm dứt. Bạn thường chỉ phát hiện nhiều ngày sau, khi hậu quả nghiệp vụ (vé hết hạn không được dọn) chất đống.

Luật cứng: bọc try/catch ôm trọn thân task, bắt mọi exception ngay bên trong và ghi nhận (log, counter) thay vì để nó thoát ra ngoài — như sweeper của TicketFlow đếm sweepErrors rồi cho lịch chạy tiếp.

Q5
execute và submit xử lý exception của task khác nhau ra sao, và vì sao submit-rồi-quên là nguồn lỗi ẩn?

execute để exception nổi lên UncaughtExceptionHandler của thread — bạn thấy stack trace trong log ngay. submit thì bắt exception, gói vào Future, và chỉ phát lại dưới dạng ExecutionException khi có ai gọi future.get().

Nếu bạn submit rồi không bao giờ get, exception nằm im trong Future không ai đọc — task hỏng mà không một dấu vết nào. Quy tắc: hoặc tiêu thụ Future tử tế, hoặc dùng execute cho task fire-and-forget để lỗi còn nổi lên.

Q6
Vì sao pattern shutdown chuẩn phải là shutdown() trước rồi mới shutdownNow(), và vì sao phải khôi phục cờ interrupt trong nhánh catch?

shutdown() cho task đang dở và task trong queue một cửa sổ ân huệ hoàn tất tử tế — không mất việc, không cắt ngang thao tác giữa chừng. Chỉ khi awaitTermination hết hạn mà pool chưa dừng, shutdownNow() mới interrupt các thread ngoan cố. Đảo thứ tự nghĩa là vứt task trong queue và cắt ngang task đang chạy ngay cả khi chúng chỉ cần thêm vài giây.

Trong nhánh catch (InterruptedException), gọi Thread.currentThread().interrupt() để khôi phục cờ vì cơ chế hủy của Java là hợp tác: nuốt im lặng exception này nghĩa là xóa sổ một yêu cầu dừng, khiến tầng code phía trên không bao giờ biết có ai đó đang muốn tắt hệ thống.

Q7
Vì sao pool cho task CPU-bound nên xấp xỉ số core, còn pool cho task I/O-bound cần nhiều thread hơn hẳn?

Task CPU-bound chiếm trọn một core suốt lúc chạy — thêm thread vượt số core không tăng việc làm được, chỉ tăng chi phí context switching. Task I/O-bound thì phần lớn thời gian ngồi chờ (DB, mạng) không dùng CPU; nếu chỉ có số thread bằng số core, các core rảnh rỗi trong lúc mọi thread treo trên lời gọi mạng.

Công thức Goetz (Java Concurrency in Practice §8.2): N = Ncpu × Ucpu × (1 + W/C) — tỉ số chờ/tính càng lớn, cần càng nhiều thread để bão hòa CPU. Và đừng trộn hai loại task vào một pool: một mẻ I/O chậm sẽ chiếm hết thread, bỏ đói các tính toán ngắn.

Bài tiếp theo: Future & CompletableFuture: Từ kết quả blocking đến pipeline async

Bài này có giúp bạn hiểu bản chất không?

Hỏi đáp về bài này

Chưa có câu hỏi

Đặt câu hỏi

Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).

Đặt câu hỏi đầu tiên