Java Internals & Concurrency/Mini-challenge: Concurrent price aggregator
10/26
Bài 10 / 26~30 phútConcurrency cơ bảnMiễn phí lượt xem

Mini-challenge: Concurrent price aggregator

Bài thực hành khép lại Module 10 — gọi 5 'API' song song với CompletableFuture, timeout, fallback, aggregate kết quả. Mô phỏng backend thực tế.

Mini-challenge khép lại Module 10. Bạn sẽ viết module aggregate giá từ 5 "API" (mô phỏng bằng sleep + random). Đây là pattern phổ biến backend: so sánh giá flight/hotel/stock từ nhiều provider, merge kết quả cho user.

Yêu cầu: tất cả call song song, có timeout, có fallback khi API fail — không sequential, không await từng cái.

🎯 Đề bài

1. PriceProvider interface

interface PriceProvider {
    String name();
    double fetchPrice(String product) throws Exception;
}

5 implementation fake (ProviderA..E) — mỗi provider sleep(100..800ms) random rồi trả double. Thỉnh thoảng throw (simulate fail). Helper có sẵn trong starter.

2. PriceAggregator.fetchAll(String product) — song song

  • Gọi tất cả 5 provider song song với CompletableFuture.
  • Mỗi call timeout 500ms. Quá timeout → coi như fail, skip provider đó.
  • Provider throw → skip, không crash toàn aggregate.
  • Trả Map<String, Double> — provider name → price. Key nào fail/timeout không có trong map.

Signature:

Map<String, Double> fetchAll(String product);

3. PriceAggregator.bestPrice(String product) — giá thấp nhất

  • Dùng fetchAll, trả provider + price thấp nhất.
  • Trả Optional<Map.Entry<String, Double>> — empty nếu tất cả fail.

4. PriceAggregator.shutdown()

Shutdown executor graceful.

Yêu cầu performance: fetchAll trong ~500ms (max timeout), không phải 5 * max (sequential).

📦 Concept dùng trong bài

ConceptBàiDùng ở đây
ExecutorService10.3Pool gọi provider
CompletableFuture10.3Song song, timeout, fallback
orTimeout, exceptionally10.3Timeout + fallback chain
allOf + join10.3Chờ tất cả xong
Concurrent map10.4Thu thập kết quả thread-safe
Optional9.5bestPrice rỗng → empty
Stream min/max9.3Tìm min price

▶️ Starter code

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class PriceAggregatorApp {

    public interface PriceProvider {
        String name();
        double fetchPrice(String product) throws Exception;
    }

    public static class FakeProvider implements PriceProvider {
        private final String name;
        private final int minMs, maxMs;
        private final double failRate;
        private final Random rnd = new Random();

        public FakeProvider(String name, int minMs, int maxMs, double failRate) {
            this.name = name;
            this.minMs = minMs;
            this.maxMs = maxMs;
            this.failRate = failRate;
        }

        @Override public String name() { return name; }

        @Override public double fetchPrice(String product) throws Exception {
            int delay = minMs + rnd.nextInt(maxMs - minMs);
            Thread.sleep(delay);
            if (rnd.nextDouble() < failRate) {
                throw new RuntimeException(name + " failed");
            }
            return 100 + rnd.nextDouble() * 50;
        }
    }

    public static class PriceAggregator {
        private final List<PriceProvider> providers;
        private final ExecutorService exec;

        public PriceAggregator(List<PriceProvider> providers) {
            this.providers = providers;
            this.exec = Executors.newFixedThreadPool(providers.size());
        }

        // TODO: implement
        public Map<String, Double> fetchAll(String product) {
            return Map.of();
        }

        // TODO: implement
        public Optional<Map.Entry<String, Double>> bestPrice(String product) {
            return Optional.empty();
        }

        public void shutdown() {
            exec.shutdown();
            try {
                if (!exec.awaitTermination(2, TimeUnit.SECONDS)) {
                    exec.shutdownNow();
                }
            } catch (InterruptedException e) {
                exec.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        List<PriceProvider> providers = List.of(
            new FakeProvider("Amazon",  100, 300, 0.1),
            new FakeProvider("eBay",    200, 400, 0.1),
            new FakeProvider("Walmart", 150, 700, 0.2),   // Co the timeout
            new FakeProvider("Alibaba", 300, 900, 0.3),   // Hay timeout/fail
            new FakeProvider("Shopify", 100, 250, 0.0)
        );

        PriceAggregator agg = new PriceAggregator(providers);
        try {
            long t1 = System.currentTimeMillis();
            Map<String, Double> all = agg.fetchAll("iphone");
            long t2 = System.currentTimeMillis();

            System.out.printf("Fetched in %d ms%n", t2 - t1);
            all.forEach((k, v) -> System.out.printf("  %-10s %.2f%n", k, v));

            agg.bestPrice("iphone").ifPresent(e ->
                System.out.printf("Best: %s at %.2f%n", e.getKey(), e.getValue()));
        } finally {
            agg.shutdown();
        }
    }
}
javac PriceAggregatorApp.java
java PriceAggregatorApp

Dành 25–30 phút.

💡 Gợi ý

💡 Gợi ý — đọc khi bị kẹt

fetchAll với CompletableFuture + timeout + exceptionally:

public Map<String, Double> fetchAll(String product) {
    ConcurrentHashMap<String, Double> result = new ConcurrentHashMap<>();

    List<CompletableFuture<Void>> futures = providers.stream()
        .map(p -> CompletableFuture
            .supplyAsync(() -> {
                try {
                    return p.fetchPrice(product);
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }, exec)
            .orTimeout(500, TimeUnit.MILLISECONDS)
            .thenAccept(price -> result.put(p.name(), price))
            .exceptionally(ex -> {
                // Timeout hoac provider throw -> skip
                return null;
            }))
        .toList();

    CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
    return Map.copyOf(result);
}

Cơ chế:

  • supplyAsync(..., exec) — chạy trên executor riêng.
  • orTimeout(500, MILLISECONDS) — sau 500ms, complete với TimeoutException.
  • thenAccept(...) — khi thành công, thêm vào map. Dùng ConcurrentHashMap vì nhiều thread cùng put.
  • exceptionally(ex -> null) — swallow exception, trả null cho CompletableFuture<Void> — skip provider.
  • allOf(...).join() — chờ tất cả future complete (success hoặc fail).

bestPrice:

public Optional<Map.Entry<String, Double>> bestPrice(String product) {
    return fetchAll(product).entrySet().stream()
        .min(Map.Entry.comparingByValue());
}

Stream.min trả Optional — empty khi map rỗng.

✅ Lời giải

✅ Lời giải — xem sau khi đã thử
import java.util.*;
import java.util.concurrent.*;

public class PriceAggregatorApp {

    public interface PriceProvider {
        String name();
        double fetchPrice(String product) throws Exception;
    }

    public static class FakeProvider implements PriceProvider {
        private final String name;
        private final int minMs, maxMs;
        private final double failRate;
        private final Random rnd = new Random();

        public FakeProvider(String name, int minMs, int maxMs, double failRate) {
            this.name = name;
            this.minMs = minMs;
            this.maxMs = maxMs;
            this.failRate = failRate;
        }

        @Override public String name() { return name; }

        @Override public double fetchPrice(String product) throws Exception {
            int delay = minMs + rnd.nextInt(maxMs - minMs);
            Thread.sleep(delay);
            if (rnd.nextDouble() < failRate) {
                throw new RuntimeException(name + " failed");
            }
            return 100 + rnd.nextDouble() * 50;
        }
    }

    public static class PriceAggregator {
        private final List<PriceProvider> providers;
        private final ExecutorService exec;

        public PriceAggregator(List<PriceProvider> providers) {
            this.providers = providers;
            this.exec = Executors.newFixedThreadPool(providers.size());
        }

        public Map<String, Double> fetchAll(String product) {
            ConcurrentHashMap<String, Double> result = new ConcurrentHashMap<>();

            List<CompletableFuture<Void>> futures = providers.stream()
                .map(p -> CompletableFuture
                    .supplyAsync(() -> {
                        try {
                            return p.fetchPrice(product);
                        } catch (Exception e) {
                            throw new CompletionException(e);
                        }
                    }, exec)
                    .orTimeout(500, TimeUnit.MILLISECONDS)
                    .thenAccept(price -> result.put(p.name(), price))
                    .exceptionally(ex -> null))
                .toList();

            CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
            return Map.copyOf(result);
        }

        public Optional<Map.Entry<String, Double>> bestPrice(String product) {
            return fetchAll(product).entrySet().stream()
                .min(Map.Entry.comparingByValue());
        }

        public void shutdown() {
            exec.shutdown();
            try {
                if (!exec.awaitTermination(2, TimeUnit.SECONDS)) {
                    exec.shutdownNow();
                }
            } catch (InterruptedException e) {
                exec.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        List<PriceProvider> providers = List.of(
            new FakeProvider("Amazon",  100, 300, 0.1),
            new FakeProvider("eBay",    200, 400, 0.1),
            new FakeProvider("Walmart", 150, 700, 0.2),
            new FakeProvider("Alibaba", 300, 900, 0.3),
            new FakeProvider("Shopify", 100, 250, 0.0)
        );

        PriceAggregator agg = new PriceAggregator(providers);
        try {
            long t1 = System.currentTimeMillis();
            Map<String, Double> all = agg.fetchAll("iphone");
            long t2 = System.currentTimeMillis();

            System.out.printf("Fetched in %d ms%n", t2 - t1);
            all.forEach((k, v) -> System.out.printf("  %-10s %.2f%n", k, v));

            agg.bestPrice("iphone").ifPresent(e ->
                System.out.printf("Best: %s at %.2f%n", e.getKey(), e.getValue()));
        } finally {
            agg.shutdown();
        }
    }
}

Điểm chính:

  • supplyAsync(..., exec) pass executor: nếu không pass, chạy trên ForkJoinPool.commonPool() — block I/O sẽ ảnh hưởng parallel stream toàn JVM. Executor riêng an toàn.

  • Wrap checked exception: p.fetchPrice throws Exception checked. Lambda không cho throws — wrap bằng CompletionException để CompletableFuture nhận.

  • orTimeout(500, MILLISECONDS) (Java 9+): sau timeout, future complete exceptionally với TimeoutException. Không dùng get(timeout)get block caller, mất tính async.

  • thenAccept success, exceptionally catch-all: thenAccept chạy khi upstream success; nếu upstream throw, exceptionally bắt lại, trả fallback. Chain ra CompletableFuture<Void> — unified type cho allOf.

  • ConcurrentHashMap cho result: nhiều thread thenAccept cùng lúc — HashMap sẽ corrupt.

  • Map.copyOf(result): trả immutable snapshot — caller không modify được ngẫu nhiên.

  • bestPrice = stream.min + Optional: clean, không null check.

  • Thời gian thực tế: max(timeout, max(provider delay)) = ~500ms — bất kể 5 hay 50 provider, vì tất cả song song. Sequential version sẽ là sum(delays) = ~2-3s.

🎓 Mở rộng

Mức 1 — Virtual thread (Java 21):

Thay executor:

this.exec = Executors.newVirtualThreadPerTaskExecutor();

Giờ scale với hàng nghìn provider — mỗi provider 1 virtual thread, không pool limit.

Mức 2 — Retry on failure:

Wrap supplyAsync với retry 2 lần nếu fail:

private <T> CompletableFuture<T> withRetry(Supplier<T> task, int attempts) {
    return CompletableFuture.supplyAsync(task, exec)
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(ex -> attempts > 1
            ? withRetry(task, attempts - 1)
            : CompletableFuture.failedFuture(ex))
        .thenCompose(f -> f);
}

Mức 3 — First-N-only (don't wait for slow ones):

Thay allOf bằng return khi có 3 kết quả đầu:

public Map<String, Double> fetchFirstN(String product, int n) {
    ConcurrentHashMap<String, Double> result = new ConcurrentHashMap<>();
    CountDownLatch latch = new CountDownLatch(n);

    providers.forEach(p -> CompletableFuture
        .supplyAsync(() -> { try { return p.fetchPrice(product); }
                            catch (Exception e) { throw new CompletionException(e); } }, exec)
        .orTimeout(500, TimeUnit.MILLISECONDS)
        .thenAccept(price -> {
            result.put(p.name(), price);
            latch.countDown();
        })
        .exceptionally(ex -> null));

    try {
        latch.await(500, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return Map.copyOf(result);
}

Pattern "fastest N wins" — trả về khi có đủ N response, skip slow.

Mức 4 — Circuit breaker:

Track fail rate mỗi provider — provider fail > 50% → skip hoàn toàn 1 phút:

private final ConcurrentHashMap<String, CircuitState> breakers = new ConcurrentHashMap<>();

record CircuitState(int failCount, int totalCount, long openUntil) { }

Cấu trúc thực: Netflix Hystrix, Resilience4j — production pattern.

✨ Điều bạn vừa làm được

Hoàn thành mini-challenge này, bạn đã:

  • Gọi nhiều API song song với CompletableFuture — 500ms thay vì 5*500ms sequential.
  • Áp timeout per-call với orTimeout — không stuck thread vì 1 provider chậm.
  • Handle exception gracefully với exceptionally — 1 provider fail không crash toàn aggregate.
  • Dùng ConcurrentHashMap cho kết quả thread-safe — tránh corruption khi nhiều thenAccept cùng ghi.
  • Shutdown executor graceful — không leak thread.
  • Áp pattern resilience thực tế — foundation của microservice call aggregation.

Chúc mừng — bạn đã hoàn thành Module 10! Bạn giờ hiểu thread, synchronized, executor, concurrent collection, và virtual thread — toolkit đầy đủ để viết backend concurrent. Module 11 sẽ bước sang I/O & NIO — đọc/ghi file, socket, và API modern của Java 8+.

Bài này có giúp bạn hiểu bản chất không?

Hỏi đáp về bài này

Chưa có câu hỏi

Đặt câu hỏi

Có gì chưa rõ trong bài? Đặt câu hỏi đầu tiên — câu trả lời từ cộng đồng giúp bạn (và người sau).

Đặt câu hỏi đầu tiên