一个基于 Netty + ZooKeeper 的轻量级 RPC 框架简单实现
📌 项目概述
本项目是一个基于 Netty 网络通信和 ZooKeeper 服务治理的分布式 RPC 框架,包含完整的 服务注册发现、负载均衡、容错处理、链路追踪 等核心功能。通过模块化设计,实现了高性能、低延迟的远程调用能力,适用于分布式系统中的服务间通信场景。
🏗️ 技术架构图
框架采用分层架构设计:
- 通信层 :基于 Netty 实现高性能 TCP 通信
- 服务治理层 :集成 ZooKeeper 实现服务注册发现、一致性哈希负载均衡、熔断限流等
- 应用层 :提供动态代理接口调用和链路追踪能力
🔧 核心功能实现
1. 通信层(Netty)
客户端通信
package cn.phixlin.core.client.rpcclient.impl;
import cn.hutool.core.util.StrUtil;
import cn.phixlin.core.client.discovery.ServiceDiscover;
import cn.phixlin.core.client.discovery.ZKServiceDiscover;
import cn.phixlin.core.client.netty.MDCChannelHandler;
import cn.phixlin.core.client.netty.NettyClientInitializer;
import cn.phixlin.core.client.rpcclient.RpcClient;
import cn.phixlin.core.trace.TraceContext;
import cn.phixlin.rpc.common.message.RpcRequest;
import cn.phixlin.rpc.common.message.RpcResponse;
import cn.phixlin.rpc.common.serializer.serializer.Serializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import java.util.Map;
import static cn.phixlin.rpc.common.netty.NettyConstant.RPC_RESPONSE;
public class NettyRpcClient implements RpcClient {

    // One Bootstrap / EventLoopGroup shared by every NettyRpcClient instance in the JVM.
    private static final Bootstrap bootstrap;
    private static final EventLoopGroup eventLoopGroup;
    // Provider address resolved by service discovery; may be null when no provider matched.
    private final InetSocketAddress inetSocketAddress;

    public NettyRpcClient(InetSocketAddress inetSocketAddress) {
        this.inetSocketAddress = inetSocketAddress;
    }

    static {
        bootstrap = new Bootstrap();
        eventLoopGroup = new NioEventLoopGroup();
        // NettyClientInitializer installs the codec / business handlers on each new channel.
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new NettyClientInitializer());
    }

    /**
     * Sends {@code request} synchronously.
     *
     * <p>Connects, propagates the caller's TraceContext via a channel attribute, writes the
     * request, then blocks on {@code closeFuture()} until the channel is closed, after which
     * the response stored by NettyClientHandler is read back from the RPC_RESPONSE-keyed
     * channel attribute.
     *
     * <p>NOTE(review): this only returns once something closes the channel — confirm a
     * handler actually closes it after the response arrives, otherwise this blocks forever.
     *
     * @param request the RPC request to send
     * @return the server response, or a failure response when no address matched, the server
     *         answered nothing, or the thread was interrupted
     */
    @Override
    public RpcResponse sendRequest(RpcRequest request) {
        if (inetSocketAddress == null) return RpcResponse.fail("未匹配到服务");
        try {
            // sync() blocks until the connect attempt completes.
            ChannelFuture channelFuture = bootstrap.connect(inetSocketAddress).sync();
            Channel channel = channelFuture.channel();
            // Hand the current trace context to the channel so MDCChannelHandler can restore it.
            channel.attr(MDCChannelHandler.TRACE_CONTEXT_ATTR_KEY).set(TraceContext.getCopy());
            channel.writeAndFlush(request);
            // Block until the channel is closed; the response attribute is then available.
            channel.closeFuture().sync();
            // AttributeKey lookups are channel-scoped, so there is no cross-channel interference.
            AttributeKey<RpcResponse> key = AttributeKey.valueOf(RPC_RESPONSE);
            RpcResponse response = channel.attr(key).get();
            if (response == null) {
                System.err.println(StrUtil.format("[客户端] 服务端响应为空, rpcRequest {}", request));
                return RpcResponse.fail("客户端判断服务端响应为空");
            }
            System.out.println(StrUtil.format("[客户端] 收到响应:{}", response.toString()));
            return response;
        } catch (InterruptedException e) {
            System.err.println(StrUtil.format("[客户端] 请求被中断:{}", e.getMessage()));
            // Preserve the interrupt flag for callers higher up the stack.
            Thread.currentThread().interrupt();
        }
        return RpcResponse.fail("客户端请求失败");
    }

    /**
     * Releases the shared event loop group. Safe to call more than once.
     */
    @Override
    public void close() {
        try {
            eventLoopGroup.shutdownGracefully();
        } catch (Exception e) {
            // Fix: the original re-interrupted the thread on ANY exception, but
            // shutdownGracefully() does not throw InterruptedException — just report.
            System.err.println(StrUtil.format("[客户端] 关闭 Netty 资源时发生异常: {}", e.getMessage()));
        }
    }
}
package cn.phixlin.core.client.netty;
import cn.hutool.core.util.StrUtil;
import cn.phixlin.rpc.common.serializer.coder.CusDecoder;
import cn.phixlin.rpc.common.serializer.coder.CusEncoder;
import cn.phixlin.rpc.common.serializer.serializer.Serializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the client-side pipeline on every freshly connected channel:
     * custom encoder/decoder, the RPC response handler, MDC propagation, a
     * write-idle detector (8s) and the heartbeat handler that reacts to it.
     */
    @Override
    protected void initChannel(SocketChannel ch) {
        try {
            // TODO: make the serializer code (3) configurable.
            ch.pipeline()
                    .addLast(new CusEncoder(Serializer.getSerializerByCode(3)))
                    .addLast(new CusDecoder())
                    .addLast(new NettyClientHandler())
                    .addLast(new MDCChannelHandler())
                    // The client only cares about write idleness: after 8 seconds with
                    // no outbound data an IdleStateEvent fires so a heartbeat can be sent.
                    .addLast(new IdleStateHandler(0, 8, 0, TimeUnit.SECONDS))
                    .addLast(new HeartbeatHandler());
        } catch (Exception e) {
            System.out.println(StrUtil.format("[netty] 初始化失败 {}", e.getMessage()));
            throw e;
        }
    }
}
package cn.phixlin.core.client.netty;
import cn.hutool.core.util.StrUtil;
import cn.phixlin.rpc.common.message.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
import static cn.phixlin.rpc.common.netty.NettyConstant.RPC_RESPONSE;
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {

    // Key under which the response is parked on the channel. NettyRpcClient.sendRequest
    // reads the same RPC_RESPONSE key, so the two sides now stay in sync by construction
    // (the original hard-coded a "RPCResponse" literal that duplicated the constant's value).
    private static final AttributeKey<RpcResponse> RESPONSE_KEY = AttributeKey.valueOf(RPC_RESPONSE);

    /**
     * Stores the decoded response as a channel attribute for the blocked sender to pick up.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
        ctx.channel().attr(RESPONSE_KEY).set(response);
        // Long-lived connection: the channel is intentionally not closed here.
        // ctx.channel().close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(StrUtil.format("[客户端] 请求处理异常: {}", cause.getMessage()));
        ctx.close();
    }
}
服务端通信
package cn.phixlin.core.server.rpcserver.impl;
import cn.hutool.core.util.StrUtil;
import cn.phixlin.core.server.netty.NettyServerInitializer;
import cn.phixlin.core.server.register.ServiceProvider;
import cn.phixlin.core.server.rpcserver.RpcServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Slf4j
public class NettyRpcServer implements RpcServer {

    private final ServiceProvider serviceProvider;
    private ChannelFuture channelFuture;

    public NettyRpcServer(ServiceProvider serviceProvider) {
        this.serviceProvider = serviceProvider;
    }

    /**
     * Boots the Netty server on {@code port} and blocks until the server channel
     * is closed. The event loop groups are always released on the way out.
     */
    @Override
    public void start(int port) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        System.out.println("[服务端] netty 启动");
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    // NettyServerInitializer wires the per-connection pipeline.
                    .childHandler(new NettyServerInitializer(serviceProvider));
            // Block until the bind completes.
            channelFuture = bootstrap.bind(port).sync();
            System.out.println(StrUtil.format("[服务端] netty 绑定端口:{}", port));
            // Park here until the server channel is closed (e.g. via stop()).
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println(StrUtil.format("[服务端] 服务中断:{}", e.getMessage()));
        } finally {
            shutdown(bossGroup, workGroup);
            System.out.println(StrUtil.format("[服务端] netty 服务关闭"));
        }
    }

    /**
     * Closes the main server channel, which unblocks {@link #start(int)}.
     */
    @Override
    public void stop() {
        if (channelFuture == null) {
            System.out.println(StrUtil.format("[服务端] netty 服务端主通道尚未启动,无法关闭"));
            return;
        }
        try {
            channelFuture.channel().close().sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println(StrUtil.format("[服务端] 关闭 netty 服务端主通道时中断"));
        }
    }

    // Gracefully releases both event loop groups, ignoring interrupts.
    private void shutdown(NioEventLoopGroup bossGroup, NioEventLoopGroup workGroup) {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().syncUninterruptibly();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().syncUninterruptibly();
        }
    }
}
package cn.phixlin.core.server.netty;
import cn.phixlin.core.client.netty.HeartbeatHandler;
import cn.phixlin.core.server.register.ServiceProvider;
import cn.phixlin.rpc.common.serializer.coder.CusDecoder;
import cn.phixlin.rpc.common.serializer.coder.CusEncoder;
import cn.phixlin.rpc.common.serializer.serializer.JsonSerializer;
import cn.phixlin.rpc.common.serializer.serializer.Serializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {

    private final ServiceProvider serviceProvider;

    public NettyServerInitializer(ServiceProvider serviceProvider) {
        this.serviceProvider = serviceProvider;
    }

    /**
     * Builds the server-side pipeline: idle detection and heartbeat handling first,
     * then the custom codec, then the request dispatcher.
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // If no client data arrives within 10 seconds an IdleState.READER_IDLE event
        // fires and HeartbeatHandler decides what to do with the connection.
        ch.pipeline()
                .addLast(new IdleStateHandler(10, 20, 0, TimeUnit.SECONDS))
                .addLast(new HeartbeatHandler())
                // Custom encoder/decoder pair (serializer code 3).
                .addLast(new CusEncoder(Serializer.getSerializerByCode(3)))
                .addLast(new CusDecoder())
                .addLast(new NettyRPCServerHandler(serviceProvider));
    }
}
package cn.phixlin.core.server.netty;
import cn.hutool.core.util.StrUtil;
import cn.phixlin.core.server.ratelimit.RateLimit;
import cn.phixlin.core.server.register.ServiceProvider;
import cn.phixlin.core.trace.interceptor.ServiceTraceInterceptor;
import cn.phixlin.rpc.common.message.RequestType;
import cn.phixlin.rpc.common.message.RpcRequest;
import cn.phixlin.rpc.common.message.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@AllArgsConstructor
@AllArgsConstructor
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

    private ServiceProvider serviceProvider;

    /**
     * Entry point for decoded requests: drops null payloads, acknowledges heartbeats
     * silently, and otherwise invokes the target service under trace recording.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
        if (request == null) {
            System.out.println(StrUtil.format("[服务端] 接受到非法请求,RpcRequest 为空"));
            return;
        }
        if (request.getType() == RequestType.HEARTBEAT.getCode()) {
            System.out.println(StrUtil.format("[服务端] 接受到来自客户端的心跳"));
            return;
        }
        // Open a server-side trace span for this invocation.
        ServiceTraceInterceptor.beforeInvoke();
        RpcResponse response = getResponse(request);
        // Report the span before writing the response back.
        ServiceTraceInterceptor.afterInvoke(request.getInterfaceName());
        ctx.writeAndFlush(response);
        // Long-lived connection: do not close the channel here.
        // ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
        System.out.println(StrUtil.format("[服务端] 处理请求时发生异常:{}", e.getMessage()));
        ctx.close();
    }

    /**
     * Rate-limits by interface name, then reflectively invokes the requested method
     * on the registered implementation.
     */
    private RpcResponse getResponse(RpcRequest rpcRequest) {
        String interfaceName = rpcRequest.getInterfaceName();
        RateLimit rateLimit = serviceProvider.getRateLimitProvider().getRateLimit(interfaceName);
        if (!rateLimit.getToken()) {
            // No token available: degrade instead of invoking the service.
            System.out.println("[服务端] 限流降级");
            return RpcResponse.fail("限流降级");
        }
        Object service = serviceProvider.getService(interfaceName);
        try {
            Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
            Object invoke = method.invoke(service, rpcRequest.getParams());
            return RpcResponse.suc(invoke);
        } catch (InvocationTargetException e) {
            // Fix: report the exception thrown by the service itself — the reflective
            // wrapper's own getMessage() is usually null.
            Throwable cause = e.getCause() == null ? e : e.getCause();
            System.out.println("[服务端] 方法执行错误");
            return RpcResponse.fail(cause.getMessage());
        } catch (NoSuchMethodException | IllegalAccessException e) {
            System.out.println("[服务端] 方法执行错误");
            return RpcResponse.fail(e.getMessage());
        }
    }
}
2. 服务治理
服务发现(ZooKeeper 实现)
package cn.phixlin.core.client.discovery;
import cn.hutool.core.util.StrUtil;
import cn.phixlin.core.client.discovery.balance.ConsistencyHashBalance;
import cn.phixlin.core.client.discovery.balance.LoadBalance;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import static cn.phixlin.core.constants.ZKConstants.*;
public class ZKServiceDiscover implements ServiceDiscover {

    private final CuratorFramework curatorFramework;
    private final ServiceDiscoverCache serviceDiscoverCache;
    private final LoadBalance loadBalance = new ConsistencyHashBalance();
    // Lazily-filled cache of method signatures that are whitelisted for retry.
    private final Set<String> retryServiceCache = new CopyOnWriteArraySet<>();

    public ZKServiceDiscover() {
        this.curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(ZK_CONNECT_STRING)
                .sessionTimeoutMs(40000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .namespace(ZK_ROOT_PATH)
                .build();
        this.curatorFramework.start();
        System.out.println("[zookeeper] 客户端发现启动");
        // Local cache, kept current by a ZooKeeper watch.
        serviceDiscoverCache = new ServiceDiscoverCache();
        ZKListener zkListener = new ZKListener(curatorFramework, serviceDiscoverCache);
        zkListener.zkListener2Cache(ZK_ROOT_PATH);
    }

    /**
     * Resolves {@code serviceName} to one provider address: local cache first, then
     * ZooKeeper, then load-balances over the candidates.
     *
     * @return the chosen address, or null when nothing is registered or lookup fails
     */
    @Override
    public InetSocketAddress serviceDiscovery(String serviceName) {
        try {
            List<String> serviceAddressList = serviceDiscoverCache.getServiceFromCache(serviceName);
            if (serviceAddressList == null || serviceAddressList.isEmpty()) {
                serviceAddressList = curatorFramework.getChildren().forPath("/" + serviceName);
                // Fix: also bail out on an EMPTY child list (the balancer cannot pick from
                // zero candidates) and log the service NAME — the original logged the list,
                // which was null on that branch.
                if (serviceAddressList == null || serviceAddressList.isEmpty()) {
                    System.out.println(StrUtil.format("[zookeeper] 没有发现 {}", serviceName));
                    return null;
                }
                // Populate the cache only if the watcher has not done so concurrently.
                List<String> serviceFromCache = serviceDiscoverCache.getServiceFromCache(serviceName);
                if (serviceFromCache == null || serviceFromCache.isEmpty()) {
                    for (String address : serviceAddressList) {
                        serviceDiscoverCache.addService2Cache(serviceName, address);
                    }
                }
            }
            // Fix: the original logged the discovered list twice back to back.
            System.out.println(StrUtil.format("[zookeeper] 发现服务 {}", serviceAddressList));
            String addressStr = loadBalance.balance(serviceAddressList);
            return this.parseAddress(addressStr);
        } catch (Exception e) {
            System.err.println(StrUtil.format("[zookeeper] serviceName = {} 服务发现失败 {}", serviceName, e.getMessage()));
        }
        return null;
    }

    /**
     * Checks whether {@code methodSignature} is on the retry whitelist registered under
     * the given provider address. The whitelist is fetched from ZooKeeper once and cached.
     */
    @Override
    public boolean checkRetry(InetSocketAddress serviceAddress, String methodSignature) {
        if (retryServiceCache.isEmpty()) {
            try {
                CuratorFramework rootClient = curatorFramework.usingNamespace(ZK_RETRY);
                List<String> retryMethods = rootClient.getChildren().forPath("/" + getServiceAddress(serviceAddress));
                retryServiceCache.addAll(retryMethods);
            } catch (Exception e) {
                System.err.println(StrUtil.format("[zookeeper] 检查重试失败,方法签名:{}, {}", methodSignature, e.getMessage()));
            }
        }
        return retryServiceCache.contains(methodSignature);
    }

    @Override
    public void close() {
        curatorFramework.close();
    }

    /**
     * Formats an InetSocketAddress as an "ip:port" string.
     *
     * @param serviceAddress the address to format
     * @return ip:port
     */
    private String getServiceAddress(InetSocketAddress serviceAddress) {
        return serviceAddress.getHostName() + ":" + serviceAddress.getPort();
    }

    // Parses an "ip:port" string back into an InetSocketAddress.
    private InetSocketAddress parseAddress(String address) {
        String[] result = address.split(":");
        return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
    }
}
负载均衡(一致性哈希)
package cn.phixlin.core.client.discovery.balance;
import cn.hutool.core.util.StrUtil;
import java.util.*;
public class ConsistencyHashBalance implements LoadBalance {

    // Number of virtual nodes per real node, used to smooth the ring distribution.
    private static final int VIRTUAL_NUM = 5;
    // Hash ring: key = hash of a virtual node, value = virtual node name ("real&&VNi").
    private final SortedMap<Integer, String> hashRing = new TreeMap<>();
    // Known real nodes.
    private final List<String> realNodes = new LinkedList<>();

    /**
     * Picks a node for a random key after (re)seeding the ring from {@code nodes}.
     */
    @Override
    public String balance(List<String> nodes) {
        String uuid = UUID.randomUUID().toString();
        return getNode(uuid, nodes);
    }

    /**
     * Adds a real node and its virtual replicas to the ring.
     * Fix: the original cleared the WHOLE ring first and then re-added only this node,
     * wiping every other node's replicas; other nodes are now left intact. Also fixes
     * two log calls whose format arguments were missing.
     */
    @Override
    public void addNode(String node) {
        if (realNodes.contains(node)) {
            return;
        }
        realNodes.add(node);
        System.out.println(StrUtil.format("[负载均衡] 真实节点 {} 上线添加", node));
        for (int i = 0; i < VIRTUAL_NUM; i++) {
            String virtualNode = node + "&&VN" + i;
            int hash = getHash(virtualNode);
            hashRing.put(hash, virtualNode);
            System.out.println(StrUtil.format("[负载均衡] 虚拟节点:{}, hash:{} 被添加", virtualNode, hash));
        }
    }

    /**
     * Removes a real node and all of its virtual replicas from the ring.
     * Fix: the log now reports the virtual node and hash (the original passed only the
     * real node for a two-placeholder format string).
     */
    @Override
    public void removeNode(String node) {
        if (!realNodes.contains(node)) {
            return;
        }
        realNodes.remove(node);
        System.out.println(StrUtil.format("[负载均衡] 真实节点 {} 下线移除", node));
        for (int i = 0; i < VIRTUAL_NUM; i++) {
            String virtualNode = node + "&&VN" + i;
            int hash = getHash(virtualNode);
            hashRing.remove(hash);
            System.out.println(StrUtil.format("[负载均衡] 虚拟节点:{}, hash:{} 被下线移除", virtualNode, hash));
        }
    }

    // Seeds the ring from the candidate list, then resolves the key.
    private String getNode(String key, List<String> nodes) {
        this.init(nodes);
        return getNode(key);
    }

    /**
     * Walks the ring clockwise from the key's hash.
     * Fix: when the tail map is empty the ring must wrap around to its FIRST entry;
     * the original jumped to the last entry, skewing selection toward it.
     */
    private String getNode(String key) {
        int hash = getHash(key);
        SortedMap<Integer, String> subMap = hashRing.tailMap(hash);
        Integer ringKey = subMap.isEmpty() ? hashRing.firstKey() : subMap.firstKey();
        String virtualNode = hashRing.get(ringKey);
        return virtualNode.substring(0, virtualNode.indexOf("&&"));
    }

    /**
     * Registers any not-yet-known nodes (and their virtual replicas) on the ring.
     * Fix: the original appended every node on every call, so realNodes accumulated
     * duplicates without bound; already-known nodes are now skipped.
     */
    private void init(List<String> nodes) {
        for (String node : nodes) {
            if (realNodes.contains(node)) {
                continue;
            }
            realNodes.add(node);
            System.out.println(StrUtil.format("[负载均衡] 真实节点 {} 被添加", node));
            for (int i = 0; i < VIRTUAL_NUM; i++) {
                String virtualNode = node + "&&VN" + i;
                int hash = getHash(virtualNode);
                hashRing.put(hash, virtualNode);
                System.out.println(StrUtil.format("[负载均衡] 虚拟节点:{}, hash:{} 被添加", virtualNode, hash));
            }
        }
    }

    /**
     * FNV1_32_HASH with extra avalanche mixing, folded into a non-negative int.
     */
    private static int getHash(String str) {
        final int p = 16777619;
        int hash = (int) 2166136261L;
        for (int i = 0; i < str.length(); i++) {
            hash = (hash ^ str.charAt(i)) * p;
        }
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        // Fix: Math.abs(Integer.MIN_VALUE) is still negative; map that single value
        // explicitly, otherwise negate as before.
        if (hash == Integer.MIN_VALUE) {
            hash = 0;
        } else if (hash < 0) {
            hash = Math.abs(hash);
        }
        return hash;
    }
}
熔断机制(Circuit Breaker)
package cn.phixlin.core.client.circuitbreaker;
import cn.hutool.core.util.StrUtil;
import java.util.HashMap;
import java.util.Map;
public class CircuitBreakerProvider {

    // One breaker per service name; access is serialized by the synchronized getter.
    Map<String, CircuitBreaker> circuitBreakerMap = new HashMap<>();

    /**
     * Returns the breaker registered for {@code serviceName}, creating one
     * (threshold 1, half-open success rate 0.5, retry period 10) on first use.
     */
    public synchronized CircuitBreaker getCircuitBreaker(String serviceName) {
        return circuitBreakerMap.computeIfAbsent(serviceName, name -> {
            System.out.println(StrUtil.format("[熔断器] serviceName = {} 创建一个新的熔断器", name));
            return new CircuitBreaker(1, 0.5, 10);
        });
    }
}
package cn.phixlin.core.client.circuitbreaker;
import cn.hutool.core.util.StrUtil;
import java.util.concurrent.atomic.AtomicInteger;
public class CircuitBreaker {

    // Current breaker state; all transitions happen under this object's monitor.
    private CircuitBreakerState state = CircuitBreakerState.CLOSED;
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicInteger successCount = new AtomicInteger(0);
    private final AtomicInteger requestCount = new AtomicInteger(0);
    // Failures tolerated before tripping OPEN.
    private final int failureThreshold;
    // Fraction of HALF_OPEN probe requests that must succeed to close again.
    private final double halfOpenSuccessRate;
    // How long to stay OPEN before probing again.
    // NOTE(review): compared against System.currentTimeMillis() deltas, so the unit is
    // MILLISECONDS — CircuitBreakerProvider passes 10, i.e. 10 ms; confirm that is intended.
    private final long retryTimePeriod;
    // Timestamp of the most recent recorded failure.
    private long lastFailureTime = 0;

    public CircuitBreaker(int failureThreshold, double halfOpenSuccessRate, long retryTimePeriod) {
        this.failureThreshold = failureThreshold;
        this.halfOpenSuccessRate = halfOpenSuccessRate;
        this.retryTimePeriod = retryTimePeriod;
    }

    /**
     * Returns whether a request may pass. OPEN transitions to HALF_OPEN once the retry
     * period has elapsed; HALF_OPEN counts the probe requests it lets through.
     */
    public synchronized boolean allowRequest() {
        long cul = System.currentTimeMillis();
        System.out.println(StrUtil.format("[熔断器] 熔断前失败: {} 次", failureCount));
        switch (state) {
            case OPEN -> {
                if (cul - lastFailureTime > retryTimePeriod) {
                    state = CircuitBreakerState.HALF_OPEN;
                    // Start a fresh probe window.
                    resetCounts();
                    return true;
                }
                System.out.println("[熔断器] 熔断生效中");
                return false;
            }
            case HALF_OPEN -> {
                requestCount.incrementAndGet();
                return true;
            }
            default -> {
                return true;
            }
        }
    }

    /**
     * Records a failed call: any failure while HALF_OPEN re-opens the breaker; in CLOSED
     * state the breaker opens once the failure threshold is reached.
     * Fix: the original assigned lastFailureTime twice on the HALF_OPEN path.
     */
    public synchronized void recordFailure() {
        failureCount.incrementAndGet();
        System.out.println(StrUtil.format("[熔断器] 记录失败次数: {} 次", failureCount));
        lastFailureTime = System.currentTimeMillis();
        if (state == CircuitBreakerState.HALF_OPEN) {
            state = CircuitBreakerState.OPEN;
        } else if (failureCount.get() >= failureThreshold) {
            state = CircuitBreakerState.OPEN;
        }
    }

    /**
     * Records a successful call: enough HALF_OPEN successes close the breaker again;
     * in any other state the counters are simply reset.
     */
    public synchronized void recordSuccess() {
        if (state == CircuitBreakerState.HALF_OPEN) {
            successCount.incrementAndGet();
            if (successCount.get() > halfOpenSuccessRate * requestCount.get()) {
                state = CircuitBreakerState.CLOSED;
                resetCounts();
            }
        } else {
            resetCounts();
        }
    }

    /**
     * Resets all counters.
     * Fix: now synchronized — the method is public and may be invoked outside the
     * monitor the other methods hold.
     */
    public synchronized void resetCounts() {
        failureCount.set(0);
        successCount.set(0);
        requestCount.set(0);
    }
}
重试机制(Guava Retry)
package cn.phixlin.core.client.retry;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.phixlin.core.client.rpcclient.RpcClient;
import cn.phixlin.rpc.common.message.RpcRequest;
import cn.phixlin.rpc.common.message.RpcResponse;
import com.github.rholder.retry.*;
import java.util.concurrent.TimeUnit;
public class GuavaRetry {

    /**
     * Invokes {@code rpcClient.sendRequest} under a retry policy: retry on any exception
     * or on a FAILCODE response, waiting 2 s between attempts, at most 3 attempts total.
     *
     * @param rpcRequest the request to (re)send
     * @param rpcClient  the transport used for every attempt
     * @return the first successful response, or a failure response when every attempt failed
     */
    public RpcResponse sendServiceWithRetry(RpcRequest rpcRequest, RpcClient rpcClient) {
        Retryer<RpcResponse> retryer = RetryerBuilder.<RpcResponse>newBuilder()
                // Retry regardless of which exception was thrown.
                .retryIfException()
                // Retry when the response carries the failure code.
                .retryIfResult(resp -> ObjectUtil.equals(resp.getCode(), RpcResponse.FAILCODE))
                // Wait 2 seconds between attempts.
                .withWaitStrategy(WaitStrategies.fixedWait(2, TimeUnit.SECONDS))
                // Give up after 3 attempts.
                .withStopStrategy(StopStrategies.stopAfterAttempt(3))
                // Log every attempt.
                .withRetryListener(new RetryListener() {
                    @Override
                    public <V> void onRetry(Attempt<V> attempt) {
                        System.out.println(StrUtil.format("[retry] RetryListener 第 {} 次调用", attempt.getAttemptNumber()));
                    }
                })
                .build();
        try {
            return retryer.call(() -> rpcClient.sendRequest(rpcRequest));
        } catch (Exception e) {
            // Fix: log through the framework's usual channel instead of printStackTrace().
            System.err.println(StrUtil.format("[retry] 重试调用失败: {}", e.getMessage()));
        }
        return RpcResponse.fail("重试后依旧失败");
    }
}
3. 服务注册(ZooKeeper)
package cn.phixlin.core.server.register;
import cn.hutool.core.util.StrUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static cn.phixlin.core.constants.ZKConstants.*;
public class ZKServiceRegister implements ServiceRegister {

    private final CuratorFramework curatorFramework;

    /**
     * Connects to ZooKeeper. Both providers and consumers talk to the same fixed
     * address. The session timeout interacts with the server's tickTime: zk clamps
     * it between minSessionTimeout and maxSessionTimeout (by default 2x and 20x
     * tickTime). Liveness is tracked via heartbeats.
     */
    public ZKServiceRegister() {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(ZK_CONNECT_STRING)                  // ZooKeeper address
                .sessionTimeoutMs(40000)                           // session timeout
                .connectionTimeoutMs(5000)                         // connection timeout
                .retryPolicy(new ExponentialBackoffRetry(1000, 3)) // exponential backoff retry
                .namespace(ZK_ROOT_PATH)                           // namespace isolation
                .build();
        curatorFramework.start();
        System.out.println("[zookeeper] 开始连接");
    }

    /**
     * Registers one provider: a persistent node per service name, an ephemeral child
     * per provider address, and — when {@code canRetry} — an ephemeral whitelist entry
     * per declared method under the retry namespace.
     */
    @Override
    public void register(Class<?> service, InetSocketAddress serviceAddress, boolean canRetry) {
        String serviceName = service.getName();
        try {
            String servicePath = "/" + serviceName;
            // The service-name node is persistent: provider shutdown removes only the
            // address children, never the service entry itself.
            if (curatorFramework.checkExists().forPath(servicePath) == null) {
                curatorFramework.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT).forPath(servicePath);
                System.out.println(StrUtil.format("[zookeeper] 服务节点 {} 创建成功", servicePath));
            }
            // Each "/" segment is one zk node.
            String path = servicePath + "/" + getServiceAddress(serviceAddress);
            if (curatorFramework.checkExists().forPath(path) != null) {
                System.out.println(StrUtil.format("[zookeeper] 服务地址 {}, 已经存在, 跳过注册", path));
            } else {
                // Ephemeral: the address disappears automatically when the provider goes down.
                curatorFramework.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL).forPath(path);
                System.out.println(StrUtil.format("[zookeeper] 服务地址 {}, 注册成功", path));
            }
            // Publish every declared method on the retry whitelist.
            if (canRetry) {
                CuratorFramework rootClient = curatorFramework.usingNamespace(ZK_RETRY);
                for (Method method : service.getDeclaredMethods()) {
                    String retryPath = "/" + getServiceAddress(serviceAddress) + "/" + getMethodSignature(service, method);
                    if (rootClient.checkExists().forPath(retryPath) != null) {
                        System.out.println(StrUtil.format("[zookeeper] 白名单服务地址 {}, 已经存在, 跳过注册", retryPath));
                    } else {
                        rootClient.create().creatingParentsIfNeeded()
                                .withMode(CreateMode.EPHEMERAL).forPath(retryPath);
                        System.out.println(StrUtil.format("[zookeeper] 白名单服务地址 {}, 注册成功", retryPath));
                    }
                }
            }
        } catch (Exception e) {
            System.out.println(StrUtil.format("[zookeeper] 服务: {}, 注册错误: {}", serviceName, e.getMessage()));
        }
    }

    // Builds "interface#method(paramType,...)" — the whitelist key format.
    private String getMethodSignature(Class<?> clazz, Method method) {
        String params = Arrays.stream(method.getParameterTypes())
                .map(Class::getName)
                .collect(Collectors.joining(",", "(", ")"));
        return clazz.getName() + "#" + method.getName() + params;
    }

    // Formats an address as "host:port".
    private String getServiceAddress(InetSocketAddress serverAddress) {
        return serverAddress.getHostName() + ":" + serverAddress.getPort();
    }
}
📈 附加功能
分布式追踪(Zipkin 集成)
package cn.phixlin.core.trace.interceptor;
import cn.phixlin.core.trace.TraceContext;
import cn.phixlin.core.trace.TraceIdGenerator;
import cn.phixlin.core.trace.ZipkinReporter;
public class ServiceTraceInterceptor {

    /**
     * Opens the server-side span: re-publishes the trace/parent IDs already carried by
     * the context, allocates a fresh spanId, and stamps the start time.
     */
    public static void beforeInvoke() {
        String traceId = TraceContext.getTraceId();
        String parentSpanId = TraceContext.getParentSpanId();
        TraceContext.setTraceId(traceId);
        TraceContext.setSpanId(TraceIdGenerator.generateSpanId());
        TraceContext.setParentSpanId(parentSpanId);
        // Record when the server-side span started.
        TraceContext.setStartTimestamp(String.valueOf(System.currentTimeMillis()));
    }

    /**
     * Closes the server-side span: computes the elapsed time, reports the span to
     * Zipkin, then clears the per-thread trace context.
     */
    public static void afterInvoke(String serviceName) {
        long startTimestamp = Long.parseLong(TraceContext.getStartTimestamp());
        long duration = System.currentTimeMillis() - startTimestamp;
        ZipkinReporter.reportSpan(
                TraceContext.getTraceId(),
                TraceContext.getSpanId(),
                TraceContext.getParentSpanId(),
                "server-" + serviceName,
                startTimestamp,
                duration,
                serviceName,
                "server");
        TraceContext.clear();
    }
}
package cn.phixlin.core.trace.interceptor;
import cn.phixlin.core.trace.TraceContext;
import cn.phixlin.core.trace.TraceIdGenerator;
import cn.phixlin.core.trace.ZipkinReporter;
public class ClientTraceInterceptor {

    /**
     * Opens the client-side span: reuses the current traceId (minting one when this is
     * the start of a call chain), allocates a spanId, and stamps the start time into
     * the per-thread trace context.
     */
    public static void beforeInvoke() {
        String traceId = TraceContext.getTraceId();
        if (traceId == null) {
            // First hop of the chain: create a new trace.
            traceId = TraceIdGenerator.generateTraceId();
        }
        TraceContext.setTraceId(traceId);
        TraceContext.setSpanId(TraceIdGenerator.generateSpanId());
        TraceContext.setStartTimestamp(String.valueOf(System.currentTimeMillis()));
    }

    /**
     * Closes the client-side span: computes the elapsed time, reports the span to
     * Zipkin, then clears the per-thread trace context.
     */
    public static void afterInvoke(String serviceName) {
        long startTimeStamp = Long.parseLong(TraceContext.getStartTimestamp());
        long duration = System.currentTimeMillis() - startTimeStamp;
        ZipkinReporter.reportSpan(
                TraceContext.getTraceId(),
                TraceContext.getSpanId(),
                TraceContext.getParentSpanId(),
                "client-" + serviceName,
                startTimeStamp,
                duration,
                serviceName,
                "client");
        TraceContext.clear();
    }
}
package cn.phixlin.core.trace;
import cn.hutool.core.util.StrUtil;
import zipkin2.Span;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.okhttp3.OkHttpSender;
public class ZipkinReporter {
    /**
     * Zipkin collector endpoint.
     * NOTE(review): Zipkin's conventional default port is 9411 — confirm 9512 is really
     * where the collector listens in this deployment.
     */
    private static final String ZIPKIN_URL = "http://localhost:9512/api/v2/spans";
    // Asynchronous reporter shared by the whole process; spans are queued and flushed
    // in the background by the reporter's own thread.
    private static final AsyncReporter<Span> reporter;
    static {
    OkHttpSender sender = OkHttpSender.create(ZIPKIN_URL);
    reporter = zipkin2.reporter.AsyncReporter.create(sender);
    }
    // Builds one span from the given trace identifiers and hands it to the async
    // reporter. startTimestamp/duration arrive in milliseconds; the * 1000 below
    // converts them to the microseconds the Span builder is fed here.
    public static void reportSpan(String traceId,
    String spanId,
    String parentSpanId,
    String name,
    long startTimestamp,
    long duration,
    String serviceName,
    String type) {
    Span span = Span.newBuilder().traceId(traceId)
    .id(spanId)
    .parentId(parentSpanId)
    .name(name)
    .timestamp(startTimestamp * 1000)
    .duration(duration * 1000)
    .putTag("service", serviceName)
    .putTag("type", type).build();
    reporter.report(span);
    System.out.println(StrUtil.format("[zipkin] 当前 traceId: {} 上报日志", traceId));}
    // Flushes and shuts down the shared reporter.
    public static void close() {reporter.close();
    }
}
流控策略(令牌桶算法)
package cn.phixlin.core.server.ratelimit;
import cn.hutool.core.util.StrUtil;
import java.util.HashMap;
import java.util.Map;
public class RateLimitProvider {

    // One rate limiter per interface name.
    private final Map<String, RateLimit> rateLimitMap = new HashMap<>();

    /**
     * Returns the limiter registered for {@code interfaceName}, creating a token-bucket
     * limiter (100 tokens/s, capacity 200) on first use.
     * Fix: now synchronized — this is reached concurrently from Netty IO threads and the
     * backing HashMap is not thread-safe (CircuitBreakerProvider already synchronizes
     * its equivalent method).
     */
    public synchronized RateLimit getRateLimit(String interfaceName) {
        RateLimit rateLimit = rateLimitMap.get(interfaceName);
        if (rateLimit == null) {
            int rate = 100;
            int capacity = 200;
            rateLimit = new TokenBucketRateLimit(rate, capacity);
            System.out.println(StrUtil.format("[注册限流器] rate: {}, capacity: {}", rate, capacity));
            rateLimitMap.put(interfaceName, rateLimit);
        }
        return rateLimit;
    }
}
package cn.phixlin.core.server.ratelimit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class TokenBucketRateLimit implements RateLimit {
// 上一次令牌发放时间
public long lastTime = System.currentTimeMillis();
// 桶的容量
public long CAPACITY;
// 令牌生成速度 /s
public int RATE;
// 当前令牌数量
public AtomicLong tokens = new AtomicLong(0);
// 时间戳
private volatile long startTimestamp = System.currentTimeMillis();
public TokenBucketRateLimit(int rate, int capacity) {
RATE = rate;
CAPACITY = capacity;
}
@Override
public synchronized boolean getToken() {
// 计算上次请求时间间隔内生产的令牌数量
long curTimestamp = System.currentTimeMillis();
long gapToken = (curTimestamp - startTimestamp) * RATE / 1000;
long allToken = tokens.get() + gapToken;
tokens.set(Math.min(CAPACITY, allToken));
if (tokens.get() < 1) {
return false;
} else {
tokens.getAndAdd(-1);
startTimestamp = curTimestamp;
return true;
}
}
}
动态代理(接口调用拦截)
package cn.phixlin.core.client.proxy;
import cn.hutool.core.util.StrUtil;
import cn.phixlin.core.client.circuitbreaker.CircuitBreaker;
import cn.phixlin.core.client.circuitbreaker.CircuitBreakerProvider;
import cn.phixlin.core.client.discovery.ZKServiceDiscover;
import cn.phixlin.core.client.retry.GuavaRetry;
import cn.phixlin.core.client.rpcclient.RpcClient;
import cn.phixlin.core.client.rpcclient.impl.NettyRpcClient;
import cn.phixlin.core.trace.TraceContext;
import cn.phixlin.core.trace.interceptor.ClientTraceInterceptor;
import cn.phixlin.rpc.common.message.RpcRequest;
import cn.phixlin.rpc.common.message.RpcResponse;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.stream.Collectors;
public class ClientProxy implements InvocationHandler {

    // Created lazily per invocation, once the target address is known.
    private RpcClient rpcClient;
    private final ZKServiceDiscover zkServiceDiscover;
    private final CircuitBreakerProvider circuitBreakerProvider;

    public ClientProxy() {
        this.zkServiceDiscover = new ZKServiceDiscover();
        this.circuitBreakerProvider = new CircuitBreakerProvider();
    }

    /**
     * Dynamic-proxy entry point: builds the RpcRequest, consults the circuit breaker,
     * resolves a provider address, sends the request (with retry when the method is on
     * the idempotency whitelist), feeds the outcome back to the breaker and reports the
     * client-side trace span.
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // Open the client-side trace span.
        ClientTraceInterceptor.beforeInvoke();
        System.out.println(StrUtil.format("[客户端] TraceContext: {}", TraceContext.getTraceId() + ";" + TraceContext.getSpanId()));
        Class<?>[] parameterTypes = method.getParameterTypes();
        RpcRequest request = RpcRequest.builder()
                .interfaceName(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .params(args)
                .paramsType(parameterTypes)
                .build();
        // NOTE(review): the breaker is keyed by bare method name, so same-named methods
        // on different interfaces share one breaker — confirm that is intended.
        CircuitBreaker circuitBreaker = circuitBreakerProvider.getCircuitBreaker(request.getMethodName());
        if (!circuitBreaker.allowRequest()) {
            return RpcResponse.fail("熔断");
        }
        RpcResponse response;
        // For idempotency, only whitelisted methods are retried.
        String methodSignature = getMethodSignature(request.getInterfaceName(), method);
        System.out.println(StrUtil.format("[客户端] 调用方法签名:{}", methodSignature));
        InetSocketAddress inetSocketAddress = zkServiceDiscover.serviceDiscovery(request.getInterfaceName());
        if (inetSocketAddress == null) {
            // NettyRpcClient turns a null address into a failure response below.
            System.out.println("[客户端] 未发现可用服务");
        }
        rpcClient = new NettyRpcClient(inetSocketAddress);
        if (zkServiceDiscover.checkRetry(inetSocketAddress, methodSignature)) {
            // Whitelisted: send through the retry framework.
            response = new GuavaRetry().sendServiceWithRetry(request, rpcClient);
        } else {
            response = rpcClient.sendRequest(request);
        }
        if (response.getCode() == RpcResponse.SUCCODE) {
            circuitBreaker.recordSuccess();
        }
        if (response.getCode() == RpcResponse.FAILCODE) {
            circuitBreaker.recordFailure();
        }
        // Close and report the client-side trace span.
        ClientTraceInterceptor.afterInvoke(request.getInterfaceName());
        return response.getData();
    }

    /**
     * Builds the method signature.
     *
     * @param interfaceName the interface class name
     * @param method        the method
     * @return "interface#method(paramType,...)" — must match the format ZKServiceRegister
     *         publishes on the retry whitelist
     */
    private String getMethodSignature(String interfaceName, Method method) {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append(interfaceName).append("#").append(method.getName());
        Class<?>[] parameterTypes = method.getParameterTypes();
        String parameterTypesStr = Arrays.stream(parameterTypes).map(Class::getName).collect(Collectors.joining(",", "(", ")"));
        return stringBuilder.append(parameterTypesStr).toString();
    }

    /**
     * Returns a dynamic proxy implementing {@code clazz} whose calls route through
     * {@link #invoke}.
     */
    @SuppressWarnings("unchecked") // Proxy.newProxyInstance is guaranteed to implement clazz.
    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
    }

    /**
     * Releases client resources.
     * Fix: rpcClient is only created during the first invocation, so guard against an
     * NPE when close() is called before any RPC was made.
     */
    public void close() {
        if (rpcClient != null) {
            rpcClient.close();
        }
        zkServiceDiscover.close();
    }
}
📊 总结与展望
本框架通过 Netty 的高性能通信 和 ZooKeeper 的服务治理能力 ,实现了完整的 RPC 调用链路。核心优势包括:
- 基于一致性哈希的负载均衡算法,减少节点变动带来的影响
- 熔断 + 重试的容错组合,提升系统可用性
- 令牌桶限流防止服务雪崩,分布式追踪支持全链路监控
后续可优化方向:
- 支持 gRPC/HTTP 多协议接入
- 增加服务治理控制台
- 实现更细粒度的流量调度策略
License:
CC BY 4.0