Java — Từ Zero đến Senior/ExecutorService và CompletableFuture — thread pool và async
~25 phútConcurrency cơ bảnMiễn phí

ExecutorService và CompletableFuture — thread pool và async

Vì sao không dùng new Thread() trong production. Thread pool với ExecutorService, 4 factory, Future.get() blocking. CompletableFuture chain async không callback hell, thenApply vs thenCompose, exceptionally.

Bài 10.1 kết lại: trong production, ai cũng dùng ExecutorService thay vì tạo new Thread(). Bài này giải thích cụ thể vì sao — và cung cấp toolkit đầy đủ cho async programming hiện đại: thread pool, Future, CompletableFuture.

Quay lại ví dụ crawler 100 URL:

// Cach "new Thread" - TE
for (String url : urls) {
    new Thread(() -> httpGet(url)).start();
}

3 vấn đề nghiêm trọng:

  1. Tạo thread tốn (~100μs syscall + 1MB stack). 100 thread × 1MB = 100MB RAM cho chỉ 100 request. 10k request = 10GB — OOM.
  2. Không limit: burst traffic 50k request → 50k thread → crash instance. Không có back-pressure.
  3. Không nhận kết quả: Thread không return value. Phải dùng shared state + join + synchronize — phức tạp, dễ bug.

ExecutorService giải quyết cả 3:

  1. Pool tái dùng thread — tạo 1 lần, xài nhiều task.
  2. Bounded pool + queue — task vượt limit xếp hàng hoặc reject, không crash instance.
  3. Future patternsubmit(task) trả Future<T>, caller lấy kết quả khi cần.

Nâng cấp lên CompletableFuture cho phép chain async operation — task xong gọi callback kế, không viết nested callback. Đây là cơ sở của microservice aggregation hiện đại: gọi 5 service song song, combine kết quả, handle lỗi, timeout.

Bài này đi qua: ExecutorService (4 factory + ThreadPoolExecutor tự config), Future (và giới hạn của nó), CompletableFuture (API + pattern chain), và shutdown đúng cách.

1. Analogy — Phòng call center

Hai cách tổ chức call center:

Cách A — thuê khi có cuộc gọi: mỗi cuộc gọi tới, tuyển 1 nhân viên mới, họ làm xong thì sa thải. Chi phí HR + training mỗi người = $100. 1000 cuộc gọi = $100k.

Cách B — pool 10 nhân viên fix: tuyển 10 người, họ xử lý mọi cuộc gọi. Khi nhân viên rảnh, nhận cuộc gọi mới. Nếu 10 người bận hết, cuộc gọi xếp hàng. Chi phí = $100 × 10 = $1000 cho bất kỳ số cuộc gọi nào.

Cách B là thread pool. Cách A là new Thread() mỗi task — không ai làm thế.

Bonus cách B: khi khách hàng đưa phiếu yêu cầu phức tạp ("xử lý xong gọi lại tôi"), nhân viên đưa lại phiếu chờ (Future). Khách cầm phiếu đi, khi cần thì quay lại đổi phiếu lấy kết quả. Nếu khách muốn chain "kết quả xong tự động gửi SMS cho tôi" — đó là CompletableFuture.thenAccept.

Đời thườngJava
Pool 10 nhân viênExecutors.newFixedThreadPool(10)
Nhận cuộc gọi mớiexecutor.submit(task)
Phiếu chờ kết quảFuture<T>
Đổi phiếu lấy kết quả (block)future.get()
Callback "khi xong làm X"future.thenApply(x -> ...) (CompletableFuture)
Sa thải khi đóng call centerexecutor.shutdown()

2. ExecutorService — thread pool

Code cơ bản

import java.util.concurrent.*;

ExecutorService exec = Executors.newFixedThreadPool(4);

exec.submit(() -> System.out.println("task 1 on " + Thread.currentThread().getName()));
exec.submit(() -> System.out.println("task 2 on " + Thread.currentThread().getName()));
exec.submit(() -> System.out.println("task 3 on " + Thread.currentThread().getName()));

exec.shutdown();
exec.awaitTermination(10, TimeUnit.SECONDS);

Output:

task 1 on pool-1-thread-1
task 2 on pool-1-thread-2
task 3 on pool-1-thread-3

Pool có 4 thread, 3 task gọi 3 thread khác nhau. Task thứ 4 dùng lại thread-1 khi nó rảnh.

submit(Runnable) thêm task vào queue của pool. Khi có thread rảnh, pool lấy task từ queue và run. Nếu pool đủ load, task chờ.

4 factory thường dùng

JDK cung cấp Executors với factory method cho use case phổ biến:

FactoryĐặc điểmUse case
newFixedThreadPool(n)n thread cố định, queue không giới hạnBiết parallelism cần, vd #CPU core cho CPU-bound
newCachedThreadPool()Thread pool grow theo demand, idle 60s → recycleTask ngắn, burst không đều
newSingleThreadExecutor()1 thread duy nhấtSerialize task (order-preserving)
newScheduledThreadPool(n)Support schedule delayed/periodicCron-like job

newFixedThreadPool(n) chi tiết

ExecutorService exec = Executors.newFixedThreadPool(8);

Bên dưới là new ThreadPoolExecutor(8, 8, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()).

Đặc điểm:

  • Pool size cố định 8 thread, không grow không shrink.
  • Queue là LinkedBlockingQueue không giới hạn — task vượt pool chờ trong queue vô hạn.

Warning: "unbounded queue" có thể OOM nếu producer nhanh hơn consumer.

newCachedThreadPool() chi tiết

ExecutorService exec = Executors.newCachedThreadPool();

Bên dưới: new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<>()).

  • Pool size từ 0 đến Integer.MAX_VALUE.
  • Queue là SynchronousQueue — capacity 0, task phải có thread nhận ngay hoặc bị reject.
  • Thread idle 60 giây → remove khỏi pool.

Đặc điểm "danger": không upper limit. 10k request đồng thời → 10k thread. OOM risk cao.

⚠️ newCachedThreadPool với traffic burst

Nhanh, tiện, nhưng không bảo vệ hệ thống. Production nên dùng ThreadPoolExecutor tự cấu hình với bounded queue + rejection policy. newCachedThreadPool phù hợp demo, script ngắn — không cho service 24/7.

Cấu hình thủ công với ThreadPoolExecutor

Để control chi tiết, tự tạo:

ExecutorService exec = new ThreadPoolExecutor(
    4,                           // corePoolSize
    10,                          // maxPoolSize
    60, TimeUnit.SECONDS,        // keepAliveTime cho extra thread
    new ArrayBlockingQueue<>(100),    // bounded queue capacity 100
    new ThreadPoolExecutor.CallerRunsPolicy()   // rejection policy
);

Cơ chế:

  • Submit task: nếu pool có tối đa 4 thread, tạo thread mới + run ngay.
  • Nếu đã 4 thread busy, vào queue (max 100 task).
  • Nếu queue đầy (100 task), tạo thread extra đến 10.
  • Nếu đã 10 thread và queue đầy, áp dụng rejection policy.

Rejection policy options:

  • AbortPolicy (default): throw RejectedExecutionException.
  • CallerRunsPolicy: task chạy ngay trên thread gọi submit → back-pressure, producer chậm lại.
  • DiscardPolicy: bỏ qua, không error.
  • DiscardOldestPolicy: bỏ task cũ nhất trong queue, thêm task mới.

Chọn policy theo semantic: web server dùng CallerRunsPolicy hoặc AbortPolicy (trả 503); background job dùng CallerRunsPolicy; cache update có thể DiscardPolicy.

Đây là config cho production — newFixedThreadPool chỉ phù hợp prototype.

3. Future<T> — phiếu chờ kết quả

API cơ bản

Future<Integer> future = exec.submit(() -> {
    Thread.sleep(1000);
    return 42;
});

// Main thread lam viec khac
doOtherStuff();

// Lay ket qua - BLOCK neu chua xong
Integer result = future.get();
System.out.println(result);   // 42

submit(Callable<T>) trả Future<T>. future.get() block cho đến khi task complete, trả kết quả.

Method quan trọng:

  • get(): block indefinitely đến khi task xong.
  • get(timeout, unit): block tối đa timeout; hết → TimeoutException.
  • cancel(mayInterruptIfRunning): request cancel. Nếu true, gọi thread.interrupt().
  • isDone(), isCancelled(): trạng thái.

Lấy kết quả nhiều task

List<Future<Integer>> futures = List.of(
    exec.submit(() -> fetchFromApi(1)),
    exec.submit(() -> fetchFromApi(2)),
    exec.submit(() -> fetchFromApi(3))
);

List<Integer> results = new ArrayList<>();
for (Future<Integer> f : futures) {
    results.add(f.get());   // Cho tung future
}

Cách này sequential wait: submit đồng thời xong, nhưng lấy kết quả từng cái một. Nếu task 1 mất 5s, task 2 mất 1s, task 3 mất 2s → walltime = 5s (lock ở task 1). Tốt hơn chỗ lấy kết quả trong cùng loop thứ tự.

Nhược điểm Future

Future là API Java 5 (2004) — đủ dùng nhưng có giới hạn:

  1. get() block: muốn "kết quả xong thì làm X" phải block thread chờ, hoặc polling isDone() (lãng phí CPU).
  2. Không compose: muốn "gọi API1, xong thì gọi API2 với kết quả API1" — phải get() rồi submit task mới. Nested, verbose.
  3. Exception awkward: get() wrap exception vào ExecutionException — unwrap cồng kềnh.
  4. Không có combinator: "chờ tất cả future xong", "chờ future đầu tiên xong" — không có API built-in.

Java 8 thêm CompletableFuture fix tất cả giới hạn này.

4. CompletableFuture — async composable

Tạo và transform

CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
    return fetchFromApi(1);
}, exec);   // Pass executor

cf.thenApply(x -> x * 2)            // Transform ket qua
  .thenAccept(System.out::println);   // In, khong return

supplyAsync chạy Supplier<T> trên executor. Nếu không pass executor, dùng ForkJoinPool.commonPool() — nhớ bài 9.4: commonPool shared toàn JVM, không dùng cho I/O.

thenApply(Function<T, R>) — áp function lên kết quả, trả CompletableFuture<R>. Tương đương map trong Stream.

thenAccept(Consumer<T>) — consume kết quả, không return. Trả CompletableFuture<Void> — đánh dấu chain kết thúc.

thenApply vs thenCompose

Phân biệt quan trọng, tương tự map vs flatMap trong Stream:

// thenApply: function T -> R
CompletableFuture<String> name = userFuture.thenApply(User::getName);
// CompletableFuture<String>

// thenCompose: function T -> CompletableFuture<R>
CompletableFuture<List<Order>> orders = userFuture.thenCompose(user ->
    fetchOrdersAsync(user.getId())   // Tra CompletableFuture<List<Order>>
);
// CompletableFuture<List<Order>> - KHONG bi nested

Nếu dùng thenApply cho case thứ hai:

CompletableFuture<CompletableFuture<List<Order>>> bad = userFuture.thenApply(user ->
    fetchOrdersAsync(user.getId())
);
// Nested - phai join() hoac thenCompose

Quy tắc: next operation async (trả CompletableFuture) → thenCompose. Transform sync → thenApply.

Combine nhiều future

CompletableFuture<User> userFut = CompletableFuture.supplyAsync(() -> fetchUser(1), exec);
CompletableFuture<List<Order>> ordersFut = CompletableFuture.supplyAsync(() -> fetchOrders(1), exec);

CompletableFuture<String> combined = userFut.thenCombine(ordersFut,
    (user, orders) -> user.getName() + " has " + orders.size() + " orders"
);

System.out.println(combined.join());

2 future chạy song song. thenCombine trigger khi cả 2 xong, nhận cả 2 kết quả. Tổng walltime = max(userFut, ordersFut), không phải sum.

allOfanyOf

List<CompletableFuture<Data>> futures = urls.stream()
    .map(url -> CompletableFuture.supplyAsync(() -> httpGet(url), exec))
    .toList();

// Cho tat ca xong
CompletableFuture<Void> all = CompletableFuture.allOf(
    futures.toArray(CompletableFuture[]::new)
);

List<Data> results = all.thenApply(v ->
    futures.stream().map(CompletableFuture::join).toList()
).join();

allOf(CompletableFuture...) trả CompletableFuture<Void> — complete khi tất cả future input complete (success hoặc fail). Lưu ý: không tự gom kết quả — phải .join() từng cái lấy value.

anyOf(CompletableFuture...) — complete khi future đầu tiên xong. Dùng cho "lấy kết quả nhanh nhất trong nhiều replica".

Exception handling

CompletableFuture<Integer> cf = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() < 0.5) throw new RuntimeException("boom");
        return 42;
    })
    .exceptionally(ex -> {
        System.err.println("Error: " + ex.getMessage());
        return -1;   // fallback value
    });

System.out.println(cf.join());   // 42 hoac -1

exceptionally(Function<Throwable, T>) — catch exception, trả fallback value. Chain tiếp tục bình thường với fallback.

Alternative: handle((result, exception) -> ...) — nhận cả 2 (result null nếu có exception, exception null nếu success):

cf.handle((result, ex) -> {
    if (ex != null) return -1;
    return result;
});

Timeout (Java 9+)

CompletableFuture<String> cf = CompletableFuture
    .supplyAsync(() -> slowHttp())
    .orTimeout(500, TimeUnit.MILLISECONDS)
    .exceptionally(ex -> "fallback");

orTimeout(timeout, unit) — sau timeout, future complete exceptionally với TimeoutException. Kết hợp với exceptionally cho fallback.

Alternative: completeOnTimeout(value, timeout, unit) — thay vì throw, complete với value cho trước.

5. Pattern thực tế — aggregate 3 API song song

Bài toán: cho user ID, lấy profile + posts + recommendations → combine thành view. Mỗi API gọi ~200-400ms.

Sequential (chậm):

User profile = fetchProfile(id);            // 200ms
List<Post> posts = fetchPosts(id);           // 300ms
List<Item> recs = fetchRecommendations(id);  // 400ms
View view = new View(profile, posts, recs);
// Tong: 200+300+400 = 900ms

Parallel với CompletableFuture:

ExecutorService exec = Executors.newFixedThreadPool(10);

CompletableFuture<User> fp = CompletableFuture.supplyAsync(() -> fetchProfile(id), exec);
CompletableFuture<List<Post>> fo = CompletableFuture.supplyAsync(() -> fetchPosts(id), exec);
CompletableFuture<List<Item>> fr = CompletableFuture.supplyAsync(() -> fetchRecommendations(id), exec);

CompletableFuture<View> view = CompletableFuture.allOf(fp, fo, fr)
    .thenApply(v -> new View(fp.join(), fo.join(), fr.join()));
// Tong: max(200, 300, 400) = 400ms

Từ 900ms xuống 400ms — 2.25× nhanh hơn mà không tăng CPU. Đây là pattern chính của microservice aggregation: backend gọi nhiều service downstream, combine response, trả cho client.

6. Callable<T> vs Runnable

executor.submit nhận 2 kiểu task:

Runnable r = () -> doWork();                           // void run(), khong throw checked
Callable<Integer> c = () -> { return 42; };            // T call() throws Exception

Khác biệt:

  • Runnable: không return, không throw checked exception.
  • Callable: return T, throw Exception.

submit(Runnable) trả Future<?> (value null khi done). submit(Callable<T>) trả Future<T>.

Với CompletableFuture.supplyAsync, input là Supplier<T> (không throw checked) — nếu task có checked exception phải wrap RuntimeException:

CompletableFuture.supplyAsync(() -> {
    try {
        return mayThrowChecked();
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}, exec);

Hoặc dùng library như @SneakyThrows (Lombok). Nhưng thường wrap rõ ràng đọc tốt hơn.

7. Shutdown đúng cách

Shutdown graceful

exec.shutdown();   // Khong nhan task moi

try {
    if (!exec.awaitTermination(30, TimeUnit.SECONDS)) {
        // Timeout - force stop
        exec.shutdownNow();
    }
} catch (InterruptedException e) {
    exec.shutdownNow();
    Thread.currentThread().interrupt();
}
  • shutdown(): pool không nhận task mới. Task đang chạy + task trong queue hoàn thành bình thường.
  • awaitTermination(timeout): chờ tối đa timeout cho mọi task xong. Trả true nếu xong, false nếu timeout.
  • shutdownNow(): interrupt mọi thread đang chạy (task nào respect interrupt sẽ dừng). Trả list task chưa chạy.

Java 19+ try-with-resources

Java 19 thêm close() cho ExecutorServiceshutdown() + awaitTermination(Long.MAX_VALUE):

try (ExecutorService exec = Executors.newFixedThreadPool(4)) {
    exec.submit(task1);
    exec.submit(task2);
}   // Auto close = shutdown + await (vo han)

Tiện cho code scope ngắn. Warning: close() chờ vô hạn — nếu task bị stuck, thread thoát block mãi.

8. Pitfall tổng hợp

Nhầm 1: Quên shutdown() — JVM không thoát.

ExecutorService exec = Executors.newFixedThreadPool(4);
exec.submit(task);
// main() ket thuc nhung JVM van chay vi pool thread la non-daemon

✅ Try-with-resources (Java 19+) hoặc shutdown() cuối. Hoặc set thread factory dùng daemon.

Nhầm 2: future.get() không timeout.

Integer r = future.get();   // Hang forever neu task stuck

✅ Timeout: future.get(5, TimeUnit.SECONDS).

Nhầm 3: supplyAsync không pass executor — dùng common pool.

CompletableFuture.supplyAsync(() -> httpGet(url));
// Chay tren ForkJoinPool.commonPool
// Block I/O -> block pool shared toan JVM

✅ Pass executor: supplyAsync(task, myExecutor).

Nhầm 4: Block trong thenApply.

cf.thenApply(x -> httpGet(url));   // Block worker thread cua pool

thenCompose(x -> CompletableFuture.supplyAsync(() -> httpGet(url), exec)).

Nhầm 5: Không handle exception.

cf.thenApply(x -> risky(x));
// Ex silent nuot vao future
// join() sau do moi nem CompletionException

.exceptionally(ex -> fallback) hoặc .handle((r, ex) -> ...) ở cuối chain.

Nhầm 6: thenApply cho next async op (cần thenCompose).

cf.thenApply(user -> fetchOrdersAsync(user));
// Ket qua: CompletableFuture<CompletableFuture<List<Order>>>

cf.thenCompose(user -> fetchOrdersAsync(user)).

9. 📚 Deep Dive Oracle

📚 Deep Dive Oracle

Spec / reference chính thức:

Ghi chú: Javadoc ThreadPoolExecutor giải thích chi tiết interaction giữa corePoolSize, maxPoolSize, và queue. Quan trọng nhất: với LinkedBlockingQueue unbounded (default của newFixedThreadPool), maxPoolSize không bao giờ dùng — vì queue không bao giờ đầy. Muốn pool grow phải dùng bounded queue. Đây là lý do production thường tự config ThreadPoolExecutor.

10. Tóm tắt

  • ExecutorService thay new Thread() production: pool tái dùng, có limit, hỗ trợ Future.
  • 4 factory Executors: newFixedThreadPool, newCachedThreadPool, newSingleThreadExecutor, newScheduledThreadPool.
  • Production nên tự config ThreadPoolExecutor với bounded queue + rejection policy — tránh OOM với traffic burst.
  • Future.get() block; future.get(timeout) an toàn hơn với I/O không chắc.
  • Future không compose, không chain — hạn chế API từ Java 5.
  • CompletableFuture (Java 8) fix Future: thenApply (map), thenCompose (flatMap), thenCombine (2 future), allOf/anyOf.
  • orTimeout (Java 9+) set timeout per-future; exceptionally / handle catch exception.
  • Không block I/O trong common pool — luôn pass executor riêng cho I/O task.
  • Shutdown graceful: shutdown()awaitTermination(timeout)shutdownNow() nếu timeout.
  • Java 19+: ExecutorService try-with-resources — auto close khi exit block.

11. Tự kiểm tra

Tự kiểm tra
Q1
Khác biệt cụ thể giữa thenApplythenCompose?
  • thenApply(Function<T, R>): function trả R (value thường, sync). Tương đương map trong Stream.
  • thenCompose(Function<T, CompletableFuture<R>>): function trả CompletableFuture<R> (async op tiếp). Tương đương flatMap.

Trong chain có bước gọi async tiếp (fetch user → fetch order của user), dùng thenCompose:

cf.thenCompose(user ->
  CompletableFuture.supplyAsync(() -> fetchOrders(user.id()))
);

Nếu dùng thenApply sai ở đây → CompletableFuture<CompletableFuture<List>> nested. Phải .thenCompose(Function.identity()) hoặc .join() để flatten — verbose.

Quy tắc: function có trả CompletableFuture không → có thì compose, không thì apply.

Q2
Vì sao không nên dùng ForkJoinPool.commonPool cho I/O?

commonPool shared toàn JVM — parallel stream, CompletableFuture.supplyAsync không pass executor, ForkJoin task trực tiếp đều dùng chung. Default size = số CPU core.

Nếu block I/O trên commonPool: thread giữ lâu (500ms cho HTTP, vài giây cho DB) → parallel stream khác chờ → app overall chậm lại. Tệ hơn: đếm lầm số task parallelism — benchmark nghĩ có 8 task song song nhưng thực tế 4 vì commonPool bị chiếm.

Quy tắc:

  • I/O workload: executor riêng, thường newFixedThreadPool(50-200) hoặc virtual threads Java 21.
  • CPU-bound workload: commonPool OK (thiết kế cho CPU).

Test nhanh: khi dùng CompletableFuture với I/O, luôn pass executor: supplyAsync(task, exec). Không pass → bug tiềm ẩn.

Q3
Đoạn sau output gì? CompletableFuture.supplyAsync(() -> 5).thenApply(x -> x + 1).thenApply(x -> x * 2).thenAccept(System.out::println)

Chain:

  • supplyAsync(() -> 5) → complete với 5.
  • thenApply(x -> x + 1): 5 + 1 = 6.
  • thenApply(x -> x * 2): 6 * 2 = 12.
  • thenAccept(System.out::println): in 12, trả CompletableFuture<Void>.
12

Chain functional — giống Stream nhưng async. Mỗi stage có thể chạy trên thread khác (do executor quyết).

Lưu ý: nếu không có terminal (join, get, hoặc thenAccept/thenRun) + `main` return sớm, callback có thể chưa chạy kịp. Test đơn giản thường thêm .join() cuối để ensure complete.

Q4
Khi nào dùng newFixedThreadPool vs newCachedThreadPool?
  • newFixedThreadPool(n): biết parallelism cần trước. CPU-bound workload thường chọn n = số core. Pool size không thay đổi → latency predictable, không spike.
  • newCachedThreadPool: task ngắn, burst không đều. Pool grow/shrink theo demand.

Warning quan trọng: newCachedThreadPool không có upper limit — 10k request đồng thời → tạo 10k thread → OOM (mỗi thread 1MB stack).

Production thường dùng ThreadPoolExecutor tự cấu hình:

new ThreadPoolExecutor(
  10, 50,                              // core 10, max 50
  60, TimeUnit.SECONDS,
  new ArrayBlockingQueue<>(1000),      // bounded queue
  new ThreadPoolExecutor.CallerRunsPolicy()
);

Bounded queue + rejection policy = back-pressure. Burst traffic không làm crash instance.

Q5
Khác biệt giữa shutdown()shutdownNow()?
  • shutdown(): executor không nhận task mới. Task đang chạy hoàn thành bình thường. Task trong queue chạy hết. Graceful, không interrupt.
  • shutdownNow(): không nhận task mới, interrupt mọi thread đang chạy (task nào respect interrupt sẽ stop), bỏ task trong queue. Trả list task chưa chạy (caller có thể save/retry sau).

Pattern chuẩn graceful shutdown:

exec.shutdown();
try {
  if (!exec.awaitTermination(30, TimeUnit.SECONDS)) {
      exec.shutdownNow();   // Timeout -> force stop
  }
} catch (InterruptedException e) {
  exec.shutdownNow();
  Thread.currentThread().interrupt();
}

Đợi 30s cho task xong gracefully. Nếu task stuck, force stop. Đảm bảo không leak thread ngay cả khi task không respect interrupt.

Q6
Trong aggregate pattern (fetch 3 API song song), dùng CompletableFuture.allOf hay thenCombine?

Cả hai đều được. Tuỳ số lượng:

  • thenCombine(other, combiner): chính xác 2 future. API clean, type-safe — combiner nhận 2 kiểu cụ thể.
  • allOf(futures...): N future. Trả CompletableFuture<Void> — không tự gom kết quả, phải .join() từng future trong callback.

Ví dụ 3 future với allOf:

CompletableFuture<User> fp = ...;
CompletableFuture<List<Post>> fo = ...;
CompletableFuture<List<Item>> fr = ...;

CompletableFuture<View> view = CompletableFuture.allOf(fp, fo, fr)
  .thenApply(v -> new View(fp.join(), fo.join(), fr.join()));

Với 2 future: thenCombine elegant hơn, không cần join() thủ công.

Với N dynamic: allOf + collect kết quả từ list future. Tổng walltime = max(futures), song song thật.

Bài tiếp theo: Concurrent collections — ConcurrentHashMap, CopyOnWriteArrayList

Bài này có giúp bạn hiểu bản chất không?