Mini-challenge: Concurrent price aggregator
Bài thực hành khép lại Module 10 — gọi 5 'API' song song với CompletableFuture, timeout, fallback, aggregate kết quả. Mô phỏng backend thực tế.
Mini-challenge khép lại Module 10. Bạn sẽ viết module aggregate giá từ 5 "API" (mô phỏng bằng sleep + random). Đây là pattern phổ biến backend: so sánh giá flight/hotel/stock từ nhiều provider, merge kết quả cho user.
Yêu cầu: tất cả call song song, có timeout, có fallback khi API fail — không sequential, không await từng cái.
🎯 Đề bài
1. PriceProvider interface
interface PriceProvider {
String name();
double fetchPrice(String product) throws Exception;
}
5 implementation fake (ProviderA..E) — mỗi provider sleep(100..800ms) random rồi trả double. Thỉnh thoảng throw (simulate fail). Helper có sẵn trong starter.
2. PriceAggregator.fetchAll(String product) — song song
- Gọi tất cả 5 provider song song với
CompletableFuture. - Mỗi call timeout 500ms. Quá timeout → coi như fail, skip provider đó.
- Provider throw → skip, không crash toàn aggregate.
- Trả
Map<String, Double>— provider name → price. Key nào fail/timeout không có trong map.
Signature:
Map<String, Double> fetchAll(String product);
3. PriceAggregator.bestPrice(String product) — giá thấp nhất
- Dùng
fetchAll, trả provider + price thấp nhất. - Trả
Optional<Map.Entry<String, Double>>— empty nếu tất cả fail.
4. PriceAggregator.shutdown()
Shutdown executor graceful.
Yêu cầu performance: fetchAll trong ~500ms (max timeout), không phải 5 * max (sequential).
📦 Concept dùng trong bài
| Concept | Bài | Dùng ở đây |
|---|---|---|
| ExecutorService | 10.3 | Pool gọi provider |
| CompletableFuture | 10.3 | Song song, timeout, fallback |
| orTimeout, exceptionally | 10.3 | Timeout + fallback chain |
| allOf + join | 10.3 | Chờ tất cả xong |
| Concurrent map | 10.4 | Thu thập kết quả thread-safe |
| Optional | 9.5 | bestPrice rỗng → empty |
| Stream min/max | 9.3 | Tìm min price |
▶️ Starter code
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class PriceAggregatorApp {
public interface PriceProvider {
String name();
double fetchPrice(String product) throws Exception;
}
public static class FakeProvider implements PriceProvider {
private final String name;
private final int minMs, maxMs;
private final double failRate;
private final Random rnd = new Random();
public FakeProvider(String name, int minMs, int maxMs, double failRate) {
this.name = name;
this.minMs = minMs;
this.maxMs = maxMs;
this.failRate = failRate;
}
@Override public String name() { return name; }
@Override public double fetchPrice(String product) throws Exception {
int delay = minMs + rnd.nextInt(maxMs - minMs);
Thread.sleep(delay);
if (rnd.nextDouble() < failRate) {
throw new RuntimeException(name + " failed");
}
return 100 + rnd.nextDouble() * 50;
}
}
public static class PriceAggregator {
private final List<PriceProvider> providers;
private final ExecutorService exec;
public PriceAggregator(List<PriceProvider> providers) {
this.providers = providers;
this.exec = Executors.newFixedThreadPool(providers.size());
}
// TODO: implement
public Map<String, Double> fetchAll(String product) {
return Map.of();
}
// TODO: implement
public Optional<Map.Entry<String, Double>> bestPrice(String product) {
return Optional.empty();
}
public void shutdown() {
exec.shutdown();
try {
if (!exec.awaitTermination(2, TimeUnit.SECONDS)) {
exec.shutdownNow();
}
} catch (InterruptedException e) {
exec.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
List<PriceProvider> providers = List.of(
new FakeProvider("Amazon", 100, 300, 0.1),
new FakeProvider("eBay", 200, 400, 0.1),
new FakeProvider("Walmart", 150, 700, 0.2), // Co the timeout
new FakeProvider("Alibaba", 300, 900, 0.3), // Hay timeout/fail
new FakeProvider("Shopify", 100, 250, 0.0)
);
PriceAggregator agg = new PriceAggregator(providers);
try {
long t1 = System.currentTimeMillis();
Map<String, Double> all = agg.fetchAll("iphone");
long t2 = System.currentTimeMillis();
System.out.printf("Fetched in %d ms%n", t2 - t1);
all.forEach((k, v) -> System.out.printf(" %-10s %.2f%n", k, v));
agg.bestPrice("iphone").ifPresent(e ->
System.out.printf("Best: %s at %.2f%n", e.getKey(), e.getValue()));
} finally {
agg.shutdown();
}
}
}
javac PriceAggregatorApp.java
java PriceAggregatorApp
Dành 25–30 phút.
💡 Gợi ý
fetchAll với CompletableFuture + timeout + exceptionally:
public Map<String, Double> fetchAll(String product) {
ConcurrentHashMap<String, Double> result = new ConcurrentHashMap<>();
List<CompletableFuture<Void>> futures = providers.stream()
.map(p -> CompletableFuture
.supplyAsync(() -> {
try {
return p.fetchPrice(product);
} catch (Exception e) {
throw new CompletionException(e);
}
}, exec)
.orTimeout(500, TimeUnit.MILLISECONDS)
.thenAccept(price -> result.put(p.name(), price))
.exceptionally(ex -> {
// Timeout hoac provider throw -> skip
return null;
}))
.toList();
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
return Map.copyOf(result);
}
Cơ chế:
supplyAsync(..., exec)— chạy trên executor riêng.orTimeout(500, MILLISECONDS)— sau 500ms, complete vớiTimeoutException.thenAccept(...)— khi thành công, thêm vào map. DùngConcurrentHashMapvì nhiều thread cùngput.exceptionally(ex -> null)— swallow exception, trảnullchoCompletableFuture<Void>— skip provider.allOf(...).join()— chờ tất cả future complete (success hoặc fail).
bestPrice:
public Optional<Map.Entry<String, Double>> bestPrice(String product) {
return fetchAll(product).entrySet().stream()
.min(Map.Entry.comparingByValue());
}
Stream.min trả Optional — empty khi map rỗng.
✅ Lời giải
import java.util.*;
import java.util.concurrent.*;
public class PriceAggregatorApp {
public interface PriceProvider {
String name();
double fetchPrice(String product) throws Exception;
}
public static class FakeProvider implements PriceProvider {
private final String name;
private final int minMs, maxMs;
private final double failRate;
private final Random rnd = new Random();
public FakeProvider(String name, int minMs, int maxMs, double failRate) {
this.name = name;
this.minMs = minMs;
this.maxMs = maxMs;
this.failRate = failRate;
}
@Override public String name() { return name; }
@Override public double fetchPrice(String product) throws Exception {
int delay = minMs + rnd.nextInt(maxMs - minMs);
Thread.sleep(delay);
if (rnd.nextDouble() < failRate) {
throw new RuntimeException(name + " failed");
}
return 100 + rnd.nextDouble() * 50;
}
}
public static class PriceAggregator {
private final List<PriceProvider> providers;
private final ExecutorService exec;
public PriceAggregator(List<PriceProvider> providers) {
this.providers = providers;
this.exec = Executors.newFixedThreadPool(providers.size());
}
public Map<String, Double> fetchAll(String product) {
ConcurrentHashMap<String, Double> result = new ConcurrentHashMap<>();
List<CompletableFuture<Void>> futures = providers.stream()
.map(p -> CompletableFuture
.supplyAsync(() -> {
try {
return p.fetchPrice(product);
} catch (Exception e) {
throw new CompletionException(e);
}
}, exec)
.orTimeout(500, TimeUnit.MILLISECONDS)
.thenAccept(price -> result.put(p.name(), price))
.exceptionally(ex -> null))
.toList();
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
return Map.copyOf(result);
}
public Optional<Map.Entry<String, Double>> bestPrice(String product) {
return fetchAll(product).entrySet().stream()
.min(Map.Entry.comparingByValue());
}
public void shutdown() {
exec.shutdown();
try {
if (!exec.awaitTermination(2, TimeUnit.SECONDS)) {
exec.shutdownNow();
}
} catch (InterruptedException e) {
exec.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
List<PriceProvider> providers = List.of(
new FakeProvider("Amazon", 100, 300, 0.1),
new FakeProvider("eBay", 200, 400, 0.1),
new FakeProvider("Walmart", 150, 700, 0.2),
new FakeProvider("Alibaba", 300, 900, 0.3),
new FakeProvider("Shopify", 100, 250, 0.0)
);
PriceAggregator agg = new PriceAggregator(providers);
try {
long t1 = System.currentTimeMillis();
Map<String, Double> all = agg.fetchAll("iphone");
long t2 = System.currentTimeMillis();
System.out.printf("Fetched in %d ms%n", t2 - t1);
all.forEach((k, v) -> System.out.printf(" %-10s %.2f%n", k, v));
agg.bestPrice("iphone").ifPresent(e ->
System.out.printf("Best: %s at %.2f%n", e.getKey(), e.getValue()));
} finally {
agg.shutdown();
}
}
}
Điểm chính:
-
supplyAsync(..., exec)pass executor: nếu không pass, chạy trênForkJoinPool.commonPool()— block I/O sẽ ảnh hưởng parallel stream toàn JVM. Executor riêng an toàn. -
Wrap checked exception:
p.fetchPricethrowsExceptionchecked. Lambda không cho throws — wrap bằngCompletionExceptionđểCompletableFuturenhận. -
orTimeout(500, MILLISECONDS)(Java 9+): sau timeout, future complete exceptionally vớiTimeoutException. Không dùngget(timeout)vìgetblock caller, mất tính async. -
thenAcceptsuccess,exceptionallycatch-all:thenAcceptchạy khi upstream success; nếu upstream throw,exceptionallybắt lại, trả fallback. Chain raCompletableFuture<Void>— unified type choallOf. -
ConcurrentHashMapcho result: nhiều threadthenAcceptcùng lúc —HashMapsẽ corrupt. -
Map.copyOf(result): trả immutable snapshot — caller không modify được ngẫu nhiên. -
bestPrice= stream.min + Optional: clean, không null check. -
Thời gian thực tế:
max(timeout, max(provider delay)) = ~500ms— bất kể 5 hay 50 provider, vì tất cả song song. Sequential version sẽ làsum(delays) = ~2-3s.
🎓 Mở rộng
Mức 1 — Virtual thread (Java 21):
Thay executor:
this.exec = Executors.newVirtualThreadPerTaskExecutor();
Giờ scale với hàng nghìn provider — mỗi provider 1 virtual thread, không pool limit.
Mức 2 — Retry on failure:
Wrap supplyAsync với retry 2 lần nếu fail:
private <T> CompletableFuture<T> withRetry(Supplier<T> task, int attempts) {
return CompletableFuture.supplyAsync(task, exec)
.thenApply(CompletableFuture::completedFuture)
.exceptionally(ex -> attempts > 1
? withRetry(task, attempts - 1)
: CompletableFuture.failedFuture(ex))
.thenCompose(f -> f);
}
Mức 3 — First-N-only (don't wait for slow ones):
Thay allOf bằng return khi có 3 kết quả đầu:
public Map<String, Double> fetchFirstN(String product, int n) {
ConcurrentHashMap<String, Double> result = new ConcurrentHashMap<>();
CountDownLatch latch = new CountDownLatch(n);
providers.forEach(p -> CompletableFuture
.supplyAsync(() -> { try { return p.fetchPrice(product); }
catch (Exception e) { throw new CompletionException(e); } }, exec)
.orTimeout(500, TimeUnit.MILLISECONDS)
.thenAccept(price -> {
result.put(p.name(), price);
latch.countDown();
})
.exceptionally(ex -> null));
try {
latch.await(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return Map.copyOf(result);
}
Pattern "fastest N wins" — trả về khi có đủ N response, skip slow.
Mức 4 — Circuit breaker:
Track fail rate mỗi provider — provider fail > 50% → skip hoàn toàn 1 phút:
private final ConcurrentHashMap<String, CircuitState> breakers = new ConcurrentHashMap<>();
record CircuitState(int failCount, int totalCount, long openUntil) { }
Cấu trúc thực: Netflix Hystrix, Resilience4j — production pattern.
✨ Điều bạn vừa làm được
Hoàn thành mini-challenge này, bạn đã:
- Gọi nhiều API song song với
CompletableFuture— 500ms thay vì 5*500ms sequential. - Áp timeout per-call với
orTimeout— không stuck thread vì 1 provider chậm. - Handle exception gracefully với
exceptionally— 1 provider fail không crash toàn aggregate. - Dùng
ConcurrentHashMapcho kết quả thread-safe — tránh corruption khi nhiều thenAccept cùng ghi. - Shutdown executor graceful — không leak thread.
- Áp pattern resilience thực tế — foundation của microservice call aggregation.
Chúc mừng — bạn đã hoàn thành Module 10! Bạn giờ hiểu thread, synchronized, executor, concurrent collection, và virtual thread — toolkit đầy đủ để viết backend concurrent. Module 11 sẽ bước sang I/O & NIO — đọc/ghi file, socket, và API modern của Java 8+.
Bài này có giúp bạn hiểu bản chất không?