
Cross-Data-Center Access: How Should Your Code Approach It? This Guide Covers What You Need

Contents

  1. Basic concepts
  2. Core design patterns
  3. Advanced programming techniques
  4. Complete code example
  5. Best practices

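Basic concepts

What is multi-data-center access?

Multi-data-center access means an application has to communicate with data centers located in different geographic regions, for example:

  • Beijing data center
  • Shanghai data center
  • Guangzhou data center

Core challenges

  1. Network latency: round-trip time differs from region to region
  2. Failover: traffic has to switch elsewhere when a data center fails
  3. Load balancing: requests should be spread sensibly across data centers
  4. Data consistency: data has to be synchronized across regions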
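
To make the first two challenges concrete, here is a minimal sketch that picks the healthy region with the lowest measured latency. The `RegionStatus` and `RegionPicker` classes are hypothetical names introduced only for this illustration, and how the latency figures are measured is left out.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical snapshot of one region's state (names are illustrative)
final class RegionStatus {
    final String region;
    final long latencyMs;
    final boolean healthy;

    RegionStatus(String region, long latencyMs, boolean healthy) {
        this.region = region;
        this.latencyMs = latencyMs;
        this.healthy = healthy;
    }
}

final class RegionPicker {
    // Returns the healthy region with the lowest latency, or empty if none is healthy
    static Optional<String> pick(List<RegionStatus> statuses) {
        return statuses.stream()
            .filter(s -> s.healthy)
            .min(Comparator.comparingLong(s -> s.latencyMs))
            .map(s -> s.region);
    }
}
```

If Beijing has the lowest latency but is marked unhealthy, the picker skips it and returns the next-fastest healthy region, which is the essence of combining latency awareness with failover.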

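Core design patterns

1. Strategy Pattern

Purpose: choose a different data center selection strategy depending on the situation

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Strategy interface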
public interface DataCenterStrategy {
    String selectDataCenter(List<DataCenter> dataCenters, String userId);
}

// Nearest-data-center strategy
public class NearestDataCenterStrategy implements DataCenterStrategy {
    @Override
    public String selectDataCenter(List<DataCenter> dataCenters, String userId) {
        // Pick a data center in the user's region, else fall back to the first one
        String userRegion = getUserRegion(userId);
        return dataCenters.stream()
            .filter(dc -> dc.getRegion().equals(userRegion))
            .findFirst()
            .map(DataCenter::getId)
            // orElseGet defers get(0) until no region actually matches
            .orElseGet(() -> dataCenters.get(0).getId());
    }

    private String getUserRegion(String userId) {
        // Simplified: a real implementation would geolocate the user's IP
        return "BEIJING";
    }
}

// Round-robin strategy
public class RoundRobinStrategy implements DataCenterStrategy {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public String selectDataCenter(List<DataCenter> dataCenters, String userId) {
        // floorMod keeps the index non-negative after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), dataCenters.size());
        return dataCenters.get(index).getId();
    }
}

// Least-connections strategy
public class LeastConnectionStrategy implements DataCenterStrategy {
    @Override
    public String selectDataCenter(List<DataCenter> dataCenters, String userId) {
        return dataCenters.stream()
            .min(Comparator.comparingInt(DataCenter::getActiveConnections))
            .map(DataCenter::getId)
            // min() is empty only for an empty list, so fail with a clear message
            .orElseThrow(() -> new IllegalArgumentException("No data centers to choose from"));
    }
}

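2. Factory Pattern

Purpose: create client connections for the different data centers

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Data center client interface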
public interface DataCenterClient {
    Response executeQuery(String query);
    boolean isHealthy();
    String getRegion();
}

// Concrete implementation (ShanghaiDataCenterClient and GuangzhouDataCenterClient are not shown)
public class BeijingDataCenterClient implements DataCenterClient {
    private final String endpoint;
    private final HttpClient httpClient;

    public BeijingDataCenterClient(String endpoint) {
        this.endpoint = endpoint;
        this.httpClient = HttpClient.newHttpClient();
    }

    @Override
    public Response executeQuery(String query) {
        try {
            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(endpoint + "/api/query"))
                .POST(HttpRequest.BodyPublishers.ofString(query))
                .build();

            HttpResponse<String> response = httpClient.send(request, 
                HttpResponse.BodyHandlers.ofString());

            return new Response(response.statusCode(), response.body());
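        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
            throw new DataCenterException("Beijing DC query interrupted", e);
        } catch (Exception e) {
            throw new DataCenterException("Beijing DC query failed", e);
        }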
    }

    @Override
    public boolean isHealthy() {
        // Health check logic (placeholder: always reports healthy)
        return true;
    }

    @Override
    public String getRegion() {
        return "BEIJING";
    }
}

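// Factory class
public class DataCenterClientFactory {
    private static final Map<String, DataCenterClient> clients = new ConcurrentHashMap<>();

    // Clients are cached per region, so the endpoint only matters on the first call
    public static DataCenterClient createClient(String region, String endpoint) {
        // Normalize once so "beijing" and "BEIJING" share a single cache entry
        return clients.computeIfAbsent(region.toUpperCase(), key -> {
            switch (key) {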
                case "BEIJING":
                    return new BeijingDataCenterClient(endpoint);
                case "SHANGHAI":
                    return new ShanghaiDataCenterClient(endpoint);
                case "GUANGZHOU":
                    return new GuangzhouDataCenterClient(endpoint);
                default:
                    throw new IllegalArgumentException("Unknown region: " + region);
            }
        });
    }
}

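3. Proxy Pattern

Purpose: add extra capabilities such as caching, retries, and monitoring (the example below covers caching and metrics)

// Cache and CacheBuilder as used below match Guava's com.google.common.cache API
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;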

public class DataCenterClientProxy implements DataCenterClient {
    private final DataCenterClient realClient;
    private final Cache<String, Response> cache;
    private final MetricsCollector metrics;

    public DataCenterClientProxy(DataCenterClient realClient) {
        this.realClient = realClient;
        this.cache = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(5, TimeUnit.MINUTES)
            .build();
        this.metrics = new MetricsCollector();
    }

    @Override
    public Response executeQuery(String query) {
        // 1. Check the cache
        Response cached = cache.getIfPresent(query);
        if (cached != null) {
            metrics.recordCacheHit();
            return cached;
        }

        // 2. Record the start time
        long startTime = System.currentTimeMillis();

        try {
            // 3. Run the actual query
            Response response = realClient.executeQuery(query);

            // 4. Record metrics
            long duration = System.currentTimeMillis() - startTime;
            metrics.recordQuery(realClient.getRegion(), duration, true);

            // 5. Cache only successful responses so errors are not replayed for 5 minutes
            if (response.isSuccess()) {
                cache.put(query, response);
            }

            return response;
        } catch (Exception e) {
            // 6. Record the failure
            long duration = System.currentTimeMillis() - startTime;
            metrics.recordQuery(realClient.getRegion(), duration, false);
            throw e;
        }
    }

    @Override
    public boolean isHealthy() {
        return realClient.isHealthy();
    }

    @Override
    public String getRegion() {
        return realClient.getRegion();
    }
}

4. Chain of Responsibility Pattern

Purpose: implement failover and graceful degradation

public abstract class DataCenterHandler {
    protected DataCenterHandler next;

    public void setNext(DataCenterHandler next) {
        this.next = next;
    }

    public abstract Response handle(Request request);
}

public class PrimaryDataCenterHandler extends DataCenterHandler {
    private final DataCenterClient primaryClient;

    public PrimaryDataCenterHandler(DataCenterClient primaryClient) {
        this.primaryClient = primaryClient;
    }

    @Override
    public Response handle(Request request) {
        try {
            if (primaryClient.isHealthy()) {
                return primaryClient.executeQuery(request.getQuery());
            }
        } catch (Exception e) {
            System.err.println("Primary DC failed: " + e.getMessage());
        }

        // Pass the request on to the next handler
        if (next != null) {
            return next.handle(request);
        }

        throw new DataCenterException("All data centers failed");
    }
}

public class BackupDataCenterHandler extends DataCenterHandler {
    private final DataCenterClient backupClient;

    public BackupDataCenterHandler(DataCenterClient backupClient) {
        this.backupClient = backupClient;
    }

    @Override
    public Response handle(Request request) {
        try {
            if (backupClient.isHealthy()) {
                return backupClient.executeQuery(request.getQuery());
            }
        } catch (Exception e) {
            System.err.println("Backup DC failed: " + e.getMessage());
        }

        if (next != null) {
            return next.handle(request);
        }

        throw new DataCenterException("All data centers failed");
    }
}
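
The handlers above still have to be linked into a chain before they are useful. The following is a minimal, self-contained sketch of that wiring. To keep it runnable on its own it uses a simplified `FailoverHandler` class and `Supplier<String>` stand-ins rather than the `DataCenterHandler` and `DataCenterClient` types from this article, so those names are assumptions made for illustration.

```java
import java.util.function.Supplier;

// Simplified stand-in for one link in the chain (illustrative only)
class FailoverHandler {
    private final String name;
    private final Supplier<String> call; // stands in for client.executeQuery(...)
    private FailoverHandler next;

    FailoverHandler(String name, Supplier<String> call) {
        this.name = name;
        this.call = call;
    }

    // Returns the handler just added so that links can be chained fluently
    FailoverHandler setNext(FailoverHandler next) {
        this.next = next;
        return next;
    }

    String handle() {
        try {
            return name + ":" + call.get();
        } catch (RuntimeException e) {
            // This link failed: delegate to the next one, if there is one
            if (next != null) {
                return next.handle();
            }
            throw new IllegalStateException("All data centers failed", e);
        }
    }
}

class ChainDemo {
    static String run() {
        FailoverHandler primary =
            new FailoverHandler("primary", () -> { throw new RuntimeException("down"); });
        FailoverHandler backup = new FailoverHandler("backup", () -> "ok");
        primary.setNext(backup);
        return primary.handle(); // primary fails, so the backup answers
    }
}
```

The caller only ever talks to the head of the chain; which data center finally answers is decided by the order in which the links were attached.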

5. Observer Pattern

Purpose: monitor changes in data center status

public interface DataCenterObserver {
    void onHealthStatusChanged(String region, boolean isHealthy);
    void onLatencyChanged(String region, long latency);
}

public class DataCenterMonitor {
    private final List<DataCenterObserver> observers = new CopyOnWriteArrayList<>();
    private final Map<String, Boolean> healthStatus = new ConcurrentHashMap<>();

    public void addObserver(DataCenterObserver observer) {
        observers.add(observer);
    }

    public void removeObserver(DataCenterObserver observer) {
        observers.remove(observer);
    }

    public void updateHealthStatus(String region, boolean isHealthy) {
        Boolean oldStatus = healthStatus.put(region, isHealthy);

        if (oldStatus == null || oldStatus != isHealthy) {
            notifyHealthStatusChanged(region, isHealthy);
        }
    }

    private void notifyHealthStatusChanged(String region, boolean isHealthy) {
        for (DataCenterObserver observer : observers) {
            observer.onHealthStatusChanged(region, isHealthy);
        }
    }
}

// Concrete observer: alerting system
public class AlertingObserver implements DataCenterObserver {
    @Override
    public void onHealthStatusChanged(String region, boolean isHealthy) {
        if (!isHealthy) {
            sendAlert("Data center " + region + " is down!");
        }
    }

    @Override
    public void onLatencyChanged(String region, long latency) {
        if (latency > 1000) {
            sendAlert("High latency detected in " + region + ": " + latency + "ms");
        }
    }

    private void sendAlert(String message) {
        System.err.println("ALERT: " + message);
        // A real system would send an email or SMS alert here
    }
}

Advanced programming techniques

1. Circuit Breaker

Prevents cascading failures by failing fast

import java.util.function.Supplier;

public class CircuitBreaker {
    private enum State { CLOSED, OPEN, HALF_OPEN }

    private State state = State.CLOSED;
    private final int failureThreshold;
    private final long timeout;
    private int failureCount = 0;
    private long lastFailureTime = 0;

    public CircuitBreaker(int failureThreshold, long timeout) {
        this.failureThreshold = failureThreshold;
        this.timeout = timeout;
    }

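    // Shared state is only read and written inside synchronized methods
    private synchronized void rejectIfOpen() {
        if (state == State.OPEN) {
            if (System.currentTimeMillis() - lastFailureTime >= timeout) {
                state = State.HALF_OPEN; // let a trial request through
            } else {
                throw new CircuitBreakerOpenException("Circuit breaker is OPEN");
            }
        }
    }

    public <T> T execute(Supplier<T> action) {
        rejectIfOpen();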

        try {
            T result = action.get();
            onSuccess();
            return result;
        } catch (Exception e) {
            onFailure();
            throw e;
        }
    }

    private synchronized void onSuccess() {
        failureCount = 0;
        state = State.CLOSED;
    }

    private synchronized void onFailure() {
        failureCount++;
        lastFailureTime = System.currentTimeMillis();

        if (failureCount >= failureThreshold) {
            state = State.OPEN;
        }
    }
}

// Using the circuit breaker
public class ResilientDataCenterClient implements DataCenterClient {
    private final DataCenterClient delegate;
    private final CircuitBreaker circuitBreaker;

    public ResilientDataCenterClient(DataCenterClient delegate) {
        this.delegate = delegate;
        this.circuitBreaker = new CircuitBreaker(5, 30000); // open after 5 failures; allow a trial call after 30 s
    }

    @Override
    public Response executeQuery(String query) {
        return circuitBreaker.execute(() -> delegate.executeQuery(query));
    }

    @Override
    public boolean isHealthy() {
        return delegate.isHealthy();
    }

    @Override
    public String getRegion() {
        return delegate.getRegion();
    }
}

2. Retry with Exponential Backoff

public class RetryableDataCenterClient implements DataCenterClient {
    private final DataCenterClient delegate;
    private final int maxRetries;
    private final long initialDelayMs;

    public RetryableDataCenterClient(DataCenterClient delegate, int maxRetries) {
        this.delegate = delegate;
        this.maxRetries = maxRetries;
        this.initialDelayMs = 100;
    }

    @Override
    public Response executeQuery(String query) {
        int attempt = 0;
        Exception lastException = null;

        while (attempt < maxRetries) {
            try {
                return delegate.executeQuery(query);
            } catch (Exception e) {
                lastException = e;
                attempt++;

                if (attempt < maxRetries) {
                    long delay = initialDelayMs * (1L << (attempt - 1)); // exponential backoff: initialDelayMs, then 2x, 4x, ...
                    System.out.println("Retry attempt " + attempt + " after " + delay + "ms");

                    try {
                        Thread.sleep(delay);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new DataCenterException("Retry interrupted", ie);
                    }
                }
            }
        }

        throw new DataCenterException("All retries failed", lastException);
    }

    @Override
    public boolean isHealthy() {
        return delegate.isHealthy();
    }

    @Override
    public String getRegion() {
        return delegate.getRegion();
    }
}
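
When many callers retry on the same schedule, their retries can arrive in synchronized bursts. A common refinement, which the class above does not implement, is to cap the delay and add random jitter. The sketch below shows one way to compute such a delay; the `Backoff` class, its method names, and the `maxDelayMs` parameter are assumptions introduced for illustration.

```java
import java.util.concurrent.ThreadLocalRandom;

final class Backoff {
    // Exponential ceiling for the given retry (1-based), clamped to maxDelayMs
    static long ceilingMs(long initialDelayMs, long maxDelayMs, int retry) {
        // Bound the shift so the product stays in range for realistic delays
        int shift = Math.max(0, Math.min(retry - 1, 30));
        return Math.min(maxDelayMs, initialDelayMs * (1L << shift));
    }

    // "Full jitter": a random delay in [0, ceiling] so callers do not retry in lockstep
    static long jitteredDelayMs(long initialDelayMs, long maxDelayMs, int retry) {
        long ceiling = ceilingMs(initialDelayMs, maxDelayMs, retry);
        return ThreadLocalRandom.current().nextLong(ceiling + 1);
    }
}
```

With an initial delay of 100 ms and a cap of 5000 ms, the ceilings for retries 1, 2, and 3 are 100, 200, and 400 ms, and every later retry is clamped to the cap once the doubling exceeds it.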

3. Connection Pool Management

public class DataCenterConnectionPool {
    private final BlockingQueue<DataCenterClient> pool;
    private final int maxSize;
    private final Supplier<DataCenterClient> clientFactory;
    private final AtomicInteger currentSize = new AtomicInteger(0);

    public DataCenterConnectionPool(int maxSize, Supplier<DataCenterClient> clientFactory) {
        this.maxSize = maxSize;
        this.clientFactory = clientFactory;
        this.pool = new LinkedBlockingQueue<>(maxSize);
    }

    public DataCenterClient acquire() throws InterruptedException {
        DataCenterClient client = pool.poll();

        if (client == null && currentSize.get() < maxSize) {
            synchronized (this) {
                if (currentSize.get() < maxSize) {
                    client = clientFactory.get();
                    currentSize.incrementAndGet();
                }
            }
        }

        if (client == null) {
            client = pool.take(); // block until a connection becomes available
        }

        return client;
    }

    public void release(DataCenterClient client) {
        if (client.isHealthy()) {
            pool.offer(client);
        } else {
            currentSize.decrementAndGet();
        }
    }

    public <T> T executeWithClient(Function<DataCenterClient, T> action) {
        DataCenterClient client = null;
        try {
            client = acquire();
            return action.apply(client);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DataCenterException("Connection acquisition interrupted", e);
        } finally {
            if (client != null) {
                release(client);
            }
        }
    }
}

4. Asynchronous Execution with CompletableFuture

public class AsyncDataCenterClient {
    private final ExecutorService executor;
    private final List<DataCenterClient> clients;

    public AsyncDataCenterClient(List<DataCenterClient> clients) {
        this.clients = clients;
        this.executor = Executors.newFixedThreadPool(clients.size());
    }

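    // Query all data centers in parallel and return whichever completes first.
    // Note: anyOf also completes if the first future to finish failed.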
    public CompletableFuture<Response> queryFastest(String query) {
        List<CompletableFuture<Response>> futures = clients.stream()
            .map(client -> CompletableFuture.supplyAsync(
                () -> client.executeQuery(query), executor))
            .collect(Collectors.toList());

        return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(result -> (Response) result);
    }

    // Query all data centers in parallel and aggregate the results
    public CompletableFuture<List<Response>> queryAll(String query) {
        List<CompletableFuture<Response>> futures = clients.stream()
            .map(client -> CompletableFuture.supplyAsync(
                () -> client.executeQuery(query), executor)
                .exceptionally(ex -> {
                    System.err.println("Query failed for " + client.getRegion());
                    return null;
                }))
            .collect(Collectors.toList());

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .collect(Collectors.toList()));
    }

    public void shutdown() {
        executor.shutdown();
    }
}
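
Because `CompletableFuture.anyOf` completes as soon as any future finishes, a data center that fails quickly can win the race in `queryFastest`. If the goal is "the first successful response", a small helper along the following lines can be used instead. This is a sketch under that assumption; the `FirstSuccess` class is a hypothetical name and not part of the code above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class FirstSuccess {
    // Completes with the first successful result; fails only if every future fails
    static <T> CompletableFuture<T> of(List<CompletableFuture<T>> futures) {
        CompletableFuture<T> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.completeExceptionally(new IllegalArgumentException("No futures given"));
            return result;
        }
        AtomicInteger remaining = new AtomicInteger(futures.size());
        for (CompletableFuture<T> f : futures) {
            f.whenComplete((value, error) -> {
                if (error == null) {
                    result.complete(value); // later completions are ignored
                } else if (remaining.decrementAndGet() == 0) {
                    result.completeExceptionally(error); // nothing succeeded
                }
            });
        }
        return result;
    }
}
```

The returned future reports the last failure it saw only once every input has failed, so one slow but healthy data center is still enough to produce a result.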

5. Rate Limiter

public class RateLimiter {
    private final int maxRequests;
    private final long windowMs;
    private final Queue<Long> requestTimestamps;

    public RateLimiter(int maxRequests, long windowMs) {
        this.maxRequests = maxRequests;
        this.windowMs = windowMs;
        this.requestTimestamps = new ConcurrentLinkedQueue<>();
    }

    public synchronized boolean allowRequest() {
        long now = System.currentTimeMillis();
        long windowStart = now - windowMs;

        // Drop timestamps that have fallen out of the window
        while (!requestTimestamps.isEmpty() && requestTimestamps.peek() < windowStart) {
            requestTimestamps.poll();
        }

        if (requestTimestamps.size() < maxRequests) {
            requestTimestamps.offer(now);
            return true;
        }

        return false;
    }
}

public class RateLimitedDataCenterClient implements DataCenterClient {
    private final DataCenterClient delegate;
    private final RateLimiter rateLimiter;

    public RateLimitedDataCenterClient(DataCenterClient delegate, int maxRequests, long windowMs) {
        this.delegate = delegate;
        this.rateLimiter = new RateLimiter(maxRequests, windowMs);
    }

    @Override
    public Response executeQuery(String query) {
        if (!rateLimiter.allowRequest()) {
            throw new RateLimitExceededException("Rate limit exceeded");
        }
        return delegate.executeQuery(query);
    }

    @Override
    public boolean isHealthy() {
        return delegate.isHealthy();
    }

    @Override
    public String getRegion() {
        return delegate.getRegion();
    }
}
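
The limiter above keeps one timestamp per admitted request, so its memory use grows with `maxRequests`. A token bucket is a common alternative that needs only constant state and allows short bursts. The sketch below is one possible version, not a drop-in replacement; the `TokenBucket` class and the idea of passing the current time in as a parameter are assumptions made to keep the example deterministic.

```java
final class TokenBucket {
    private final long capacity;
    private final double refillPerMs;
    private double tokens;
    private long lastRefillMs;

    TokenBucket(long capacity, double refillPerSecond, long nowMs) {
        this.capacity = capacity;
        this.refillPerMs = refillPerSecond / 1000.0;
        this.tokens = capacity; // start full so an initial burst is allowed
        this.lastRefillMs = nowMs;
    }

    // The caller passes the current time, which keeps the class easy to test
    synchronized boolean tryAcquire(long nowMs) {
        long elapsed = Math.max(0, nowMs - lastRefillMs);
        // Add the tokens earned since the last call, never exceeding capacity
        tokens = Math.min(capacity, tokens + elapsed * refillPerMs);
        lastRefillMs = Math.max(lastRefillMs, nowMs);
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

In production code the caller would typically pass `System.currentTimeMillis()` (or a monotonic clock) as `nowMs`.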

Complete code example

Core entity classes

// Data center entity (requires java.util.concurrent.atomic.AtomicInteger)
public class DataCenter {
    private final String id;
    private final String region;
    private final String endpoint;
    // AtomicInteger because ++ and -- on a volatile int are not atomic
    private final AtomicInteger activeConnections = new AtomicInteger();
    private volatile boolean healthy = true;

    public DataCenter(String id, String region, String endpoint) {
        this.id = id;
        this.region = region;
        this.endpoint = endpoint;
    }

    // Getters and setters
    public String getId() { return id; }
    public String getRegion() { return region; }
    public String getEndpoint() { return endpoint; }
    public int getActiveConnections() { return activeConnections.get(); }
    public boolean isHealthy() { return healthy; }
    public void setHealthy(boolean healthy) { this.healthy = healthy; }
    public void incrementConnections() { activeConnections.incrementAndGet(); }
    public void decrementConnections() { activeConnections.decrementAndGet(); }
}

// Request object
public class Request {
    private final String query;
    private final String userId;
    private final Map<String, String> headers;

    public Request(String query, String userId) {
        this.query = query;
        this.userId = userId;
        this.headers = new HashMap<>();
    }

    public String getQuery() { return query; }
    public String getUserId() { return userId; }
    public Map<String, String> getHeaders() { return headers; }
}

// Response object
public class Response {
    private final int statusCode;
    private final String body;
    private final long timestamp;

    public Response(int statusCode, String body) {
        this.statusCode = statusCode;
        this.body = body;
        this.timestamp = System.currentTimeMillis();
    }

    public int getStatusCode() { return statusCode; }
    public String getBody() { return body; }
    public long getTimestamp() { return timestamp; }
    public boolean isSuccess() { return statusCode >= 200 && statusCode < 300; }
}

// Custom exceptions
public class DataCenterException extends RuntimeException {
    public DataCenterException(String message) {
        super(message);
    }

    public DataCenterException(String message, Throwable cause) {
        super(message, cause);
    }
}

public class CircuitBreakerOpenException extends DataCenterException {
    public CircuitBreakerOpenException(String message) {
        super(message);
    }
}

public class RateLimitExceededException extends DataCenterException {
    public RateLimitExceededException(String message) {
        super(message);
    }
}

Unified data center manager

public class MultiDataCenterManager {
    private final List<DataCenter> dataCenters;
    private final DataCenterStrategy strategy;
    private final Map<String, DataCenterClient> clientCache;
    private final DataCenterMonitor monitor;
    private final ScheduledExecutorService healthCheckScheduler;

    public MultiDataCenterManager(List<DataCenter> dataCenters, DataCenterStrategy strategy) {
        this.dataCenters = dataCenters;
        this.strategy = strategy;
        this.clientCache = new ConcurrentHashMap<>();
        this.monitor = new DataCenterMonitor();
        this.healthCheckScheduler = Executors.newScheduledThreadPool(1);

        initializeClients();
        startHealthChecks();
    }

    private void initializeClients() {
        for (DataCenter dc : dataCenters) {
            DataCenterClient baseClient = DataCenterClientFactory.createClient(
                dc.getRegion(), dc.getEndpoint());

            // Decorators, innermost to outermost: retry -> circuit breaker -> rate limit -> proxy (cache)
            DataCenterClient client = new DataCenterClientProxy(
                new RateLimitedDataCenterClient(
                    new ResilientDataCenterClient(
                        new RetryableDataCenterClient(baseClient, 3)
                    ), 100, 1000)
            );

            clientCache.put(dc.getId(), client);
        }
    }

    private void startHealthChecks() {
        healthCheckScheduler.scheduleAtFixedRate(() -> {
            for (DataCenter dc : dataCenters) {
                try {
                    DataCenterClient client = clientCache.get(dc.getId());
                    boolean healthy = client.isHealthy();
                    dc.setHealthy(healthy);
                    monitor.updateHealthStatus(dc.getRegion(), healthy);
                } catch (RuntimeException e) {
                    // An uncaught exception would suppress all later scheduled runs
                    dc.setHealthy(false);
                }
            }
        }, 0, 10, TimeUnit.SECONDS);
    }

    public Response executeQuery(Request request) {
        // 1. Choose a data center
        List<DataCenter> healthyDCs = dataCenters.stream()
            .filter(DataCenter::isHealthy)
            .collect(Collectors.toList());

        if (healthyDCs.isEmpty()) {
            throw new DataCenterException("No healthy data centers available");
        }

        String selectedDCId = strategy.selectDataCenter(healthyDCs, request.getUserId());

        // 2. Look up its client
        DataCenterClient client = clientCache.get(selectedDCId);
        if (client == null) {
            throw new DataCenterException("Client not found for DC: " + selectedDCId);
        }

        // 3. Execute the query
        try {
            return client.executeQuery(request.getQuery());
        } catch (Exception e) {
            // 4. Fail over to another data center
            return executeWithFailover(request, selectedDCId);
        }
    }

    private Response executeWithFailover(Request request, String failedDCId) {
        for (DataCenter dc : dataCenters) {
            if (!dc.getId().equals(failedDCId) && dc.isHealthy()) {
                DataCenterClient client = clientCache.get(dc.getId());
                try {
                    System.out.println("Failing over to: " + dc.getRegion());
                    return client.executeQuery(request.getQuery());
                } catch (Exception e) {
                    System.err.println("Failover to " + dc.getRegion() + " also failed");
                }
            }
        }
        throw new DataCenterException("All failover attempts failed");
    }

    public void addObserver(DataCenterObserver observer) {
        monitor.addObserver(observer);
    }

    public void shutdown() {
        healthCheckScheduler.shutdown();
    }
}

Usage example

public class Application {
    public static void main(String[] args) {
        // 1. Create the data center configuration
        List<DataCenter> dataCenters = Arrays.asList(
            new DataCenter("dc-beijing", "BEIJING", "http://beijing.example.com"),
            new DataCenter("dc-shanghai", "SHANGHAI", "http://shanghai.example.com"),
            new DataCenter("dc-guangzhou", "GUANGZHOU", "http://guangzhou.example.com")
        );

        // 2. Choose a strategy (can be switched dynamically based on configuration)
        DataCenterStrategy strategy = new LeastConnectionStrategy();
        // DataCenterStrategy strategy = new NearestDataCenterStrategy();
        // DataCenterStrategy strategy = new RoundRobinStrategy();

        // 3. Create the manager
        MultiDataCenterManager manager = new MultiDataCenterManager(dataCenters, strategy);

        // 4. Register a monitoring observer
        manager.addObserver(new AlertingObserver());

        // 5. Execute a query
        try {
            Request request = new Request("SELECT * FROM users", "user123");
            Response response = manager.executeQuery(request);

            System.out.println("Status: " + response.getStatusCode());
            System.out.println("Body: " + response.getBody());
        } catch (DataCenterException e) {
            System.err.println("Query failed: " + e.getMessage());
        }

        // 6. Asynchronous query example
        executeAsyncQueries(dataCenters);

        // 7. Release resources
        manager.shutdown();
    }

    private static void executeAsyncQueries(List<DataCenter> dataCenters) {
        List<DataCenterClient> clients = dataCenters.stream()
            .map(dc -> DataCenterClientFactory.createClient(dc.getRegion(), dc.getEndpoint()))
            .collect(Collectors.toList());

        AsyncDataCenterClient asyncClient = new AsyncDataCenterClient(clients);

        // Get the fastest response
        CompletableFuture<Response> fastestResponse = asyncClient.queryFastest("SELECT COUNT(*)");
        fastestResponse.thenAccept(response -> {
            System.out.println("Fastest response: " + response.getBody());
        });

        // Get all responses
        CompletableFuture<List<Response>> allResponses = asyncClient.queryAll("SELECT * FROM products");
        allResponses.thenAccept(responses -> {
            System.out.println("Received " + responses.size() + " responses");
        });

        asyncClient.shutdown();
    }
}

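Best practices

1. Configuration management

public class DataCenterConfig {
    private final String region;
    private final String endpoint;
    private final int maxConnections;
    private final int timeout;
    private final boolean enableCache;

    // Constructor used by loadFromConfig() below
    public DataCenterConfig(String region, String endpoint, int maxConnections,
                            int timeout, boolean enableCache) {
        this.region = region;
        this.endpoint = endpoint;
        this.maxConnections = maxConnections;
        this.timeout = timeout;
        this.enableCache = enableCache;
    }

    // Load from a config file or a configuration center
    public static List<DataCenterConfig> loadFromConfig() {
        // In practice, load these from a configuration center such as Apollo or Nacos
        return Arrays.asList(
            new DataCenterConfig("BEIJING", "http://beijing.example.com", 100, 5000, true),
            new DataCenterConfig("SHANGHAI", "http://shanghai.example.com", 100, 5000, true)
        );
    }
}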
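
As a stepping stone between hard-coded values and a full configuration center, the region list can be read from a `java.util.Properties` object. The sketch below assumes a made-up key layout (`dc.regions` plus `dc.<REGION>.endpoint`); the `RegionEndpoints` class and those key names are hypothetical and not something Apollo or Nacos prescribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class RegionEndpoints {
    // Reads "dc.regions" (comma-separated) and one "dc.<REGION>.endpoint" key per region.
    // Returns {region, endpoint} pairs in the order the regions are listed.
    static List<String[]> load(Properties props) {
        List<String[]> result = new ArrayList<>();
        String regions = props.getProperty("dc.regions", "");
        for (String raw : regions.split(",")) {
            String region = raw.trim();
            if (region.isEmpty()) {
                continue; // tolerate blank entries and an empty list
            }
            String endpoint = props.getProperty("dc." + region + ".endpoint");
            if (endpoint == null) {
                throw new IllegalArgumentException("Missing endpoint for " + region);
            }
            result.add(new String[] { region, endpoint });
        }
        return result;
    }
}
```

Failing fast on a missing endpoint surfaces configuration mistakes at startup rather than on the first failover.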

2. Monitoring metrics

public class MetricsCollector {
    private final Map<String, AtomicLong> queryCount = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> errorCount = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> totalLatency = new ConcurrentHashMap<>();

    public void recordQuery(String region, long latency, boolean success) {
        queryCount.computeIfAbsent(region, k -> new AtomicLong()).incrementAndGet();
        totalLatency.computeIfAbsent(region, k -> new LongAdder()).add(latency);

        if (!success) {
            errorCount.computeIfAbsent(region, k -> new AtomicLong()).incrementAndGet();
        }
    }

    public void recordCacheHit() {
        // Record a cache hit (left unimplemented here)
    }

    public Map<String, Double> getAverageLatency() {
        Map<String, Double> result = new HashMap<>();
        for (String region : queryCount.keySet()) {
            long count = queryCount.get(region).get();
            long totalLat = totalLatency.get(region).sum();
            result.put(region, count > 0 ? (double) totalLat / count : 0.0);
        }
        return result;
    }
}

3. Logging

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingDataCenterClient implements DataCenterClient {
    private static final Logger logger = LoggerFactory.getLogger(LoggingDataCenterClient.class);
    private final DataCenterClient delegate;

    public LoggingDataCenterClient(DataCenterClient delegate) {
        this.delegate = delegate;
    }

    @Override
    public Response executeQuery(String query) {
        logger.info("Executing query on {}: {}", delegate.getRegion(), query);
        long start = System.currentTimeMillis();

        try {
            Response response = delegate.executeQuery(query);
            long duration = System.currentTimeMillis() - start;
            logger.info("Query completed in {}ms, status: {}", duration, response.getStatusCode());
            return response;
        } catch (Exception e) {
            long duration = System.currentTimeMillis() - start;
            logger.error("Query failed after {}ms: {}", duration, e.getMessage(), e);
            throw e;
        }
    }

    // The remaining interface methods simply delegate
    @Override
    public boolean isHealthy() {
        return delegate.isHealthy();
    }

    @Override
    public String getRegion() {
        return delegate.getRegion();
    }
}

4. Thread safety notes

  • Use ConcurrentHashMap rather than HashMap for maps shared across threads
  • Use AtomicInteger and AtomicLong for counters
  • Use CopyOnWriteArrayList for the observer list
  • Use synchronized or a Lock to protect critical sections
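
The counter advice matters because `++` on a plain or even `volatile` int is a read-modify-write sequence, so concurrent increments can be lost. The following sketch shows the atomic alternative in isolation; the `CounterDemo` class and its parameters are hypothetical names used only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CounterDemo {
    // Starts `threads` workers that each increment a shared counter `perThread` times
    static int countAtomically(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.incrementAndGet(); // atomic read-modify-write
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join(); // wait for the worker; its updates are visible afterwards
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        return counter.get();
    }
}
```

Because every increment is atomic, the total always equals `threads * perThread`, which is not guaranteed if the counter is a `volatile int` updated with `++`.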

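5. Performance tips

  1. Connection reuse: use a connection pool to avoid creating connections repeatedly
  2. Batching: merge several small requests into one larger request
  3. Asynchronous execution: use CompletableFuture to process work in parallel
  4. Result caching: cache hot data to cut down on remote calls
  5. Timeouts: set sensible connect and read timeouts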
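
For the last tip, the JDK's `java.net.http` client (the one used by `BeijingDataCenterClient`) exposes a connect timeout on the client and a per-request timeout on the request. A minimal sketch follows; the `TimeoutConfig` class is a hypothetical helper, and suitable durations depend on the actual cross-region latency.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

final class TimeoutConfig {
    // The connect timeout bounds how long establishing the connection may take
    static HttpClient newClient(Duration connectTimeout) {
        return HttpClient.newBuilder()
            .connectTimeout(connectTimeout)
            .build();
    }

    // The request timeout bounds how long we wait for the response
    static HttpRequest newQuery(String endpoint, String query, Duration requestTimeout) {
        return HttpRequest.newBuilder()
            .uri(URI.create(endpoint + "/api/query"))
            .timeout(requestTimeout)
            .POST(HttpRequest.BodyPublishers.ofString(query))
            .build();
    }
}
```

Without an explicit request timeout, a call to a stalled data center can block for a long time, which delays failover.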

6. Fault tolerance and degradation

public class FallbackHandler {
    public Response handleFailure(Request request, Exception e) {
        // 1. Serve cached data if any is available
        Response cached = getCachedResponse(request);
        if (cached != null) {
            return cached;
        }

        // 2. Otherwise return a default response
        return new Response(503, "{\"error\": \"Service temporarily unavailable\"}");
    }

    private Response getCachedResponse(Request request) {
        // Cache lookup goes here (placeholder: nothing is cached)
        return null;
    }
}
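
One way to fill in the cache lookup is a "last known good" store that remembers the most recent successful body per query and serves it, possibly stale, when everything else has failed. The sketch below assumes that approach; the `LastKnownGood` class is a hypothetical name, and a real version would also need a size bound and an expiry policy.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

final class LastKnownGood {
    // Unbounded for brevity: production code should cap the size and expire entries
    private final Map<String, String> lastGood = new ConcurrentHashMap<>();

    // Call after every successful query
    void remember(String query, String body) {
        lastGood.put(query, body);
    }

    // Call when all data centers failed: stale data if we have it, else empty
    Optional<String> fallback(String query) {
        return Optional.ofNullable(lastGood.get(query));
    }
}
```

Whether stale data is acceptable depends on the query, so this kind of fallback is best limited to read paths that tolerate it.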

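Summary

Key points

  1. Design patterns: strategy, factory, proxy, chain of responsibility, observer
  2. Advanced techniques: circuit breaker, retry, rate limiting, asynchronous execution, connection pooling
  3. Key considerations: failover, load balancing, monitoring and alerting, performance optimization

Learning path

  1. Understand what each design pattern is for
  2. Implement simple access to a single data center
  3. Add failover and load balancing
  4. Introduce advanced features (circuit breaker, rate limiting, and so on)
  5. Round out the monitoring and logging
  6. Run load tests and optimize

Recommended reading

  • Design Patterns: Elements of Reusable Object-Oriented Software
  • Java Concurrency in Practice
  • Microservices Patterns
  • Resilience4j documentation
  • Netflix Hystrix documentation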
Do not reproduce without permission: Toy Tech Blog » Cross-Data-Center Access: How Should Your Code Approach It? This Guide Covers What You Need
