文章

一个基于 Netty + ZooKeeper 的轻量级 RPC 框架简单实现

📌 项目概述

本项目是一个基于 Netty 网络通信和 ZooKeeper 服务治理的分布式 RPC 框架,包含完整的 服务注册发现、负载均衡、容错处理、链路追踪 等核心功能。通过模块化设计,实现了高性能、低延迟的远程调用能力,适用于分布式系统中的服务间通信场景。


🏗️ 技术架构图

框架采用分层架构设计:

  • 通信层 :基于 Netty 实现高性能 TCP 通信
  • 服务治理层 :集成 ZooKeeper 实现服务注册发现、一致性哈希负载均衡、熔断限流等
  • 应用层 :提供动态代理接口调用和链路追踪能力

🔧 核心功能实现

1. 通信层(Netty)

客户端通信

package cn.phixlin.core.client.rpcclient.impl;

import cn.hutool.core.util.StrUtil;
import cn.phixlin.core.client.discovery.ServiceDiscover;
import cn.phixlin.core.client.discovery.ZKServiceDiscover;
import cn.phixlin.core.client.netty.MDCChannelHandler;
import cn.phixlin.core.client.netty.NettyClientInitializer;
import cn.phixlin.core.client.rpcclient.RpcClient;
import cn.phixlin.core.trace.TraceContext;
import cn.phixlin.rpc.common.message.RpcRequest;
import cn.phixlin.rpc.common.message.RpcResponse;
import cn.phixlin.rpc.common.serializer.serializer.Serializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;

import java.net.InetSocketAddress;
import java.util.Map;

import static cn.phixlin.rpc.common.netty.NettyConstant.RPC_RESPONSE;

public class NettyRpcClient implements RpcClient {

    // Shared across all client instances: one Bootstrap / event loop group per JVM.
    private static final Bootstrap bootstrap;
    private static final EventLoopGroup eventLoopGroup;

    // Target server address, resolved from the registry by the caller.
    private final InetSocketAddress inetSocketAddress;

    public NettyRpcClient(InetSocketAddress inetSocketAddress) {
        this.inetSocketAddress = inetSocketAddress;
    }

    static {
        bootstrap = new Bootstrap();
        eventLoopGroup = new NioEventLoopGroup();

        bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                // NettyClientInitializer wires the codec / handler pipeline.
                .handler(new NettyClientInitializer());
    }

    /**
     * Sends one RPC request synchronously: connect, write, block until the
     * channel is closed, then read the response that NettyClientHandler stored
     * in a channel attribute.
     *
     * @param request the RPC request to send
     * @return the server's response, or a failure RpcResponse when no address
     *         was resolved, the response is missing, or the thread was interrupted
     */
    @Override
    public RpcResponse sendRequest(RpcRequest request) {
        // Address comes from service discovery; bail out early if none matched.
        if (inetSocketAddress == null) return RpcResponse.fail("未匹配到服务");
        try {
            // sync() blocks until the connect attempt completes.
            ChannelFuture channelFuture = bootstrap.connect(inetSocketAddress).sync();
            // A Channel is one connection, analogous to a socket.
            Channel channel = channelFuture.channel();

            // Stash the current trace context on the channel so outbound
            // handlers (MDCChannelHandler) can propagate it.
            channel.attr(MDCChannelHandler.TRACE_CONTEXT_ATTR_KEY).set(TraceContext.getCopy());

            // Send the request.
            channel.writeAndFlush(request);
            // Block until the handler closes the channel, signalling that the
            // response attribute has been populated.
            // NOTE(review): this relies on NettyClientHandler closing the channel;
            // an async alternative would be channelFuture.addListener(...).
            channel.closeFuture().sync();
            // AttributeKey lookups are channel-scoped, so there is no
            // cross-request thread-safety issue here.
            AttributeKey<RpcResponse> key = AttributeKey.valueOf(RPC_RESPONSE);
            RpcResponse response = channel.attr(key).get();
            if (response == null) {
                System.err.println(StrUtil.format("[客户端] 服务端响应为空, rpcRequest {}", request));
                return RpcResponse.fail("客户端判断服务端响应为空");
            }

            System.out.println(StrUtil.format("[客户端] 收到响应:{}", response.toString()));
            return response;
        } catch (InterruptedException e) {
            System.err.println(StrUtil.format("[客户端] 请求被中断:{}", e.getMessage()));
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }
        return RpcResponse.fail("客户端请求失败");
    }

    /**
     * Shuts down the shared event loop group. Because the group is static this
     * disables every NettyRpcClient instance in the JVM.
     */
    @Override
    public void close() {
        try {
            eventLoopGroup.shutdownGracefully();
        } catch (Exception e) {
            // Fix: shutdownGracefully() does not throw InterruptedException, so
            // the old code's Thread.currentThread().interrupt() here set a bogus
            // interrupt flag; just log the failure.
            System.out.println(StrUtil.format("[客户端] 关闭 Netty 资源时发生异常: {}", e.getMessage()));
        }
    }

}


package cn.phixlin.core.client.netty;

import cn.hutool.core.util.StrUtil;
import cn.phixlin.rpc.common.serializer.coder.CusDecoder;
import cn.phixlin.rpc.common.serializer.coder.CusEncoder;
import cn.phixlin.rpc.common.serializer.serializer.Serializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Builds the client pipeline: custom encoder/decoder, the response handler,
     * MDC propagation, then idle detection feeding the heartbeat handler.
     */
    @Override
    protected void initChannel(SocketChannel ch) {
        try {
            // TODO: make the serializer code (3) a configuration item.
            ch.pipeline()
                    .addLast(new CusEncoder(Serializer.getSerializerByCode(3)))
                    .addLast(new CusDecoder())
                    .addLast(new NettyClientHandler())
                    .addLast(new MDCChannelHandler())
                    // The client only cares about write idleness: if nothing has
                    // been written for 8 seconds an IdleStateEvent fires and the
                    // heartbeat handler sends a keep-alive.
                    .addLast(new IdleStateHandler(0, 8, 0, TimeUnit.SECONDS))
                    .addLast(new HeartbeatHandler());
        } catch (Exception e) {
            System.out.println(StrUtil.format("[netty] 初始化失败 {}", e.getMessage()));
            throw e;
        }
    }
}

package cn.phixlin.core.client.netty;

import cn.hutool.core.util.StrUtil;
import cn.phixlin.rpc.common.message.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;

import static cn.phixlin.rpc.common.netty.NettyConstant.RPC_RESPONSE;

public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {

    /**
     * Stores the decoded response on the channel under the shared RPC_RESPONSE
     * key so NettyRpcClient#sendRequest can read it, then closes the channel so
     * the sender's closeFuture().sync() returns.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
        // Fix: use the shared RPC_RESPONSE constant instead of the duplicated
        // string literal "RPCResponse", so the writer here and the reader in
        // NettyRpcClient can never drift apart.
        AttributeKey<RpcResponse> key = AttributeKey.valueOf(RPC_RESPONSE);
        ctx.channel().attr(key).set(response);
        // Fix: NettyRpcClient#sendRequest blocks on channel.closeFuture().sync();
        // with this close commented out the calling thread would wait forever.
        ctx.channel().close();
    }

    /** Logs pipeline exceptions and drops the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(StrUtil.format("[客户端] 请求处理异常: {}", cause.getMessage()));
        ctx.close();
    }
}

服务端通信

package cn.phixlin.core.server.rpcserver.impl;

import cn.hutool.core.util.StrUtil;
import cn.phixlin.core.server.netty.NettyServerInitializer;
import cn.phixlin.core.server.register.ServiceProvider;
import cn.phixlin.core.server.rpcserver.RpcServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyRpcServer implements RpcServer {

    private final ServiceProvider serviceProvider;

    // Future of the bound server channel; kept so stop() can close it.
    private ChannelFuture channelFuture;

    public NettyRpcServer(ServiceProvider serviceProvider) {
        this.serviceProvider = serviceProvider;
    }

    /**
     * Starts the Netty server on the given port and blocks until the server
     * channel is closed. The event loop groups are always shut down on exit.
     *
     * @param port TCP port to bind
     */
    @Override
    public void start(int port) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        // Fix: the class declares @Slf4j but logged via System.out; use the logger.
        log.info("[服务端] netty 启动");

        try {
            // Bootstrap and configure the Netty server.
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    // NettyServerInitializer configures the message pipeline.
                    .childHandler(new NettyServerInitializer(serviceProvider));
            // Block until the bind completes.
            channelFuture = bootstrap.bind(port).sync();
            log.info("[服务端] netty 绑定端口:{}", port);
            // Block here until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("[服务端] 服务中断:{}", e.getMessage());
        } finally {
            shutdown(bossGroup, workGroup);
            log.info("[服务端] netty 服务关闭");
        }
    }

    /** Closes the server channel opened by start(), if it was ever bound. */
    @Override
    public void stop() {
        if (channelFuture != null) {
            try {
                channelFuture.channel().close().sync();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("[服务端] 关闭 netty 服务端主通道时中断");
            }
        } else {
            log.warn("[服务端] netty 服务端主通道尚未启动,无法关闭");
        }
    }

    /** Gracefully shuts down both event loop groups, waiting for completion. */
    private void shutdown(NioEventLoopGroup bossGroup, NioEventLoopGroup workGroup) {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().syncUninterruptibly();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().syncUninterruptibly();
        }
    }
}

package cn.phixlin.core.server.netty;

import cn.phixlin.core.client.netty.HeartbeatHandler;
import cn.phixlin.core.server.register.ServiceProvider;
import cn.phixlin.rpc.common.serializer.coder.CusDecoder;
import cn.phixlin.rpc.common.serializer.coder.CusEncoder;
import cn.phixlin.rpc.common.serializer.serializer.JsonSerializer;
import cn.phixlin.rpc.common.serializer.serializer.Serializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {

    // Registry of published services; handed to the request handler below.
    private final ServiceProvider serviceProvider;

    public NettyServerInitializer(ServiceProvider serviceProvider) {
        this.serviceProvider = serviceProvider;
    }

    /**
     * Builds the server-side pipeline: idle detection + heartbeat first, then
     * the custom codec, then the RPC dispatch handler.
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // The server watches both read and write idleness: no client message for
        // 10 seconds triggers IdleState.READER_IDLE, handled by HeartbeatHandler.
        // NOTE(review): HeartbeatHandler is imported from the *client* netty
        // package; confirm it is intended to be shared by both sides.
        pipeline.addLast(new IdleStateHandler(10,20,0,TimeUnit.SECONDS));
        pipeline.addLast(new HeartbeatHandler());
        // Custom encoder/decoder pair.
        // NOTE(review): serializer code 3 is hard-coded here and in the client
        // initializer — consider a shared configuration item.
        pipeline.addLast(new CusEncoder(Serializer.getSerializerByCode(3)));
        pipeline.addLast(new CusDecoder());
        pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
    }


}
package cn.phixlin.core.server.netty;

import cn.hutool.core.util.StrUtil;
import cn.phixlin.core.server.ratelimit.RateLimit;
import cn.phixlin.core.server.register.ServiceProvider;
import cn.phixlin.core.trace.interceptor.ServiceTraceInterceptor;
import cn.phixlin.rpc.common.message.RequestType;
import cn.phixlin.rpc.common.message.RpcRequest;
import cn.phixlin.rpc.common.message.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;


import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

@AllArgsConstructor
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

    // Registry of published service implementations and their rate limiters.
    // (final: @AllArgsConstructor still generates the constructor.)
    private final ServiceProvider serviceProvider;

    /**
     * Handles one inbound RpcRequest: filters heartbeats, records the trace
     * span, dispatches to the target service and writes the response back.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {

        if (request == null) {
            System.out.println(StrUtil.format("[服务端] 接受到非法请求,RpcRequest 为空"));
            return;
        }

        // Heartbeats keep the connection alive; nothing to dispatch.
        if (request.getType() == RequestType.HEARTBEAT.getCode()) {
            System.out.println(StrUtil.format("[服务端] 接受到来自客户端的心跳"));
            return;
        }

        // Open the server-side trace span.
        ServiceTraceInterceptor.beforeInvoke();

        // Look up and invoke the requested service.
        RpcResponse response = getResponse(request);

        // Report the server-side trace span.
        ServiceTraceInterceptor.afterInvoke(request.getInterfaceName());

        ctx.writeAndFlush(response);

        // Long-lived connection: intentionally not closing the channel here.
    }

    /** Logs pipeline exceptions and drops the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
        System.out.println(StrUtil.format("[服务端] 处理请求时发生异常:{}", e.getMessage()));
        ctx.close();
    }

    /**
     * Rate-limits, then reflectively invokes the requested method on the
     * registered service implementation.
     *
     * @return a success response carrying the invocation result, or a failure
     *         response when rate-limited or the invocation fails
     */
    private RpcResponse getResponse(RpcRequest rpcRequest) {
        String interfaceName = rpcRequest.getInterfaceName();

        // Token-bucket rate limiting; degrade instead of invoking when exhausted.
        RateLimit rateLimit = serviceProvider.getRateLimitProvider().getRateLimit(interfaceName);
        if (!rateLimit.getToken()) {
            System.out.println("[服务端] 限流降级");
            return RpcResponse.fail("限流降级");
        }

        Object service = serviceProvider.getService(interfaceName);
        try {
            Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
            Object invoke = method.invoke(service, rpcRequest.getParams());
            return RpcResponse.suc(invoke);
        } catch (InvocationTargetException e) {
            // Fix: the meaningful message lives on the target exception; the
            // wrapper's getMessage() is usually null.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            System.out.println("[服务端] 方法执行错误");
            return RpcResponse.fail(cause.getMessage());
        } catch (NoSuchMethodException | IllegalAccessException e) {
            System.out.println("[服务端] 方法执行错误");
            return RpcResponse.fail(e.getMessage());
        }
    }
}

2. 服务治理

服务发现(ZooKeeper 实现)

package cn.phixlin.core.client.discovery;

import cn.hutool.core.util.StrUtil;
import cn.phixlin.core.client.discovery.balance.ConsistencyHashBalance;
import cn.phixlin.core.client.discovery.balance.LoadBalance;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import static cn.phixlin.core.constants.ZKConstants.*;

public class ZKServiceDiscover implements ServiceDiscover {

    private final CuratorFramework curatorFramework;

    // Local cache of serviceName -> address list, kept fresh by ZKListener.
    private final ServiceDiscoverCache serviceDiscoverCache;

    private final LoadBalance loadBalance = new ConsistencyHashBalance();

    public ZKServiceDiscover() {

        this.curatorFramework = CuratorFrameworkFactory.builder().connectString(ZK_CONNECT_STRING)
                .sessionTimeoutMs(40000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).namespace(ZK_ROOT_PATH)
                .build();
        this.curatorFramework.start();
        System.out.println("[zookeeper] 客户端发现启动");

        // Initialize the local cache and keep it in sync with ZooKeeper events.
        serviceDiscoverCache = new ServiceDiscoverCache();
        ZKListener zkListener = new ZKListener(curatorFramework, serviceDiscoverCache);
        zkListener.zkListener2Cache(ZK_ROOT_PATH);

    }

    /**
     * Resolves a service name to one concrete address: local cache first, then
     * ZooKeeper, then load-balances over the candidates.
     *
     * @param serviceName fully qualified service (interface) name
     * @return the chosen address, or null when nothing was found or lookup failed
     */
    @Override
    public InetSocketAddress serviceDiscovery(String serviceName) {

        try {
            // 1. Try the local cache.
            List<String> serviceAddressList = serviceDiscoverCache.getServiceFromCache(serviceName);
            if (serviceAddressList == null || serviceAddressList.isEmpty()) {
                // 2. Cache miss: ask ZooKeeper for the service's children.
                serviceAddressList = curatorFramework.getChildren().forPath("/" + serviceName);
                // Fix: also treat an empty child list as "not found", and log the
                // service name — the old code logged the (null) list itself.
                if (serviceAddressList == null || serviceAddressList.isEmpty()) {
                    System.out.println(StrUtil.format("[zookeeper] 没有发现 {}", serviceName));
                    return null;
                }
                // Populate the cache unless a concurrent listener already did.
                List<String> serviceFromCache = serviceDiscoverCache.getServiceFromCache(serviceName);
                if (serviceFromCache == null || serviceFromCache.isEmpty()) {
                    serviceAddressList.forEach(address -> serviceDiscoverCache.addService2Cache(serviceName, address));
                }
            }
            // (Fix: the old code logged the discovery result twice; once is enough.)
            System.out.println(StrUtil.format("[zookeeper] 发现服务 {}", serviceAddressList));
            // Pick one address via the load balancer.
            String addressStr = loadBalance.balance(serviceAddressList);
            return this.parseAddress(addressStr);
        } catch (Exception e) {
            System.err.println(StrUtil.format("[zookeeper] serviceName = {} 服务发现失败 {}", serviceName, e.getMessage()));
        }
        return null;

    }

    // Method signatures already confirmed retryable for this provider address.
    private final Set<String> retryServiceCache = new CopyOnWriteArraySet<>();

    /**
     * Checks whether the given method signature is on the retry whitelist the
     * provider registered under the retry namespace. The whitelist is fetched
     * lazily and cached for the lifetime of this discoverer.
     */
    @Override
    public boolean checkRetry(InetSocketAddress serviceAddress, String methodSignature) {

        if (retryServiceCache.isEmpty()) {
            try {
                CuratorFramework rootClient = curatorFramework.usingNamespace(ZK_RETRY);
                List<String> retryMethods = rootClient.getChildren().forPath("/" + getServiceAddress(serviceAddress));
                retryServiceCache.addAll(retryMethods);
            } catch (Exception e) {
                System.err.println(StrUtil.format("[zookeeper] 检查重试失败,方法签名:{}, {}", methodSignature, e.getMessage()));
            }
        }
        return retryServiceCache.contains(methodSignature);
    }

    @Override
    public void close() {
        curatorFramework.close();
    }

    /**
     * Formats an InetSocketAddress as an "ip:port" string.
     *
     * @param serviceAddress the address
     * @return ip:port
     */
    private String getServiceAddress(InetSocketAddress serviceAddress) {
        return serviceAddress.getHostName() + ":" + serviceAddress.getPort();
    }

    /** Parses an "ip:port" string back into an InetSocketAddress. */
    private InetSocketAddress parseAddress(String address) {
        String[] result = address.split(":");
        return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
    }
}

负载均衡(一致性哈希)

package cn.phixlin.core.client.discovery.balance;

import cn.hutool.core.util.StrUtil;

import java.util.*;

public class ConsistencyHashBalance implements LoadBalance {

    // Number of virtual nodes per real node.
    private static final int VIRTUAL_NUM = 5;

    // Hash ring: key is the virtual node's hash, value is the virtual node name.
    private final SortedMap<Integer, String> hashRing = new TreeMap<>();

    // Known real nodes.
    private final List<String> realNodes = new LinkedList<>();

    /**
     * Picks a node for a random request key after seeding the ring with any
     * not-yet-known candidate nodes.
     */
    @Override
    public String balance(List<String> nodes) {
        String uuid = UUID.randomUUID().toString();
        return getNode(uuid, nodes);
    }

    /**
     * Adds a real node and its virtual nodes to the ring.
     * Fix: the old version called hashRing.clear() first, silently dropping
     * every OTHER node's virtual nodes whenever one node came online.
     */
    @Override
    public void addNode(String node) {
        if (!realNodes.contains(node)) {
            realNodes.add(node);
            // Fix: the old log passed no argument and printed a literal "{}".
            System.out.println(StrUtil.format("[负载均衡] 真实节点 {} 上线添加", node));
            for (int i = 0; i < VIRTUAL_NUM; i++) {
                String virtualNode = node + "&&VN" + i;
                int hash = getHash(virtualNode);
                hashRing.put(hash, virtualNode);
                // Fix: format arguments were missing entirely.
                System.out.println(StrUtil.format("[负载均衡] 虚拟节点:{}, hash:{} 被添加", virtualNode, hash));
            }
        }
    }

    /**
     * Removes a real node and its virtual nodes from the ring.
     */
    @Override
    public void removeNode(String node) {
        if (realNodes.contains(node)) {
            realNodes.remove(node);
            System.out.println(StrUtil.format("[负载均衡] 真实节点 {} 下线移除", node));
            for (int i = 0; i < VIRTUAL_NUM; i++) {
                String virtualNode = node + "&&VN" + i;
                int hash = getHash(virtualNode);
                hashRing.remove(hash);
                // Fix: two placeholders but only one argument were supplied before.
                System.out.println(StrUtil.format("[负载均衡] 虚拟节点:{}, hash:{} 被下线移除", virtualNode, hash));
            }
        }
    }

    /**
     * Seeds the ring with the candidate nodes, then maps the request key onto it.
     */
    private String getNode(String node, List<String> nodes) {
        this.init(nodes);
        return getNode(node);
    }

    /**
     * Walks the ring clockwise from the key's hash to the first virtual node at
     * or after it, wrapping to the smallest hash when none exists.
     */
    private String getNode(String node) {
        int hash = getHash(node);
        SortedMap<Integer, String> subMap = hashRing.tailMap(hash);
        // Fix: wrap-around must go to the ring's FIRST key (standard consistent
        // hashing); the old code jumped to lastKey(), biasing the top node.
        Integer key = subMap.isEmpty() ? hashRing.firstKey() : subMap.firstKey();
        String virtualNode = hashRing.get(key);
        return virtualNode.substring(0, virtualNode.indexOf("&&"));
    }

    /**
     * Registers any not-yet-known nodes on the ring.
     * Fix: the old version unconditionally re-added every node, so realNodes
     * grew with duplicates on every balance() call.
     */
    private void init(List<String> nodes) {
        for (String node : nodes) {
            if (realNodes.contains(node)) {
                continue;
            }
            realNodes.add(node);
            System.out.println(StrUtil.format("[负载均衡] 真实节点 {} 被添加", node));
            for (int i = 0; i < VIRTUAL_NUM; i++) {
                String virtualNode = node + "&&VN" + i;
                int hash = getHash(virtualNode);
                hashRing.put(hash, virtualNode);
                System.out.println(StrUtil.format("[负载均衡] 虚拟节点:{}, hash:{} 被添加", virtualNode, hash));
            }
        }
    }

    /**
     * FNV1_32_HASH.
     * Fix: Math.abs(Integer.MIN_VALUE) is still negative, which would break the
     * ring ordering; clamp that single value explicitly.
     */
    private static int getHash(String str) {
        final int p = 16777619;
        int hash = (int) 2166136261L;
        for (int i = 0; i < str.length(); i++) {
            hash = (hash ^ str.charAt(i)) * p;
        }
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        if (hash == Integer.MIN_VALUE) {
            return 0;
        }
        return hash < 0 ? Math.abs(hash) : hash;
    }
}

熔断机制(Circuit Breaker)

package cn.phixlin.core.client.circuitbreaker;

import cn.hutool.core.util.StrUtil;

import java.util.HashMap;
import java.util.Map;

public class CircuitBreakerProvider {

    // One circuit breaker per service name. (private final: no caller should
    // touch the map directly.)
    private final Map<String, CircuitBreaker> circuitBreakerMap = new HashMap<>();

    /**
     * Returns the circuit breaker for the given service, creating it on first
     * use. Synchronized so concurrent callers never create duplicate breakers.
     *
     * @param serviceName fully qualified service name
     * @return the (possibly freshly created) circuit breaker
     */
    public synchronized CircuitBreaker getCircuitBreaker(String serviceName) {
        // computeIfAbsent replaces the manual containsKey/get/put dance.
        return circuitBreakerMap.computeIfAbsent(serviceName, name -> {
            System.out.println(StrUtil.format("[熔断器] serviceName = {} 创建一个新的熔断器", name));
            // NOTE(review): thresholds (1 failure, 50% half-open success,
            // 10 ms retry window) are hard-coded — consider configuration.
            return new CircuitBreaker(1, 0.5, 10);
        });
    }
}

package cn.phixlin.core.client.circuitbreaker;

import cn.hutool.core.util.StrUtil;

import java.util.concurrent.atomic.AtomicInteger;

public class CircuitBreaker {
    // Current state of the breaker's CLOSED -> OPEN -> HALF_OPEN state machine.
    private CircuitBreakerState state = CircuitBreakerState.CLOSED;
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicInteger successCount = new AtomicInteger(0);
    private final AtomicInteger requestCount = new AtomicInteger(0);

    // Failures that trip the breaker from CLOSED to OPEN.
    private final int failureThreshold;
    // Fraction of half-open requests that must succeed to close the breaker.
    private final double halfOpenSuccessRate;
    // How long (ms) the breaker stays OPEN before probing again.
    private final long retryTimePeriod;
    // Timestamp (ms) of the most recent recorded failure.
    private long lastFailureTime = 0;

    public CircuitBreaker(int failureThreshold, double halfOpenSuccessRate, long retryTimePeriod) {
        this.failureThreshold = failureThreshold;
        this.halfOpenSuccessRate = halfOpenSuccessRate;
        this.retryTimePeriod = retryTimePeriod;
    }

    /**
     * Returns whether a request may pass through the breaker right now, moving
     * OPEN -> HALF_OPEN once the retry window has elapsed.
     */
    public synchronized boolean allowRequest() {
        long cul = System.currentTimeMillis();
        System.out.println(StrUtil.format("[熔断器] 熔断前失败: {} 次", failureCount));
        switch (state) {
            case OPEN -> {
                if (cul - lastFailureTime > retryTimePeriod) {
                    state = CircuitBreakerState.HALF_OPEN;
                    // Fresh statistics for the half-open probe window.
                    resetCounts();
                    return true;
                }
                System.out.println("[熔断器] 熔断生效中");
                return false;
            }
            case HALF_OPEN -> {
                requestCount.incrementAndGet();
                return true;
            }
            default -> {
                return true;
            }
        }
    }

    /**
     * Records a failed call: a failure while HALF_OPEN re-opens immediately;
     * reaching the threshold while CLOSED opens the breaker.
     */
    public synchronized void recordFailure() {
        failureCount.incrementAndGet();
        System.out.println(StrUtil.format("[熔断器] 记录失败次数: {} 次", failureCount));
        // (Fix: the old code set lastFailureTime twice on the HALF_OPEN path.)
        lastFailureTime = System.currentTimeMillis();
        if (state == CircuitBreakerState.HALF_OPEN) {
            state = CircuitBreakerState.OPEN;
        } else if (failureCount.get() >= failureThreshold) {
            state = CircuitBreakerState.OPEN;
        }
    }

    /**
     * Records a successful call: enough half-open successes close the breaker;
     * any success while CLOSED resets the statistics.
     */
    public synchronized void recordSuccess() {
        if (state == CircuitBreakerState.HALF_OPEN) {
            successCount.incrementAndGet();
            if (successCount.get() > halfOpenSuccessRate * requestCount.get()) {
                state = CircuitBreakerState.CLOSED;
                resetCounts();
            }
        } else {
            resetCounts();
        }
    }

    /**
     * Resets all counters.
     * Fix: now synchronized — it is public and every other state mutation is
     * guarded by the instance lock, so an unguarded reset could race with them.
     */
    public synchronized void resetCounts() {
        failureCount.set(0);
        successCount.set(0);
        requestCount.set(0);
    }
}

重试机制(Guava Retry)

package cn.phixlin.core.client.retry;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.phixlin.core.client.rpcclient.RpcClient;
import cn.phixlin.rpc.common.message.RpcRequest;
import cn.phixlin.rpc.common.message.RpcResponse;
import com.github.rholder.retry.*;

import java.util.concurrent.TimeUnit;

public class GuavaRetry {

    /**
     * Wraps one RPC call in a Guava retryer: retries on any exception or on a
     * FAIL-coded response, waiting 2 s between attempts, at most 3 attempts.
     *
     * @param rpcRequest request to send
     * @param rpcClient  transport used to send it
     * @return the first acceptable response, or a failure response when all
     *         attempts are exhausted
     */
    public RpcResponse sendServiceWithRetry(RpcRequest rpcRequest, RpcClient rpcClient) {
        Retryer<RpcResponse> retryer = RetryerBuilder.<RpcResponse>newBuilder()
                // Retry on any thrown exception.
                .retryIfException()
                // Retry when the response carries the failure code.
                .retryIfResult(resp -> ObjectUtil.equals(resp.getCode(), RpcResponse.FAILCODE))
                // Wait strategy: 2 seconds between attempts.
                .withWaitStrategy(WaitStrategies.fixedWait(2, TimeUnit.SECONDS))
                // Stop strategy: give up after 3 attempts.
                .withStopStrategy(StopStrategies.stopAfterAttempt(3))
                // Log each attempt.
                .withRetryListener(new RetryListener() {
                    @Override
                    public <V> void onRetry(Attempt<V> attempt) {
                        System.out.println(StrUtil.format("[retry] RetryListener 第 {} 次调用", attempt.getAttemptNumber()));
                    }
                }).build();

        try {
            return retryer.call(() -> rpcClient.sendRequest(rpcRequest));
        } catch (Exception e) {
            // Fix: e.printStackTrace() replaced with the file's logging style.
            System.err.println(StrUtil.format("[retry] 重试调用异常: {}", e.getMessage()));
        }
        return RpcResponse.fail("重试后依旧失败");
    }
}

3. 服务注册(ZooKeeper)

package cn.phixlin.core.server.register;

import cn.hutool.core.util.StrUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import static cn.phixlin.core.constants.ZKConstants.*;

public class ZKServiceRegister implements ServiceRegister {

    private final CuratorFramework curatorFramework;

    public ZKServiceRegister() {

        // The ZooKeeper address is fixed; providers and consumers alike connect to it.
        // sessionTimeoutMs interacts with tickTime in zoo.cfg:
        // ZK re-adjusts the final timeout between minSessionTimeout and maxSessionTimeout,
        // which default to 2x and 20x tickTime respectively.
        // Liveness is tracked via heartbeats.
        curatorFramework = CuratorFrameworkFactory.builder().connectString(ZK_CONNECT_STRING)          // ZooKeeper address
                .sessionTimeoutMs(40000)                   // session timeout
                .connectionTimeoutMs(5000)                // connection timeout
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))  // exponential backoff retry
                .namespace(ZK_ROOT_PATH)                 // namespace isolation
                .build();
        curatorFramework.start();  // start the client
        System.out.println("[zookeeper] 开始连接");}

    /**
     * Registers a service instance in ZooKeeper: a persistent node per service
     * name, an ephemeral child per provider address, and (optionally) an
     * ephemeral retry-whitelist entry per declared method.
     *
     * @param service        service interface class being published
     * @param serviceAddress this provider's address
     * @param canRetry       whether every method of this service may be retried
     */
    @Override
    public void register(Class<?> service, InetSocketAddress serviceAddress, boolean canRetry) {
        String serviceName = service.getName();
        try {
            // The service-name node is persistent: when a provider goes offline
            // only its address child disappears, never the service name itself.
            if (curatorFramework.checkExists().forPath("/" + serviceName) == null){curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName);
                System.out.println(StrUtil.format("[zookeeper] 服务节点 {} 创建成功", "/" + serviceName));}

            // Each "/" segment is one node on the path.
            String path = "/" + serviceName + "/" + getServiceAddress(serviceAddress);
            if (curatorFramework.checkExists().forPath(path) == null) {
                // Ephemeral node: removed automatically when this provider goes offline.
                curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
                System.out.println(StrUtil.format("[zookeeper] 服务地址 {}, 注册成功", path));} else {
                System.out.println(StrUtil.format("[zookeeper] 服务地址 {}, 已经存在, 跳过注册", path));}

            // Publish the retry whitelist under the dedicated retry namespace.
            if (canRetry) {
                CuratorFramework rootClient = curatorFramework.usingNamespace(ZK_RETRY);
                Method[] declaredMethods = service.getDeclaredMethods();
                for (Method method : declaredMethods) {
                    String retryPath = "/" + getServiceAddress(serviceAddress) + "/" + getMethodSignature(service, method);
                    if (rootClient.checkExists().forPath(retryPath) == null){rootClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(retryPath);
                        System.out.println(StrUtil.format("[zookeeper] 白名单服务地址 {}, 注册成功", retryPath));}else {
                        System.out.println(StrUtil.format("[zookeeper] 白名单服务地址 {}, 已经存在, 跳过注册", retryPath));}
                }
            }
        } catch (Exception e) {
            // NOTE(review): failures are logged and swallowed — the caller cannot
            // tell registration failed; consider rethrowing or returning a status.
            System.out.println(StrUtil.format("[zookeeper] 服务: {}, 注册错误: {}", serviceName, e.getMessage()));}

    }


    // Builds "com.Foo#bar(java.lang.String,int)"-style signatures used as
    // retry-whitelist node names.
    private String getMethodSignature(Class<?> clazz, Method method) {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append(clazz.getName()).append("#").append(method.getName());

        String parameterTypeStr = Arrays.stream(method.getParameterTypes()).map(Class::getName).collect(Collectors.joining(",", "(", ")"));
        stringBuilder.append(parameterTypeStr);

        return stringBuilder.toString();}

    // Formats an address as "host:port".
    private String getServiceAddress(InetSocketAddress serverAddress) {

        return serverAddress.getHostName() + ":" + serverAddress.getPort();}

}


📈 附加功能

分布式追踪(Zipkin 集成)

package cn.phixlin.core.trace.interceptor;

import cn.phixlin.core.trace.TraceContext;
import cn.phixlin.core.trace.TraceIdGenerator;
import cn.phixlin.core.trace.ZipkinReporter;

public class ServiceTraceInterceptor {

    /**
     * Opens the server-side span: reuses the traceId/parentSpanId propagated
     * from the client, mints a fresh spanId and records the start time.
     */
    public static void beforeInvoke() {
        String traceId = TraceContext.getTraceId();
        // Fix: mirror ClientTraceInterceptor — if no traceId was propagated
        // (e.g. a direct call with no upstream context), generate one instead
        // of reporting a span with a null traceId.
        if (traceId == null) {
            traceId = TraceIdGenerator.generateTraceId();
        }
        String parentSpanId = TraceContext.getParentSpanId();
        String spanId = TraceIdGenerator.generateSpanId();

        TraceContext.setTraceId(traceId);
        TraceContext.setSpanId(spanId);
        TraceContext.setParentSpanId(parentSpanId);

        // Record when the server-side span started.
        long startTimeStamp = System.currentTimeMillis();
        TraceContext.setStartTimestamp(String.valueOf(startTimeStamp));
    }

    /**
     * Closes the server-side span: computes the duration, reports it to Zipkin
     * and clears the per-thread context.
     *
     * @param serviceName the invoked service's name, used in the span name/tags
     */
    public static void afterInvoke(String serviceName) {
        long endTimestamp = System.currentTimeMillis();
        long startTimestamp = Long.parseLong(TraceContext.getStartTimestamp());
        long duration = endTimestamp - startTimestamp;

        // Report the server span.
        ZipkinReporter.reportSpan(TraceContext.getTraceId(),
                TraceContext.getSpanId(),
                TraceContext.getParentSpanId(),
                "server-" + serviceName,
                startTimestamp,
                duration,
                serviceName,
                "server"
        );

        // Clear the thread-local trace context.
        TraceContext.clear();
    }
}

package cn.phixlin.core.trace.interceptor;

import cn.phixlin.core.trace.TraceContext;
import cn.phixlin.core.trace.TraceIdGenerator;
import cn.phixlin.core.trace.ZipkinReporter;

public class ClientTraceInterceptor {

    /**
     * Opens the client-side span: reuses (or creates) the traceId, mints a
     * fresh spanId and stores the start timestamp in the per-thread context.
     */
    public static void beforeInvoke() {
        // Reuse a propagated traceId if present, otherwise start a new trace.
        String traceId = TraceContext.getTraceId();
        if (traceId == null) {
            traceId = TraceIdGenerator.generateTraceId();
        }

        // Write the span information into the thread-local context.
        TraceContext.setTraceId(traceId);
        TraceContext.setSpanId(TraceIdGenerator.generateSpanId());
        TraceContext.setStartTimestamp(String.valueOf(System.currentTimeMillis()));
    }

    /**
     * Closes the client-side span: computes the call duration, reports the span
     * to Zipkin and clears the per-thread context.
     *
     * @param serviceName the invoked service's name, used in the span name/tags
     */
    public static void afterInvoke(String serviceName) {
        // Duration of the invocation in milliseconds.
        long started = Long.parseLong(TraceContext.getStartTimestamp());
        long elapsed = System.currentTimeMillis() - started;

        // Report the client span.
        ZipkinReporter.reportSpan(
                TraceContext.getTraceId(),
                TraceContext.getSpanId(),
                TraceContext.getParentSpanId(),
                "client-" + serviceName,
                started,
                elapsed,
                serviceName,
                "client");

        // Clear the thread-local trace context.
        TraceContext.clear();
    }
}

package cn.phixlin.core.trace;

import cn.hutool.core.util.StrUtil;
import zipkin2.Span;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.okhttp3.OkHttpSender;

/**
 * Reports spans to a Zipkin collector over HTTP using an async, batching reporter.
 */
public class ZipkinReporter {

    /**
     * Zipkin 服务器地址.
     * NOTE(review): Zipkin's default collector port is 9411 — confirm 9512
     * matches the actual deployment before relying on this value.
     */
    private static final String ZIPKIN_URL = "http://localhost:9512/api/v2/spans";

    // Keep a reference to the sender so close() can release it as well;
    // the original only closed the reporter and leaked the HTTP sender.
    private static final OkHttpSender sender;

    private static final AsyncReporter<Span> reporter;

    static {
        sender = OkHttpSender.create(ZIPKIN_URL);
        reporter = AsyncReporter.create(sender);
    }

    /**
     * Builds and asynchronously reports a single span.
     *
     * @param traceId        trace identifier shared by all spans of one call chain
     * @param spanId         identifier of this span
     * @param parentSpanId   identifier of the parent span (may be null for a root span)
     * @param name           span name, e.g. "client-xxx" / "server-xxx"
     * @param startTimestamp span start time in milliseconds
     * @param duration       span duration in milliseconds
     * @param serviceName    logical service name (stored as a tag)
     * @param type           "client" or "server" (stored as a tag)
     */
    public static void reportSpan(String traceId,
                                  String spanId,
                                  String parentSpanId,
                                  String name,
                                  long startTimestamp,
                                  long duration,
                                  String serviceName,
                                  String type) {

        Span span = Span.newBuilder()
                .traceId(traceId)
                .id(spanId)
                .parentId(parentSpanId)
                .name(name)
                // Zipkin expects microseconds; convert from milliseconds.
                .timestamp(startTimestamp * 1000)
                .duration(duration * 1000)
                .putTag("service", serviceName)
                .putTag("type", type)
                .build();

        reporter.report(span);
        System.out.println(StrUtil.format("[zipkin] 当前 traceId: {} 上报日志", traceId));
    }

    /**
     * Flushes pending spans and releases both the reporter and the HTTP sender.
     */
    public static void close() {
        // AsyncReporter.close() flushes queued spans before shutting down.
        reporter.close();
        try {
            sender.close();
        } catch (Exception ignored) {
            // Best-effort release during shutdown; nothing useful to do on failure.
        }
    }
}

流控策略(令牌桶算法)

package cn.phixlin.core.server.ratelimit;

import cn.hutool.core.util.StrUtil;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Caches one rate limiter per interface name.
 *
 * <p>Thread-safe: uses {@link ConcurrentHashMap#computeIfAbsent} so that
 * exactly one limiter is created per interface even under concurrent access.
 * The original check-then-act on a plain {@code HashMap} could create two
 * limiters for the same interface and corrupt the map on a server handling
 * parallel requests.
 */
public class RateLimitProvider {

    /** Default token generation rate (tokens per second). */
    private static final int DEFAULT_RATE = 100;

    /** Default bucket capacity. */
    private static final int DEFAULT_CAPACITY = 200;

    private final Map<String, RateLimit> rateLimitMap = new ConcurrentHashMap<>();

    /**
     * Returns the rate limiter bound to the given interface, creating it
     * atomically on first access.
     *
     * @param interfaceName fully-qualified interface name
     * @return the (possibly newly created) rate limiter
     */
    public RateLimit getRateLimit(String interfaceName) {
        return rateLimitMap.computeIfAbsent(interfaceName, name -> {
            System.out.println(StrUtil.format("[注册限流器] rate: {}, capacity: {}", DEFAULT_RATE, DEFAULT_CAPACITY));
            return new TokenBucketRateLimit(DEFAULT_RATE, DEFAULT_CAPACITY);
        });
    }
}

package cn.phixlin.core.server.ratelimit;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

@Slf4j
public class TokenBucketRateLimit implements RateLimit {

    // 上一次令牌发放时间 (kept for field compatibility; refill logic uses startTimestamp)
    public long lastTime = System.currentTimeMillis();
    // 桶的容量
    public long CAPACITY;
    // 令牌生成速度 /s
    public int RATE;
    // 当前令牌数量
    public AtomicLong tokens = new AtomicLong(0);

    // Last instant up to which elapsed time has already been converted into tokens.
    private volatile long startTimestamp = System.currentTimeMillis();

    /**
     * @param rate     tokens generated per second
     * @param capacity maximum number of tokens the bucket can hold
     */
    public TokenBucketRateLimit(int rate, int capacity) {
        RATE = rate;
        CAPACITY = capacity;
    }

    /**
     * Tries to take one token; returns false when the bucket is empty.
     *
     * <p>BUGFIX: the original advanced {@code startTimestamp} only when a token
     * was granted, yet credited the elapsed-interval refill unconditionally. A
     * rejected request therefore left the already-credited interval in place,
     * and the next call credited the same elapsed time again, over-filling the
     * bucket and defeating the limit.
     */
    @Override
    public synchronized boolean getToken() {
        long now = System.currentTimeMillis();

        // Tokens produced since the last accounted instant.
        long refill = (now - startTimestamp) * RATE / 1000;
        long total = tokens.get() + refill;

        if (total >= CAPACITY) {
            // Bucket full: surplus elapsed time is discarded, restart accounting now.
            tokens.set(CAPACITY);
            startTimestamp = now;
        } else if (refill > 0) {
            tokens.set(total);
            // Advance only by the time actually converted into whole tokens so the
            // fractional remainder of the integer division is not lost.
            startTimestamp += refill * 1000 / RATE;
        }

        if (tokens.get() < 1) {
            return false;
        }
        tokens.getAndAdd(-1);
        return true;
    }
}

动态代理(接口调用拦截)

package cn.phixlin.core.client.proxy;

import cn.hutool.core.util.StrUtil;
import cn.phixlin.core.client.circuitbreaker.CircuitBreaker;
import cn.phixlin.core.client.circuitbreaker.CircuitBreakerProvider;
import cn.phixlin.core.client.discovery.ZKServiceDiscover;
import cn.phixlin.core.client.retry.GuavaRetry;
import cn.phixlin.core.client.rpcclient.RpcClient;
import cn.phixlin.core.client.rpcclient.impl.NettyRpcClient;
import cn.phixlin.core.trace.TraceContext;
import cn.phixlin.core.trace.interceptor.ClientTraceInterceptor;
import cn.phixlin.rpc.common.message.RpcRequest;
import cn.phixlin.rpc.common.message.RpcResponse;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.stream.Collectors;

/**
 * JDK dynamic proxy invocation handler for RPC interfaces: opens a trace span,
 * applies the circuit breaker, discovers a provider via ZooKeeper, sends the
 * request (with retry for whitelisted idempotent methods) and records the
 * outcome on the breaker.
 */
public class ClientProxy implements InvocationHandler {

    // Created lazily per invocation; may still be null if invoke() was never called.
    private RpcClient rpcClient;

    private final ZKServiceDiscover zkServiceDiscover;

    private final CircuitBreakerProvider circuitBreakerProvider;

    public ClientProxy() {
        this.zkServiceDiscover = new ZKServiceDiscover();
        this.circuitBreakerProvider = new CircuitBreakerProvider();
    }

    /**
     * Intercepts every proxied interface call.
     *
     * @return the remote call's payload ({@code RpcResponse.getData()}); a
     *         failure payload when the breaker is open or no provider is found
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {

        // Open the client-side span for this invocation.
        ClientTraceInterceptor.beforeInvoke();
        System.out.println(StrUtil.format("[客户端] TraceContext: {}", TraceContext.getTraceId() + ";" + TraceContext.getSpanId()));

        // Build the request from the invoked method's metadata.
        RpcRequest request = RpcRequest.builder()
                .interfaceName(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .params(args)
                .paramsType(method.getParameterTypes())
                .build();

        // Circuit breaker: short-circuit when open.
        CircuitBreaker circuitBreaker = circuitBreakerProvider.getCircuitBreaker(request.getMethodName());
        if (!circuitBreaker.allowRequest()) {
            // BUGFIX: return the payload, not the RpcResponse wrapper — the proxy's
            // declared return type is the business type, so returning the wrapper
            // caused a ClassCastException at the call site.
            TraceContext.clear(); // avoid leaking the thread-local trace context
            return RpcResponse.fail("熔断").getData();
        }

        // 后续添加逻辑:为保持幂等性,只对白名单上的服务进行重试
        String methodSignature = getMethodSignature(request.getInterfaceName(), method);
        System.out.println(StrUtil.format("[客户端] 调用方法签名:{}", methodSignature));

        InetSocketAddress inetSocketAddress = zkServiceDiscover.serviceDiscovery(request.getInterfaceName());
        if (inetSocketAddress == null) {
            System.out.println("[客户端] 未发现可用服务");
            // BUGFIX: the original fell through and built a NettyRpcClient around
            // a null address, failing later with an NPE.
            TraceContext.clear();
            return RpcResponse.fail("未发现可用服务").getData();
        }

        rpcClient = new NettyRpcClient(inetSocketAddress);

        // Only retry methods whitelisted as idempotent.
        RpcResponse response;
        if (zkServiceDiscover.checkRetry(inetSocketAddress, methodSignature)) {
            response = new GuavaRetry().sendServiceWithRetry(request, rpcClient);
        } else {
            response = rpcClient.sendRequest(request);
        }

        // Feed the result back into the circuit breaker.
        if (response.getCode() == RpcResponse.SUCCODE) {
            circuitBreaker.recordSuccess();
        }
        if (response.getCode() == RpcResponse.FAILCODE) {
            circuitBreaker.recordFailure();
        }

        // Close and report the client span.
        ClientTraceInterceptor.afterInvoke(request.getInterfaceName());

        return response.getData();
    }

    /**
     * 获取方法签名
     *
     * @param interfaceName 接口类名
     * @param method        方法
     * @return signature of the form {@code interfaceName#methodName(paramType1,paramType2)}
     */
    private String getMethodSignature(String interfaceName, Method method) {
        String params = Arrays.stream(method.getParameterTypes())
                .map(Class::getName)
                .collect(Collectors.joining(",", "(", ")"));
        return interfaceName + "#" + method.getName() + params;
    }

    /**
     * Creates a dynamic proxy for the given interface backed by this handler.
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
    }

    /**
     * Releases the underlying Netty client (if one was ever created) and the
     * ZooKeeper discovery client.
     */
    public void close() {
        // BUGFIX: rpcClient is only assigned inside invoke(); guard against an
        // NPE when close() is called before any RPC was made.
        if (rpcClient != null) {
            rpcClient.close();
        }
        zkServiceDiscover.close();
    }
}


📊 总结与展望

本框架通过 Netty 的高性能通信 和 ZooKeeper 的服务治理能力 ,实现了完整的 RPC 调用链路。核心优势包括:

  • 基于一致性哈希的负载均衡算法,减少节点变动带来的影响
  • 熔断 + 重试的容错组合,提升系统可用性
  • 令牌桶限流防止服务雪崩,分布式追踪支持全链路监控

后续可优化方向:

  1. 支持 gRPC/HTTP 多协议接入
  2. 增加服务治理控制台
  3. 实现更细粒度的流量调度策略
License:  CC BY 4.0