Future & CompletableFuture: Từ kết quả blocking đến pipeline async
Từ kết quả blocking đến pipeline async: Future, CompletionService, rồi CompletableFuture với thenApply/Compose/Combine, allOf/anyOf, xử lý lỗi và timeout.
TL;DR: submit một Callable trả về Future — tay cầm tới kết quả async, nhưng là tay cầm cứng: get chặn thread và không ghép nối được các bước phụ thuộc. CompletableFuture thay nó bằng pipeline khai báo: thenApply cho bước đồng bộ, thenCompose cho bước async nối tiếp (chính là flatMap của thế giới async), thenCombine hợp lưu hai nhánh song song, allOf/anyOf gom nhiều future, exceptionally/handle đón lỗi, orTimeout chặn treo vĩnh viễn. Ba luật sống còn khi dùng: luôn truyền executor riêng cho các method *Async (đừng mượn commonPool cho I/O), luôn đóng chuỗi bằng một chỗ tiêu thụ lỗi, và nhớ rằng ThreadLocal/MDC không tự theo task qua các stage.
1. Giới thiệu
Bài Executor & thread pool khép lại bằng một câu hỏi treo lơ lửng: khi submit một Callable, ta nhận về một Future — rồi sao nữa? Cái "tay cầm tới kết quả tương lai" ấy dùng thế nào cho đúng, và nó gánh được bao xa khi các bước công việc bắt đầu phụ thuộc nhau?
Hãy nhìn một màn hình rất đời thường của TicketFlow: trang xác nhận sau khi user bấm đặt vé. Để dựng nó, hệ thống phải gọi ba service — load hồ sơ user, tính giá cuối cùng cho ghế đã chọn, và gửi email xác nhận sau khi hai bước trên xong. Mỗi lời gọi mất chừng 200ms. Gọi tuần tự là 600ms; gọi khéo — hai bước đầu song song, bước ba nối sau — chỉ còn hơn 400ms, và thread phục vụ request không phải đứng chờ giây nào. Bài này đi đúng hành trình đó: bắt đầu từ Callable/Future và những gì chúng làm tốt, chạm vào hai giới hạn khiến chúng đuối sức (blocking và không compose được), rồi nâng lên CompletableFuture — nơi các bước async ghép thành một pipeline thật sự.
2. Callable và Future
2.1 Runnable không trả về gì, Callable thì có
Runnable.run() trả void và không được phép ném checked exception. Nó hợp với task kiểu "làm rồi thôi", nhưng vô dụng khi ta cần một giá trị. Từ Java 5, java.util.concurrent thêm Callable<V> để lấp đúng chỗ đó: call() trả về một V và được phép ném checked exception.
Callable<Integer> countSeats = () -> {
Thread.sleep(50); // giả lập truy vấn DB
return 128;
};
Khi ta submit một Callable cho executor, ta không nhận lại kết quả ngay — task còn chưa chắc đã chạy. Thứ ta nhận lại là một Future<V>: một lời hứa rằng kết quả kiểu V sẽ có ở một thời điểm trong tương lai.
try (var pool = Executors.newFixedThreadPool(2)) {
Future<Integer> seats = pool.submit(countSeats);
// ... làm việc khác trong lúc task chạy ...
int n = seats.get(); // chặn ở đây tới khi task xong
System.out.println("Còn " + n + " chỗ");
}
Future mô hình hóa vòng đời của một task async qua bốn method cốt lõi: get() lấy kết quả (chặn nếu chưa xong), get(timeout, unit) lấy kết quả nhưng chịu chờ tối đa một khoảng, cancel(mayInterruptIfRunning) yêu cầu hủy, và isDone()/isCancelled() để hỏi trạng thái. Một Future chỉ đi một chiều: từ "chưa xong" sang "xong" (thành công, lỗi, hoặc bị hủy), và không bao giờ quay lại.
2.2 get là một biên giới: nó dồn mọi thứ về thread gọi
Điểm tinh tế nhất của Future nằm ở chỗ get làm gì với lỗi. Task chạy trên một thread khác; nếu call() ném exception, exception đó không thể bay ngược về thread gọi theo cách thông thường. Thay vào đó, executor bắt lấy nó, gói vào một ExecutionException, và ném ra đúng lúc bạn gọi get. Nguyên nhân thật nằm trong getCause().
Future<Integer> f = pool.submit(() -> {
throw new IllegalStateException("kho vé tạm khóa");
});
try {
f.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause(); // IllegalStateException gốc nằm ở đây
log.warn("Task hỏng: {}", cause.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // khôi phục cờ interrupt rồi mới xử lý
}
Hình dung get như cửa hải quan ở biên giới giữa hai thread. Mọi thứ task sản sinh — giá trị trả về hay exception — đều bị giữ lại bên kia biên giới cho tới khi có ai đó qua cửa get mang nó về. Đây vừa là điểm mạnh (lỗi không bị nuốt mất, nó chờ bạn tới nhận) vừa là điểm yếu (bạn buộc phải đứng ở cửa chờ).
Hai checked exception mà get ném ra nói lên hai chuyện khác hẳn nhau. ExecutionException nghĩa là task chạy và hỏng. InterruptedException nghĩa là chính thread đang chờ bị interrupt trong lúc chờ — task có thể vẫn đang chạy bình thường. Cách xử lý đúng cho InterruptedException gần như luôn là khôi phục lại cờ interrupt bằng Thread.currentThread().interrupt() rồi mới quyết định dừng, đúng như interruption policy đã bàn ở bài Executor & thread pool.
2.3 FutureTask: bản thân Future cũng là một Runnable
Trước khi rời Future, đáng biết một mảnh ghép giải thích vì sao mô hình này ráp được vào executor: FutureTask. Nó vừa là Runnable (nên executor chạy được) vừa là Future (nên bạn lấy kết quả được). Khi bạn submit(callable), bên trong executor gói callable vào một FutureTask, chạy nó như một Runnable, và trả FutureTask về cho bạn dưới mặt nạ Future.
FutureTask<Integer> task = new FutureTask<>(countSeats);
new Thread(task).start(); // chạy như Runnable
int n = task.get(); // lấy kết quả như Future
FutureTask cũng đáng nhớ vì nó dùng được làm "kết quả tính một lần, nhiều thread cùng chờ": gọi get nhiều lần từ nhiều thread đều an toàn, lần đầu chặn cho tới khi tính xong, các lần sau trả ngay kết quả đã cache.
3. CompletionService: gom kết quả theo thứ tự hoàn tất
Một tình huống rất thường gặp: bạn submit một loạt task và muốn xử lý kết quả ngay khi từng cái xong, chứ không phải đợi cả lô. Nếu bạn giữ một List<Future> rồi lặp qua mà get từng cái theo thứ tự submit, bạn vô tình ép mình chờ theo thứ tự submit: task đầu tiên chậm nhất sẽ chặn bạn không cho chạm tới những task đã xong từ lâu phía sau.
Hãy tưởng tượng bạn gửi quần áo tới năm tiệm giặt khác nhau. Bạn muốn lấy đồ ngay khi tiệm nào xong, chứ không muốn đứng lì trước tiệm số 1 chờ trong khi đồ ở tiệm số 4 đã giặt xong và đang nguội dần. CompletionService chính là cái quầy "ai xong trước trả trước" đó.
ExecutorService pool = Executors.newFixedThreadPool(4);
var ecs = new ExecutorCompletionService<Integer>(pool);
for (String region : regions) {
ecs.submit(() -> querySeatCount(region)); // 5 truy vấn song song
}
int total = 0;
for (int i = 0; i < regions.size(); i++) {
Future<Integer> done = ecs.take(); // lấy task ĐÃ xong sớm nhất
total += done.get(); // get() ở đây không còn chặn lâu
}
ExecutorCompletionService bọc quanh một executor và đẩy mỗi task vừa hoàn tất vào một hàng đợi nội bộ. take() chặn cho tới khi có một kết quả bất kỳ sẵn sàng, rồi trả về đúng Future đó. Khác biệt cốt lõi so với vòng lặp get ngây thơ: bạn tiêu thụ kết quả theo thứ tự hoàn tất thay vì thứ tự submit, nên latency tổng gần với task chậm nhất chứ không phải tổng của các lần chờ xếp chồng. Khi cần "lấy cái xong đầu tiên rồi bỏ phần còn lại", poll() không chặn cộng với cancel trên các future còn lại cho bạn đúng hành vi đó.
4. Giới hạn của Future: nó chặn, và nó không ghép nối
Future giải quyết gọn bài toán "chạy một task rồi lấy kết quả". Nhưng đặt nó vào một hệ thống thật, hai giới hạn lộ ra ngay.
Thứ nhất, get là blocking. Mỗi lần bạn gọi get, một thread — thường là một thread đắt đỏ — bị ghim lại chỉ để ngồi chờ. Trong một service xử lý nghìn request, nếu mỗi request lại có một thread đứng chờ get, bạn đang đốt đúng thứ tài nguyên mà thread pool sinh ra để tiết kiệm. Thread không làm gì hữu ích, nó chỉ chờ.
Thứ hai, và sâu hơn, Future không compose được. Hãy thử diễn đạt một yêu cầu rất đời thường: "lấy thông tin user, rồi dựa trên đó gọi sang service tính giá, rồi dựa trên đó gửi xác nhận." Với Future thuần, bạn buộc phải get kết quả bước một (chặn), mới có dữ liệu để submit bước hai, rồi lại get (chặn), rồi mới submit bước ba. Cái chuỗi phụ thuộc lẽ ra phải chảy mượt như một dây chuyền lại bị cắt vụn thành những lần chặn nối tiếp.
// Future thuần: mỗi bước phụ thuộc bước trước đều phải get() — chặn rồi mới đi tiếp
Future<User> fu = pool.submit(() -> loadUser(id));
User user = fu.get(); // chặn (1)
Future<Price> fp = pool.submit(() -> priceFor(user));
Price price = fp.get(); // chặn (2)
Future<Receipt> fr = pool.submit(() -> sendReceipt(user, price));
Receipt receipt = fr.get(); // chặn (3)
Future không có chỗ nào để bạn gắn một callback kiểu "khi xong thì làm tiếp việc này". Nó chỉ cho bạn hỏi "xong chưa?" và đứng chờ "cho tôi kết quả". Không có cách nào nói "đừng chặn tôi, cứ chạy tiếp, và khi nào có kết quả thì tự động nối sang bước sau". Đúng cái khoảng trống này là lý do CompletableFuture ra đời.
5. CompletableFuture: pipeline async compose được
CompletableFuture<T> (Java 8) vừa là một Future vừa là một CompletionStage. Vế Future cho ta get/cancel quen thuộc. Vế CompletionStage mới là phần đổi cuộc chơi: nó cho phép ta khai báo các bước sẽ chạy khi kết quả sẵn sàng, mà không thread nào phải ngồi chờ ở giữa.
5.1 Khởi tạo: supplyAsync và runAsync
Cách thông dụng nhất để mở đầu một pipeline là supplyAsync, nhận một Supplier (có trả về) hoặc runAsync nhận một Runnable (không trả về).
CompletableFuture<User> cf =
CompletableFuture.supplyAsync(() -> loadUser(id), pool);
Nếu không truyền executor, các method *Async chạy trên ForkJoinPool.commonPool() — một chi tiết quan trọng đến mức ta dành hẳn phần 8 để mổ xẻ vì sao điều đó dễ thành bẫy. Quy tắc thực dụng: trong code production, gần như luôn truyền executor của riêng bạn.
5.2 Ba phép biến đổi: thenApply, thenCompose, thenCombine
Ba phép ghép nối này phủ gần hết nhu cầu hằng ngày, và phân biệt được chúng là chìa khóa dùng CompletableFuture cho đúng.
thenApply biến đổi kết quả bằng một hàm thường: T -> U. Dùng khi bước tiếp theo là một phép tính đồng bộ, nhanh.
CompletableFuture<String> name =
CompletableFuture.supplyAsync(() -> loadUser(id), pool)
.thenApply(User::displayName); // User -> String
thenCompose dùng khi bước tiếp theo bản thân nó trả về một CompletableFuture: T -> CompletableFuture<U>. Đây là phép ghép hai stage async nối đuôi nhau. Nếu bạn lỡ dùng thenApply cho trường hợp này, bạn sẽ nhận một CompletableFuture<CompletableFuture<U>> lồng hai lớp — đúng kiểu bực mình mà flatMap sinh ra ở Optional hay Stream. thenCompose chính là flatMap của thế giới async: nó làm phẳng cái lồng đó.
// loadUser và priceForAsync đều là thao tác async trả CompletableFuture
CompletableFuture<Price> price =
CompletableFuture.supplyAsync(() -> loadUser(id), pool)
.thenCompose(user -> priceForAsync(user, pool)); // KHÔNG lồng hai lớp
thenCombine dùng khi bạn có hai pipeline độc lập và muốn gộp kết quả của chúng lại khi cả hai cùng xong: (T, U) -> V. Hai nhánh chạy song song, không nhánh nào phụ thuộc nhánh kia.
CompletableFuture<Seat> seatCf = CompletableFuture.supplyAsync(() -> reserveSeat(id), pool);
CompletableFuture<Invoice> invCf = CompletableFuture.supplyAsync(() -> draftInvoice(id), pool);
CompletableFuture<Confirmation> conf =
seatCf.thenCombine(invCf, (seat, invoice) -> new Confirmation(seat, invoice));
So sánh nhanh cho dễ nhớ: thenApply nối một bước đồng bộ, thenCompose nối một bước async khác (tránh lồng), thenCombine hợp lưu hai nhánh song song.
5.3 Stage chạy trên thread nào: ba biến thể và *Async
Mỗi phép ghép nối có ba dạng, và sự khác nhau giữa chúng quyết định task của bạn chạy trên thread nào.
thenApply(fn) — không hậu tố — chạy fn trên thread vừa hoàn tất stage trước, hoặc trên thread gọi nếu stage trước đã xong sẵn. Bạn không kiểm soát được đó là thread nào, và nếu fn nặng hoặc blocking, bạn vô tình ghim thread của stage trước.
thenApplyAsync(fn) đẩy fn sang commonPool.
thenApplyAsync(fn, executor) đẩy fn sang đúng executor bạn chỉ định.
cf.thenApply(fn) // chạy trên thread của stage trước — rẻ nhưng khó đoán
.thenApplyAsync(fn) // chạy trên commonPool
.thenApplyAsync(fn, myPool); // chạy trên myPool — kiểm soát rõ ràng
Quy tắc đáng giữ: nếu hàm của bạn là tính toán nhỏ, thuần CPU và nhanh, biến thể không hậu tố là tối ưu vì tránh được chi phí chuyển thread. Còn nếu nó blocking, nặng, hoặc đụng tới một tài nguyên cần cô lập (như một pool dành riêng cho gọi mạng), hãy dùng biến thể *Async với executor của riêng bạn để chủ động chọn nơi nó chạy. Trong capstone, ta sẽ thấy đúng kỹ thuật này: bước book chạy trên bookingPool, bước notify chạy trên notifyPool.
Ghép ba phép biến đổi và quy tắc chọn thread lại, một pipeline điển hình cho trang xác nhận của TicketFlow trông như sau:
flowchart LR
A["supplyAsync(fetchUser, ioPool)"] --> B["thenCompose(u -> fetchOrders(u))"]
P["supplyAsync(fetchPrices, ioPool)"] --> C["thenCombine(pricesCf, gop 2 nhanh)"]
B --> C
C --> D["exceptionally(fallback)"]Đọc sơ đồ theo con mắt "thread nào chạy stage nào": hai stage supplyAsync chạy trên ioPool mà ta truyền vào — nếu quên tham số executor, chúng rơi xuống ForkJoinPool.commonPool(), đúng cái bẫy phần 8 sẽ mổ. thenCompose và thenCombine không có hậu tố Async nên chạy trên thread vừa hoàn tất stage trước đó (một worker của ioPool, hoặc thread gọi nếu kết quả đã sẵn) — chấp nhận được vì chúng chỉ ghép nối, không blocking. Còn nếu hàm gộp ở thenCombine nặng, đổi sang thenCombineAsync(..., cpuPool) để đẩy nó sang pool tính toán riêng. exceptionally đứng cuối làm lưới an toàn: lỗi từ bất kỳ stage nào phía trên đều trượt qua các stage biến đổi còn lại và rơi vào đây.
6. Kết hợp nhiều future và xử lý lỗi
6.1 allOf và anyOf
Khi có một mảng future và muốn chờ tất cả cùng xong, allOf cho bạn một CompletableFuture<Void> hoàn tất khi mọi future trong mảng đều xong.
CompletableFuture<?>[] all = requests.stream()
.map(r -> gateway.submit(r))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(all)
.thenRun(() -> log.info("Đã xử lý {} request", all.length));
allOf trả Void, nên muốn lấy từng kết quả bạn vẫn phải join từng future sau khi allOf xong — nhưng lúc đó join không còn chặn vì mọi thứ đã hoàn tất. anyOf ngược lại: hoàn tất ngay khi bất kỳ future nào xong, hợp với pattern "hỏi nhiều nguồn, lấy câu trả lời về sớm nhất".
6.2 exceptionally, handle, whenComplete
Lỗi trong một pipeline async lan theo chuỗi: nếu một stage ném exception, các stage biến đổi phía sau bị bỏ qua, và lỗi trôi xuống cho tới khi gặp một stage biết xử lý nó. Có ba cách bắt.
exceptionally(fn) chỉ kích hoạt khi có lỗi, và cho bạn cơ hội trả về một giá trị thay thế: Throwable -> T. Nó là lưới an toàn cuối chuỗi.
CompletableFuture.supplyAsync(() -> loadUser(id), pool)
.thenApply(User::displayName)
.exceptionally(ex -> "khách ẩn danh"); // lỗi ở bất kỳ đâu phía trên đều rơi vào đây
handle(fn) luôn chạy, dù thành công hay lỗi: (T, Throwable) -> U. Nó vừa xử lý được lỗi vừa biến đổi được kết quả, nên hợp khi bạn muốn quy cả hai nhánh về một kiểu chung.
.handle((value, ex) -> ex != null
? BookingResult.fail(ex.getMessage())
: BookingResult.ok(value));
whenComplete(fn) cũng luôn chạy nhưng không đổi được kết quả: (T, Throwable) -> void. Nó dành cho side-effect như logging hay dọn tài nguyên, để nguyên kết quả (hoặc lỗi) chảy tiếp.
Một chi tiết hay vấp: exception bên trong một stage không bị ném thẳng ra mà bị gói trong CompletionException. Khi bạn xử lý trong exceptionally/handle, nguyên nhân thật thường nằm ở ex.getCause() — đúng như ExecutionException của Future. Code production luôn cần bóc lớp đó ra trước khi phân loại lỗi.
6.3 orTimeout và completeOnTimeout
Một pipeline không có giới hạn thời gian là một pipeline có thể treo vĩnh viễn. Từ Java 9, CompletableFuture có hai van an toàn dựng sẵn. orTimeout(t, unit) làm future hỏng với TimeoutException nếu nó chưa xong sau khoảng t. completeOnTimeout(value, t, unit) thì hoàn tất future với một giá trị mặc định thay vì ném lỗi.
gateway.submit(req)
.orTimeout(2, TimeUnit.SECONDS) // quá 2s thì hỏng với TimeoutException
.exceptionally(ex -> BookingResult.fail("hệ thống bận, thử lại sau"));
Cần phân biệt cái van này với việc thực sự hủy công việc. orTimeout chỉ làm cái future hoàn tất theo nhánh lỗi; nó không tự động dừng task đang chạy ở phía dưới. Nếu task đang gọi mạng, lời gọi đó vẫn tiếp diễn cho tới khi nó tự xong — đúng giới hạn của cancellation đã bàn ở bài Executor & thread pool. Timeout ở đây là về "tôi thôi không chờ nữa", không phải "công việc dừng lại".
7. Capstone: luồng xác nhận → thông báo viết bằng CompletableFuture
TicketFlow v3 ráp đúng những mảnh trên thành luồng đặt vé thật. BookingGateway nhận một request và trả về một CompletableFuture<BookingResult> — caller không bao giờ phải chặn để chờ. Bên trong là một chuỗi compose: validate → book → notify, mỗi bước chạy trên pool phù hợp, có van timeout cuối chuỗi và một chỗ xử lý lỗi gom chung.
public CompletableFuture<BookingResult> submit(BookingRequest req) {
Objects.requireNonNull(req);
return CompletableFuture
.supplyAsync(() -> validate(req), bookingPool) // mở chuỗi trên bookingPool
.thenApplyAsync(this::performBooking, bookingPool) // book: T -> BookingResult
.thenComposeAsync(this::notifyAsync, notifyPool) // notify async: nối stage khác
.orTimeout(overallTimeout.toMillis(), MILLISECONDS) // van an toàn cả chuỗi
.exceptionally(this::handleException); // gom mọi nhánh lỗi về một chỗ
}
Mỗi lựa chọn ở đây nhắc lại một ý của bài. thenComposeAsync được chọn thay cho thenApplyAsync ở bước notify vì bản thân việc gửi thông báo là một thao tác async trả về CompletableFuture — nếu dùng thenApply ta sẽ ôm một future lồng future. Việc tách bookingPool và notifyPool là áp dụng đúng quy tắc phần 5.3: bước notify gọi mạng (gửi email/HTTP), nên nó được cô lập sang pool riêng để không ăn mòn pool xử lý logic đặt vé. Và handleException bóc CompletionException ra để phân loại nguyên nhân thật:
private BookingResult handleException(Throwable ex) {
Throwable cause = (ex instanceof CompletionException) ? ex.getCause() : ex;
if (cause instanceof SoldOutException so) return BookingResult.fail("Sold out: " + so.getMessage());
if (cause instanceof IllegalArgumentException ia) return BookingResult.fail(ia.getMessage());
if (cause instanceof TimeoutException) return BookingResult.fail("Booking timed out");
return BookingResult.fail("Internal error: " + cause.getMessage());
}
Bước notifyAsync còn cho thấy một quyết định nghiệp vụ gọn gàng: nếu kết quả đã là thất bại (ví dụ sold out), không gửi thông báo, mà trả thẳng future đã hoàn tất.
private CompletableFuture<BookingResult> notifyAsync(BookingResult r) {
if (!r.success()) {
return CompletableFuture.completedFuture(r); // không notify cho ca thất bại
}
return CompletableFuture
.runAsync(() -> notifier.send(r), notifyPool)
.thenApply(v -> r); // notify xong thì giữ nguyên kết quả book
}
Khi cần xử lý cả lô — chẳng hạn 500 request đổ vào cùng một sự kiện — allOf gom chúng lại để chờ tất cả mà không chặn từng cái một, và vì BookingService bên dưới đã thread-safe từ các bài trước, chuỗi này không bao giờ bán vượt capacity. Đáng chú ý là toàn bộ pipeline được viết mà không có một lời gọi get chặn nào trong đường đi chính: thread chỉ làm việc khi có việc, còn lúc chờ I/O thì nhường pool cho request khác.
8. Cạm bẫy
Ba cái bẫy dưới đây gây ra phần lớn sự cố CompletableFuture trong code production, và cả ba đều âm thầm — không bẫy nào báo lỗi lúc biên dịch.
Cái bẫy nguy hiểm nhất là nuốt exception. Một pipeline mà nhánh cuối không có exceptionally/handle/whenComplete, lại không ai gọi get/join lên nó, sẽ nuốt trọn exception trong im lặng — task hỏng mà không một dòng log nào hiện ra. Đây là phiên bản async của việc bỏ trống catch. Quy tắc: mọi pipeline phải kết thúc ở một chỗ tiêu thụ lỗi, hoặc một stage xử lý lỗi, hoặc một get/join có bắt exception đàng hoàng.
Cái bẫy thứ hai là dùng commonPool cho task blocking. Các method *Async không truyền executor sẽ chạy trên ForkJoinPool.commonPool(), mà commonPool được sizing theo số CPU core (mặc định khoảng số core - 1) cho task ngắn, thuần CPU. Nếu bạn đổ task blocking I/O vào đó — gọi DB, gọi HTTP — chỉ vài task chờ là cả commonPool nghẹt, và mọi thứ khác trong JVM cũng dùng commonPool (kể cả parallel stream) bị vạ lây. Đây chính là lý do capstone luôn truyền bookingPool/notifyPool riêng.
Cái bẫy thứ ba tinh vi hơn: mất thread context. Vì một stage có thể chạy trên bất kỳ thread nào trong pool — và biến thể không *Async thậm chí chạy trên thread của stage trước — mọi thứ bạn gắn vào thread sẽ không tự nó theo task sang stage sau: ThreadLocal, security context, MDC của logging — MDC (Mapped Diagnostic Context) là một map key-value gắn theo thread mà logging framework tự chèn vào mỗi dòng log, thường chứa trace id hay user id — và cả transaction context. Một giá trị bạn đặt vào ThreadLocal trước khi vào pipeline gần như chắc chắn không có mặt khi một stage *Async chạy trên thread khác. Trong code đời thật, đây là nguồn của những lỗi kiểu "log mất trace id" hay "mất user context giữa chừng". Ở phần C của series, ScopedValue (final ở Java 25) được thiết kế chính để giải bài toán truyền context qua các ranh giới như thế này một cách đúng đắn — xem bài Structured Concurrency & Scoped Values.
9. 📚 Deep Dive Oracle
Spec / reference chính thức:
CompletableFuturejavadoc (Java 21) — phần đầu mô tả chính sách chọn thread cho biến thể có/không hậu tốAsyncvà quy tắc gói lỗi vàoCompletionException.CompletionStagejavadoc — contract trừu tượng của mọi phép ghép nối stage; đọc để hiểuthenCompose/thenCombineđộc lập với cài đặt.Futurejavadoc — ngữ nghĩa chính xác củacancel(mayInterruptIfRunning)và hai checked exception củaget.
Ghi chú: đoạn "CompletableFuture ... uses threads of ForkJoinPool.commonPool()" trong javadoc là nơi quy định hành vi mặc định gây ra cạm bẫy mục 8 — đáng đọc nguyên văn một lần để biết chính xác khi nào commonPool được dùng.
10. Tổng kết
Future cho ta một tay cầm tới kết quả async, nhưng là một tay cầm cứng: get chặn thread, và không có cách nào ghép nối nhiều bước phụ thuộc mà không cắt chúng thành chuỗi blocking. CompletableFuture thay cái tay cầm đó bằng một pipeline khai báo được — thenApply cho bước đồng bộ, thenCompose cho bước async nối tiếp, thenCombine cho hai nhánh hợp lưu, allOf/anyOf để gom nhiều nhánh, và exceptionally/handle/orTimeout để xử lý lỗi cùng thời gian chờ. Điều kiện để dùng nó cho đúng nằm gọn trong ba ý: luôn chọn executor cho *Async, luôn đóng chuỗi bằng một chỗ tiêu thụ lỗi, và luôn nhớ rằng thread context không tự theo task qua các stage.
Cả Future lẫn CompletableFuture đều giả định một điều: task của bạn là những đơn vị công việc tương đối độc lập mà executor đem chạy. Nhưng có một lớp bài toán khác — divide-and-conquer trên dữ liệu lớn, thuần CPU — nơi một task phải tự chẻ mình thành các task con đệ quy rồi gộp kết quả lại. Lớp bài toán đó cần một framework với cách lập lịch khác hẳn, nơi các thread nhàn rỗi chủ động "ăn cắp" việc của thread bận để cân tải giữa các core. Đó là Fork/Join, chủ đề của bài kế tiếp.
11. Tự kiểm tra
Q1thenApply và thenCompose khác nhau thế nào? Chuyện gì xảy ra nếu dùng nhầm thenApply cho một hàm trả về CompletableFuture?▸
thenApply nhận một hàm thường T -> U — dùng cho bước biến đổi đồng bộ. thenCompose nhận hàm T -> CompletableFuture<U> — dùng khi bước tiếp theo bản thân nó là một thao tác async, và nó làm phẳng kết quả, giống flatMap của Optional/Stream.
Dùng nhầm thenApply cho hàm trả future, bạn nhận về CompletableFuture<CompletableFuture<U>> lồng hai lớp — code vẫn compile, nhưng để lấy giá trị thật bạn phải bóc hai lần, và rất dễ quên chờ future bên trong hoàn tất (mất luôn cả lỗi của nó).
Q2thenApply(fn) và thenApplyAsync(fn) chạy fn trên thread nào? Vì sao biến thể không hậu tố vừa rẻ vừa nguy hiểm?▸
thenApply(fn) chạy fn trên thread vừa hoàn tất stage trước, hoặc ngay trên thread gọi nếu stage trước đã xong sẵn — bạn không kiểm soát được đó là thread nào. thenApplyAsync(fn) đẩy fn sang commonPool; thenApplyAsync(fn, executor) đẩy sang đúng executor bạn chỉ định.
Biến thể không hậu tố rẻ vì tránh chi phí chuyển thread — tối ưu cho phép tính nhỏ thuần CPU. Nhưng nếu fn nặng hoặc blocking, bạn vô tình ghim chính worker thread của stage trước (có thể thuộc một pool quan trọng), làm pool đó mất một thread cho tới khi fn xong.
Q3Vì sao không nên để task blocking I/O chạy trên ForkJoinPool.commonPool()?▸
commonPool được sizing theo số CPU core (mặc định khoảng số core trừ một) với giả định task ngắn, thuần CPU. Một task chờ DB hay HTTP chiếm một worker mà không dùng CPU — chỉ vài task như vậy là cả pool nghẹt.
Tệ hơn, commonPool là tài nguyên dùng chung toàn JVM: mọi parallel stream và mọi *Async không truyền executor đều đổ vào đó. Một góc ứng dụng nhét I/O vào commonPool sẽ làm chậm một góc hoàn toàn khác chẳng liên quan. Vì thế quy tắc production: luôn truyền executor riêng cho supplyAsync/thenApplyAsync khi task có thể chặn — như capstone tách bookingPool và notifyPool.
Q4orTimeout(2, SECONDS) có hủy task gốc đang chạy không? Phân biệt nó với cancellation thật.▸
Không. orTimeout chỉ làm cái future hoàn tất theo nhánh lỗi với TimeoutException sau 2 giây — nó là "tôi thôi không chờ nữa", không phải "công việc dừng lại". Task phía dưới (ví dụ một lời gọi mạng) vẫn tiếp tục chạy, vẫn chiếm thread và tài nguyên cho tới khi tự xong.
Muốn dừng task thật sự phải đi qua cơ chế hủy hợp tác — interrupt thread và task phải tự kiểm tra cờ — đúng giới hạn đã bàn ở bài Executor. Vì vậy timeout luôn cần đi kèm: task bên dưới nên có timeout riêng của nó (HTTP client timeout, query timeout) để không rò rỉ công việc mồ côi.
Q5Một pipeline không có exceptionally/handle ở cuối và không ai gọi get/join — exception trong stage sẽ đi đâu?▸
Không đi đâu cả — nó bị nuốt trong im lặng. Exception được gói vào CompletionException và lưu trong future như một "kết quả lỗi", chờ ai đó tới nhận. Các stage biến đổi phía sau bị bỏ qua, nhưng không có log, không có stack trace nào tự nổi lên.
Đây là phiên bản async của catch rỗng, và là cái bẫy nguy hiểm nhất của CompletableFuture. Quy tắc: mọi pipeline phải kết thúc ở một chỗ tiêu thụ lỗi — exceptionally/handle/whenComplete, hoặc một get/join có bắt exception tử tế. Và khi bắt, nhớ bóc getCause() để lấy nguyên nhân thật.
Q6CompletionService giải quyết vấn đề gì so với việc giữ một List các Future rồi get lần lượt?▸
Vòng lặp get theo thứ tự submit ép bạn chờ theo đúng thứ tự đó: nếu task đầu tiên chậm nhất, bạn đứng chặn ở nó trong khi các task phía sau đã xong từ lâu mà không được xử lý.
ExecutorCompletionService đẩy mỗi task vừa hoàn tất vào một hàng đợi nội bộ; take() trả về future đã xong sớm nhất. Bạn tiêu thụ kết quả theo thứ tự hoàn tất thay vì thứ tự submit, nên tổng thời gian chờ tiến gần tới thời gian của task chậm nhất thay vì cộng dồn các lần chờ xếp chồng.
Q7Vì sao giá trị đặt trong ThreadLocal (hay MDC) trước khi vào pipeline thường biến mất ở các stage sau?▸
ThreadLocal gắn dữ liệu vào thread, không gắn vào task. Trong một pipeline, mỗi stage có thể chạy trên bất kỳ worker nào của pool — và biến thể không hậu tố Async chạy trên thread của stage trước, cũng không phải thread của bạn. Task di chuyển giữa các thread, còn dữ liệu thì ở lại với thread cũ.
Hệ quả thực tế: log mất trace id, mất security context giữa chừng. Cách chữa tạm là tự capture context trước khi submit rồi set lại trong task; cách chữa đúng là ScopedValue — cơ chế truyền context theo phạm vi công việc thay vì theo thread, sẽ gặp ở bài Structured Concurrency.
Bài tiếp theo: Fork/Join: Chia để trị song song với work-stealing
Bài này có giúp bạn hiểu bản chất không?
Hỏi đáp về bài này
Chưa có câu hỏi
Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).
Đặt câu hỏi đầu tiên