Java — Từ Zero đến Senior/Mini-challenge: Concurrent price aggregator
~30 phútConcurrency cơ bảnMiễn phí

Mini-challenge: Concurrent price aggregator

Bài thực hành khép lại Module 10 — gọi 5 'API' song song với CompletableFuture, timeout, fallback, aggregate kết quả. Mô phỏng backend thực tế.

Mini-challenge khép lại Module 10. Bạn sẽ viết module aggregate giá từ 5 "API" (mô phỏng bằng sleep + random). Đây là pattern phổ biến backend: so sánh giá flight/hotel/stock từ nhiều provider, merge kết quả cho user.

Yêu cầu: tất cả call song song, có timeout, có fallback khi API fail — không sequential, không await từng cái.

🎯 Đề bài

1. PriceProvider interface

interface PriceProvider {
    String name();
    double fetchPrice(String product) throws Exception;
}

5 implementation fake (ProviderA..E) — mỗi provider sleep(100..800ms) random rồi trả double. Thỉnh thoảng throw (simulate fail). Helper có sẵn trong starter.

2. PriceAggregator.fetchAll(String product) — song song

  • Gọi tất cả 5 provider song song với CompletableFuture.
  • Mỗi call timeout 500ms. Quá timeout → coi như fail, skip provider đó.
  • Provider throw → skip, không crash toàn aggregate.
  • Trả Map<String, Double> — provider name → price. Key nào fail/timeout không có trong map.

Signature:

Map<String, Double> fetchAll(String product);

3. PriceAggregator.bestPrice(String product) — giá thấp nhất

  • Dùng fetchAll, trả provider + price thấp nhất.
  • Trả Optional<Map.Entry<String, Double>> — empty nếu tất cả fail.

4. PriceAggregator.shutdown()

Shutdown executor graceful.

Yêu cầu performance: fetchAll trong ~500ms (max timeout), không phải 5 * max (sequential).

📦 Concept dùng trong bài

ConceptBàiDùng ở đây
ExecutorService10.3Pool gọi provider
CompletableFuture10.3Song song, timeout, fallback
orTimeout, exceptionally10.3Timeout + fallback chain
allOf + join10.3Chờ tất cả xong
Concurrent map10.4Thu thập kết quả thread-safe
Optional9.5bestPrice rỗng → empty
Stream min/max9.3Tìm min price

▶️ Starter code

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class PriceAggregatorApp {

    public interface PriceProvider {
        String name();
        double fetchPrice(String product) throws Exception;
    }

    public static class FakeProvider implements PriceProvider {
        private final String name;
        private final int minMs, maxMs;
        private final double failRate;
        private final Random rnd = new Random();

        public FakeProvider(String name, int minMs, int maxMs, double failRate) {
            this.name = name;
            this.minMs = minMs;
            this.maxMs = maxMs;
            this.failRate = failRate;
        }

        @Override public String name() { return name; }

        @Override public double fetchPrice(String product) throws Exception {
            int delay = minMs + rnd.nextInt(maxMs - minMs);
            Thread.sleep(delay);
            if (rnd.nextDouble() < failRate) {
                throw new RuntimeException(name + " failed");
            }
            return 100 + rnd.nextDouble() * 50;
        }
    }

    public static class PriceAggregator {
        private final List<PriceProvider> providers;
        private final ExecutorService exec;

        public PriceAggregator(List<PriceProvider> providers) {
            this.providers = providers;
            this.exec = Executors.newFixedThreadPool(providers.size());
        }

        // TODO: implement
        public Map<String, Double> fetchAll(String product) {
            return Map.of();
        }

        // TODO: implement
        public Optional<Map.Entry<String, Double>> bestPrice(String product) {
            return Optional.empty();
        }

        public void shutdown() {
            exec.shutdown();
            try {
                if (!exec.awaitTermination(2, TimeUnit.SECONDS)) {
                    exec.shutdownNow();
                }
            } catch (InterruptedException e) {
                exec.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        List<PriceProvider> providers = List.of(
            new FakeProvider("Amazon",  100, 300, 0.1),
            new FakeProvider("eBay",    200, 400, 0.1),
            new FakeProvider("Walmart", 150, 700, 0.2),   // Co the timeout
            new FakeProvider("Alibaba", 300, 900, 0.3),   // Hay timeout/fail
            new FakeProvider("Shopify", 100, 250, 0.0)
        );

        PriceAggregator agg = new PriceAggregator(providers);
        try {
            long t1 = System.currentTimeMillis();
            Map<String, Double> all = agg.fetchAll("iphone");
            long t2 = System.currentTimeMillis();

            System.out.printf("Fetched in %d ms%n", t2 - t1);
            all.forEach((k, v) -> System.out.printf("  %-10s %.2f%n", k, v));

            agg.bestPrice("iphone").ifPresent(e ->
                System.out.printf("Best: %s at %.2f%n", e.getKey(), e.getValue()));
        } finally {
            agg.shutdown();
        }
    }
}
javac PriceAggregatorApp.java
java PriceAggregatorApp

Dành 25–30 phút.

💡 Gợi ý

💡 Gợi ý — đọc khi bị kẹt

fetchAll với CompletableFuture + timeout + exceptionally:

public Map<String, Double> fetchAll(String product) {
    ConcurrentHashMap<String, Double> result = new ConcurrentHashMap<>();

    List<CompletableFuture<Void>> futures = providers.stream()
        .map(p -> CompletableFuture
            .supplyAsync(() -> {
                try {
                    return p.fetchPrice(product);
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }, exec)
            .orTimeout(500, TimeUnit.MILLISECONDS)
            .thenAccept(price -> result.put(p.name(), price))
            .exceptionally(ex -> {
                // Timeout hoac provider throw -> skip
                return null;
            }))
        .toList();

    CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
    return Map.copyOf(result);
}

Cơ chế:

  • supplyAsync(..., exec) — chạy trên executor riêng.
  • orTimeout(500, MILLISECONDS) — sau 500ms, complete với TimeoutException.
  • thenAccept(...) — khi thành công, thêm vào map. Dùng ConcurrentHashMap vì nhiều thread cùng put.
  • exceptionally(ex -> null) — swallow exception, trả null cho CompletableFuture<Void> — skip provider.
  • allOf(...).join() — chờ tất cả future complete (success hoặc fail).

bestPrice:

public Optional<Map.Entry<String, Double>> bestPrice(String product) {
    return fetchAll(product).entrySet().stream()
        .min(Map.Entry.comparingByValue());
}

Stream.min trả Optional — empty khi map rỗng.

✅ Lời giải

✅ Lời giải — xem sau khi đã thử
import java.util.*;
import java.util.concurrent.*;

public class PriceAggregatorApp {

    public interface PriceProvider {
        String name();
        double fetchPrice(String product) throws Exception;
    }

    public static class FakeProvider implements PriceProvider {
        private final String name;
        private final int minMs, maxMs;
        private final double failRate;
        private final Random rnd = new Random();

        public FakeProvider(String name, int minMs, int maxMs, double failRate) {
            this.name = name;
            this.minMs = minMs;
            this.maxMs = maxMs;
            this.failRate = failRate;
        }

        @Override public String name() { return name; }

        @Override public double fetchPrice(String product) throws Exception {
            int delay = minMs + rnd.nextInt(maxMs - minMs);
            Thread.sleep(delay);
            if (rnd.nextDouble() < failRate) {
                throw new RuntimeException(name + " failed");
            }
            return 100 + rnd.nextDouble() * 50;
        }
    }

    public static class PriceAggregator {
        private final List<PriceProvider> providers;
        private final ExecutorService exec;

        public PriceAggregator(List<PriceProvider> providers) {
            this.providers = providers;
            this.exec = Executors.newFixedThreadPool(providers.size());
        }

        public Map<String, Double> fetchAll(String product) {
            ConcurrentHashMap<String, Double> result = new ConcurrentHashMap<>();

            List<CompletableFuture<Void>> futures = providers.stream()
                .map(p -> CompletableFuture
                    .supplyAsync(() -> {
                        try {
                            return p.fetchPrice(product);
                        } catch (Exception e) {
                            throw new CompletionException(e);
                        }
                    }, exec)
                    .orTimeout(500, TimeUnit.MILLISECONDS)
                    .thenAccept(price -> result.put(p.name(), price))
                    .exceptionally(ex -> null))
                .toList();

            CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
            return Map.copyOf(result);
        }

        public Optional<Map.Entry<String, Double>> bestPrice(String product) {
            return fetchAll(product).entrySet().stream()
                .min(Map.Entry.comparingByValue());
        }

        public void shutdown() {
            exec.shutdown();
            try {
                if (!exec.awaitTermination(2, TimeUnit.SECONDS)) {
                    exec.shutdownNow();
                }
            } catch (InterruptedException e) {
                exec.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        List<PriceProvider> providers = List.of(
            new FakeProvider("Amazon",  100, 300, 0.1),
            new FakeProvider("eBay",    200, 400, 0.1),
            new FakeProvider("Walmart", 150, 700, 0.2),
            new FakeProvider("Alibaba", 300, 900, 0.3),
            new FakeProvider("Shopify", 100, 250, 0.0)
        );

        PriceAggregator agg = new PriceAggregator(providers);
        try {
            long t1 = System.currentTimeMillis();
            Map<String, Double> all = agg.fetchAll("iphone");
            long t2 = System.currentTimeMillis();

            System.out.printf("Fetched in %d ms%n", t2 - t1);
            all.forEach((k, v) -> System.out.printf("  %-10s %.2f%n", k, v));

            agg.bestPrice("iphone").ifPresent(e ->
                System.out.printf("Best: %s at %.2f%n", e.getKey(), e.getValue()));
        } finally {
            agg.shutdown();
        }
    }
}

Điểm chính:

  • supplyAsync(..., exec) pass executor: nếu không pass, chạy trên ForkJoinPool.commonPool() — block I/O sẽ ảnh hưởng parallel stream toàn JVM. Executor riêng an toàn.

  • Wrap checked exception: p.fetchPrice throws Exception checked. Lambda không cho throws — wrap bằng CompletionException để CompletableFuture nhận.

  • orTimeout(500, MILLISECONDS) (Java 9+): sau timeout, future complete exceptionally với TimeoutException. Không dùng get(timeout)get block caller, mất tính async.

  • thenAccept success, exceptionally catch-all: thenAccept chạy khi upstream success; nếu upstream throw, exceptionally bắt lại, trả fallback. Chain ra CompletableFuture<Void> — unified type cho allOf.

  • ConcurrentHashMap cho result: nhiều thread thenAccept cùng lúc — HashMap sẽ corrupt.

  • Map.copyOf(result): trả immutable snapshot — caller không modify được ngẫu nhiên.

  • bestPrice = stream.min + Optional: clean, không null check.

  • Thời gian thực tế: max(timeout, max(provider delay)) = ~500ms — bất kể 5 hay 50 provider, vì tất cả song song. Sequential version sẽ là sum(delays) = ~2-3s.

🎓 Mở rộng

Mức 1 — Virtual thread (Java 21):

Thay executor:

this.exec = Executors.newVirtualThreadPerTaskExecutor();

Giờ scale với hàng nghìn provider — mỗi provider 1 virtual thread, không pool limit.

Mức 2 — Retry on failure:

Wrap supplyAsync với retry 2 lần nếu fail:

private <T> CompletableFuture<T> withRetry(Supplier<T> task, int attempts) {
    return CompletableFuture.supplyAsync(task, exec)
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(ex -> attempts > 1
            ? withRetry(task, attempts - 1)
            : CompletableFuture.failedFuture(ex))
        .thenCompose(f -> f);
}

Mức 3 — First-N-only (don't wait for slow ones):

Thay allOf bằng return khi có 3 kết quả đầu:

public Map<String, Double> fetchFirstN(String product, int n) {
    ConcurrentHashMap<String, Double> result = new ConcurrentHashMap<>();
    CountDownLatch latch = new CountDownLatch(n);

    providers.forEach(p -> CompletableFuture
        .supplyAsync(() -> { try { return p.fetchPrice(product); }
                            catch (Exception e) { throw new CompletionException(e); } }, exec)
        .orTimeout(500, TimeUnit.MILLISECONDS)
        .thenAccept(price -> {
            result.put(p.name(), price);
            latch.countDown();
        })
        .exceptionally(ex -> null));

    try {
        latch.await(500, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return Map.copyOf(result);
}

Pattern "fastest N wins" — trả về khi có đủ N response, skip slow.

Mức 4 — Circuit breaker:

Track fail rate mỗi provider — provider fail > 50% → skip hoàn toàn 1 phút:

private final ConcurrentHashMap<String, CircuitState> breakers = new ConcurrentHashMap<>();

record CircuitState(int failCount, int totalCount, long openUntil) { }

Cấu trúc thực: Netflix Hystrix, Resilience4j — production pattern.

✨ Điều bạn vừa làm được

Hoàn thành mini-challenge này, bạn đã:

  • Gọi nhiều API song song với CompletableFuture — 500ms thay vì 5*500ms sequential.
  • Áp timeout per-call với orTimeout — không stuck thread vì 1 provider chậm.
  • Handle exception gracefully với exceptionally — 1 provider fail không crash toàn aggregate.
  • Dùng ConcurrentHashMap cho kết quả thread-safe — tránh corruption khi nhiều thenAccept cùng ghi.
  • Shutdown executor graceful — không leak thread.
  • Áp pattern resilience thực tế — foundation của microservice call aggregation.

Chúc mừng — bạn đã hoàn thành Module 10! Bạn giờ hiểu thread, synchronized, executor, concurrent collection, và virtual thread — toolkit đầy đủ để viết backend concurrent. Module 11 sẽ bước sang I/O & NIO — đọc/ghi file, socket, và API modern của Java 8+.

Bài này có giúp bạn hiểu bản chất không?