rhy-rpc框架介绍
作者:B站up主:xhyovo
视频地址:[全网最全的手写RPC教程] 手摸手教你写一个RPC-架构-设计-落地实现_哔哩哔哩_bilibili
gitee地址:https://gitee.com/XhyQAQ/xhy-rpc
框架设计知识
自定义注解
RpcReference
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface RpcReference {
....
@Retention(RetentionPolicy.RUNTIME)
和 @Target(ElementType.FIELD)
是 Java 中的注解,分别用于指定注解的保留策略和注解的适用目标。
@Retention(RetentionPolicy.RUNTIME)
: 这个注解用于指定注解的保留策略,在这个例子中,RetentionPolicy.RUNTIME
表示这个注解将在运行时保留,也就是说,这个注解可以通过反射机制在运行时被读取和使用。这种保留策略允许你在运行时检查注解信息,这对于某些框架或库来说非常有用,比如在编写自定义注解处理器时。@Target(ElementType.FIELD)
: 这个注解用于指定注解的适用目标,即可以使用该注解的元素类型。在这个例子中,ElementType.FIELD
表示该注解只能应用在字段(field)上。这意味着只有类的字段可以被这个注解修饰。
综合起来,这两个注解的组合表示被注解标记的字段在运行时保留,并且该注解只能应用于字段上。
RpcService
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
...
该注解用于类上。
EnableConsumerRpc
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import(ConsumerPostProcessor.class)
public @interface EnableConsumerRpc {
}
该注解用于application启动类上
*在框架设计时,框架使用着通过注解来标识自己使用的接口,实现类。程序通过在框架中事先实现BeanPostProcessor接口#postProcessBeforeInitialization方法去找到注解标记的类,比如把标记的该注解的字段用的方法都代理(消费者使用rpc的场景),把标记注解的类找到(服务提供方将服务信息并放入缓存和服务注册)。
BeanPostProcessor
org.springframework.beans.factory.config.BeanPostProcessor
接口是 Spring 框架提供的一个重要接口,用于在容器实例化 bean 时进行一些自定义的处理。它允许开发者在 bean 实例化过程中介入,可以在 bean 初始化前后执行额外的操作。
该接口定义了两个方法:
- postProcessBeforeInitialization(Object bean, String beanName): 在容器实例化 bean 之后,在调用 bean 的初始化方法之前被调用。开发者可以在这个方法中对 bean 进行修改、处理或者根据需要做一些准备工作。
- postProcessAfterInitialization(Object bean, String beanName): 在容器实例化 bean 之后,在调用 bean 的初始化方法之后被调用。开发者可以在这个方法中对 bean 进行最终的处理,例如添加监听器、注册到其他组件等。
通常情况下,BeanPostProcessor
接口的实现类被称为 Bean 后置处理器。通过实现这个接口,开发者可以拦截所有的 bean 创建过程,并且在需要的时候进行自定义的处理。
*服务端在启动服务时,需要把netty的端口和连接一起启动起来,这个步骤在所有bean实例化完成后进行,实现InitializingBean接口的afterPropertiesSet方法中,并且建立netty连接时要通过异步去做。
InitializingBean
org.springframework.beans.factory.InitializingBean
接口是 Spring 框架中的一个接口,用于在 Bean 初始化阶段执行特定的初始化操作。这个接口只有一个方法需要实现,即 afterPropertiesSet()
。
**用途:**Bean 在被容器初始化之后执行自定义的初始化逻辑
替代方法:
-
初始化方法注解:你可以使用
@PostConstruct
注解来标注自定义初始化方法,该方法将在依赖注入完成后立即执行。 -
自定义初始化方法:你也可以通过在配置文件中指定
init-method
属性来调用 Bean 中的自定义初始化方法。
用途:常见的用例包括资源的初始化、连接的建立、依赖的检查等。
InitializingBean
接口提供了一种在 Spring 容器初始化阶段执行自定义初始化逻辑的方式,可以与其他 Spring 生命周期回调方法(如 @PostConstruct
、init-method
等)一起使用,以实现更加灵活的 Bean 初始化和管理。
EnvironmentAware
org.springframework.context.EnvironmentAware
接口是 Spring 框架中的一个接口,用于让 Bean 获取 Spring 应用程序上下文的环境信息。通过实现这个接口,Bean 可以访问应用程序的配置属性、profiles、活动profiles 等。
以下是对 EnvironmentAware
接口的简要介绍:
-
用途:
- 允许 Bean 获取 Spring 应用程序上下文的环境信息,如配置属性、profiles、活动profiles 等。
- 通过实现此接口,Bean 可以根据环境变量执行不同的逻辑,例如根据配置文件中的属性值来定制 Bean 的行为。
-
实现方法:
setEnvironment(Environment environment)
: 此方法由实现EnvironmentAware
接口的 Bean 实现,用于获取 Spring 应用程序上下文的环境信息。在这个方法中,你可以通过Environment
对象访问配置属性、profiles 等。
-
Environment 接口:
Environment
接口代表了 Spring 应用程序上下文的环境,提供了一系列方法来访问配置属性、profiles 等信息。- 通过
Environment
接口,你可以获取配置属性、检查 profiles 是否处于活动状态、检索活动的 profiles 等。
-
使用场景:
- 当 Bean 需要根据应用程序的环境信息执行不同的逻辑时,可以实现
EnvironmentAware
接口,并在setEnvironment()
方法中访问Environment
对象。 - 常见的用例包括根据配置文件中的属性值加载不同的配置、根据活动 profiles 执行不同的初始化逻辑等。
- 当 Bean 需要根据应用程序的环境信息执行不同的逻辑时,可以实现
总的来说,EnvironmentAware
接口提供了一种让 Bean 获取 Spring 应用程序上下文的环境信息的方式,可以帮助 Bean 根据环境变量执行不同的逻辑,从而实现更加灵活和可配置的应用程序。
框架配置文件加载方法:
1、设计一个RpcProperties java对象
2、设计两个注解@PropertiesPrefix和PropertiesField,用于标明配置文件的key。(属性名)
3、写一个PropertiesUtils,写个init方法去根据对象中的配置匹配配置文件。
4、实现EnvironmentAware接口setEnvironment,new一个Properties对象,通过PropertiesUtils读取配置文件读到该对象中
InvocationHandler
java.lang.reflect.InvocationHandler
接口是 Java 核心库中的一个重要接口,通常与 Java 动态代理一起使用。它允许在运行时动态地处理对代理对象方法的调用。
InvocationHandler
接口只定义了一个方法:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable;
在使用动态代理时,开发者需要提供一个实现了 InvocationHandler
接口的类,并在其中实现 invoke
方法。当代理对象的方法被调用时,invoke
方法将被调用,并且可以在其中执行自定义的逻辑。
下面是对 invoke
方法参数的说明:
-
proxy
:代理对象。在invoke
方法中,可以使用proxy
对象来调用被代理对象的方法,也可以选择不调用。 -
method
:被调用的方法对象。通过method
参数可以获取到被调用方法的信息,如方法名、参数等。 -
args
:方法调用时传递的参数数组。args
参数包含了调用方法时传递的所有参数。
invoke
方法需要返回一个 Object
类型的值,这个值通常是被代理方法的返回值。如果被代理方法有返回值,则 invoke
方法应该返回与被代理方法的返回值相同类型的对象;如果被代理方法没有返回值,则 invoke
方法应该返回 null
。
使用 InvocationHandler
接口,可以实现对代理对象方法的动态拦截、修改或增强。这种机制在 AOP(面向切面编程)、RPC(远程过程调用)等场景中经常被使用。
下面是一个简单的示例,演示了如何使用 InvocationHandler
接口:
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class MyInvocationHandler implements InvocationHandler {
private Object target;
public MyInvocationHandler(Object target) {
this.target = target;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("Before method invocation");
Object result = method.invoke(target, args);
System.out.println("After method invocation");
return result;
}
}
在上面的示例中,MyInvocationHandler
类实现了 InvocationHandler
接口,并在 invoke
方法中添加了前置和后置处理逻辑。通过将 MyInvocationHandler
与动态代理结合使用,可以实现对目标对象方法的调用前后进行额外的操作。
比如object = Proxy.newProxyInstance(....);
核心代码
消费方后置处理器
创建代理对象、读配置文件,初始化bean
@Configuration
public class ConsumerPostProcessor implements BeanPostProcessor, EnvironmentAware, InitializingBean {
private Logger logger = LoggerFactory.getLogger(ClientLogFilter.class);
RpcProperties rpcProperties;
/**
* 从配置文件中读取配置
* @param environment
*/
@Override
public void setEnvironment(Environment environment) {
RpcProperties properties = RpcProperties.getInstance();
PropertiesUtils.init(properties,environment);
rpcProperties = properties;
logger.info("读取配置文件成功");
}
/**
* 初始化一些bean
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
SerializationFactory.init();
RegistryFactory.init();
LoadBalancerFactory.init();
FilterConfig.initClientFilter();
}
/**
* 代理层注入
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 获取所有字段
final Field[] fields = bean.getClass().getDeclaredFields();
// 遍历所有字段找到 RpcReference 注解的字段
for (Field field : fields) {
if(field.isAnnotationPresent(RpcReference.class)){
final RpcReference rpcReference = field.getAnnotation(RpcReference.class);
final Class<?> aClass = field.getType();
field.setAccessible(true);
Object object = null;
try {
// 创建代理对象
object = Proxy.newProxyInstance(
aClass.getClassLoader(),
new Class<?>[]{aClass},
new RpcInvokerProxy(rpcReference.serviceVersion(),rpcReference.timeout(),rpcReference.faultTolerant(),
rpcReference.loadBalancer(),rpcReference.retryCount()));
} catch (Exception e) {
e.printStackTrace();
}
try {
// 将代理对象设置给字段
field.set(bean,object);
field.setAccessible(false);
logger.info(beanName + " field:" + field.getName() + "注入成功");
} catch (IllegalAccessException e) {
e.printStackTrace();
logger.info(beanName + " field:" + field.getName() + "注入失败");
}
}
}
return bean;
}
}
消费方调用服务的代理
RpcInvokerProxy该类用于消费方在调用rpc服务时创建代理对象
@Slf4j
public class RpcInvokerProxy implements InvocationHandler {
private String serviceVersion;
private long timeout;
private String loadBalancerType;
private String faultTolerantType;
private long retryCount;
public RpcInvokerProxy(String serviceVersion, long timeout,String faultTolerantType,String loadBalancerType,long retryCount) throws Exception {
this.serviceVersion = serviceVersion;
this.timeout = timeout;
this.loadBalancerType = loadBalancerType;
this.faultTolerantType = faultTolerantType;
this.retryCount = retryCount;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcProtocol<RpcRequest> protocol = new RpcProtocol<>();
// 构建消息头
MsgHeader header = new MsgHeader();
long requestId = RpcRequestHolder.REQUEST_ID_GEN.incrementAndGet();
header.setMagic(ProtocolConstants.MAGIC);
header.setVersion(ProtocolConstants.VERSION);
header.setRequestId(requestId);
final byte[] serialization = RpcProperties.getInstance().getSerialization().getBytes();
header.setSerializationLen(serialization.length);
header.setSerializations(serialization);
header.setMsgType((byte) MsgType.REQUEST.ordinal());
header.setStatus((byte) 0x1);
protocol.setHeader(header);
// 构建请求体
RpcRequest request = new RpcRequest();
request.setServiceVersion(this.serviceVersion);
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setData(ObjectUtils.isEmpty(args) ? new Object[0] : args);
request.setDataClass(ObjectUtils.isEmpty(args) ? null : args[0].getClass());
request.setServiceAttachments(RpcProperties.getInstance().getServiceAttachments());
request.setClientAttachments(RpcProperties.getInstance().getClientAttachments());
// 拦截器的上下文
final FilterData filterData = new FilterData(request);
try {
FilterConfig.getClientBeforeFilterChain().doFilter(filterData);
}catch (Throwable e){
throw e;
}
protocol.setBody(request);
RpcConsumer rpcConsumer = new RpcConsumer();
String serviceName = RpcServiceNameBuilder.buildServiceKey(request.getClassName(), request.getServiceVersion());
Object[] params = {request.getData()};
// 1.获取负载均衡策略
final LoadBalancer loadBalancer = LoadBalancerFactory.get(loadBalancerType);
// 2.根据策略获取对应服务
final ServiceMetaRes serviceMetaRes = loadBalancer.select(params, serviceName);
ServiceMeta curServiceMeta = serviceMetaRes.getCurServiceMeta();
final Collection<ServiceMeta> otherServiceMeta = serviceMetaRes.getOtherServiceMeta();
long count = 1;
long retryCount = this.retryCount;
RpcResponse rpcResponse = null;
// 重试机制
while (count <= retryCount ){
// 处理返回数据
RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()), timeout);
// XXXHolder
RpcRequestHolder.REQUEST_MAP.put(requestId, future);
try {
// 发送消息
rpcConsumer.sendRequest(protocol, curServiceMeta);
// 等待响应数据返回
rpcResponse = future.getPromise().get(future.getTimeout(), TimeUnit.MILLISECONDS);
// 如果有异常并且没有其他服务
if(rpcResponse.getException()!=null && otherServiceMeta.size() == 0){
throw rpcResponse.getException();
}
if (rpcResponse.getException()!=null){
throw rpcResponse.getException();
}
log.info("rpc 调用成功, serviceName: {}",serviceName);
try {
FilterConfig.getClientAfterFilterChain().doFilter(filterData);
}catch (Throwable e){
throw e;
}
return rpcResponse.getData();
}catch (Throwable e){
String errorMsg = e.toString();
// todo 这里的容错机制可拓展,留作业自行更改
switch (faultTolerantType){
// 快速失败
case FailFast:
log.warn("rpc 调用失败,触发 FailFast 策略,异常信息: {}",errorMsg);
return rpcResponse.getException();
// 故障转移
case Failover:
log.warn("rpc 调用失败,第{}次重试,异常信息:{}",count,errorMsg);
count++;
if (!ObjectUtils.isEmpty(otherServiceMeta)){
final ServiceMeta next = otherServiceMeta.iterator().next();
curServiceMeta = next;
otherServiceMeta.remove(next);
}else {
final String msg = String.format("rpc 调用失败,无服务可用 serviceName: {%s}, 异常信息: {%s}", serviceName, errorMsg);
log.warn(msg);
throw new RuntimeException(msg);
}
break;
// 忽视这次错误
case Failsafe:
return null;
}
}
}
throw new RuntimeException("rpc 调用失败,超过最大重试次数: {}" + retryCount);
}
}
消费方发送数据
通过netty建立连接发送数据
public class RpcConsumer {
private final Bootstrap bootstrap;
private final EventLoopGroup eventLoopGroup;
private Logger logger = LoggerFactory.getLogger(RpcConsumer.class);
public RpcConsumer() {
bootstrap = new Bootstrap();
eventLoopGroup = new NioEventLoopGroup(4);
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new RpcEncoder())
.addLast(new RpcDecoder())
.addLast(new RpcResponseHandler());
}
});
}
/**
* 发送请求
* @param protocol 消息
* @param serviceMetadata 服务
* @return 当前服务
* @throws Exception
*/
public void sendRequest(RpcProtocol<RpcRequest> protocol, ServiceMeta serviceMetadata) throws Exception {
if (serviceMetadata != null) {
// 连接
ChannelFuture future = bootstrap.connect(serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort()).sync();
future.addListener((ChannelFutureListener) arg0 -> {
if (future.isSuccess()) {
logger.info("连接 rpc server {} 端口 {} 成功.", serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort());
} else {
logger.error("连接 rpc server {} 端口 {} 失败.", serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort());
future.cause().printStackTrace();
eventLoopGroup.shutdownGracefully();
}
});
// 写入数据
future.channel().writeAndFlush(protocol);
}
}
}
服务提供方后置处理器
public class ProviderPostProcessor implements InitializingBean,BeanPostProcessor, EnvironmentAware {
private Logger logger = LoggerFactory.getLogger(ProviderPostProcessor.class);
RpcProperties rpcProperties;
// 此处在linux环境下改为0.0.0.0
private static String serverAddress = "127.0.0.1";
private final Map<String, Object> rpcServiceMap = new HashMap<>();
@Override
public void afterPropertiesSet() throws Exception {
Thread t = new Thread(() -> {
try {
startRpcServer();
} catch (Exception e) {
logger.error("start rpc server error.", e);
}
});
t.setDaemon(true);
t.start();
SerializationFactory.init();
RegistryFactory.init();
LoadBalancerFactory.init();
FilterConfig.initServiceFilter();
ThreadPollFactory.setRpcServiceMap(rpcServiceMap);
}
private void startRpcServer() throws InterruptedException {
int serverPort = rpcProperties.getPort();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.option(ChannelOption.SO_KEEPALIVE, true)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new RpcEncoder())
.addLast(new RpcDecoder())
.addLast(new ServiceBeforeFilterHandler())
.addLast(new RpcRequestHandler())
.addLast(new ServiceAfterFilterHandler());
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, serverPort).sync();
logger.info("server addr {} started on port {}", this.serverAddress, serverPort);
channelFuture.channel().closeFuture().sync();
Runtime.getRuntime().addShutdownHook(new Thread(() ->
{
logger.info("ShutdownHook execute start...");
logger.info("Netty NioEventLoopGroup shutdownGracefully...");
logger.info("Netty NioEventLoopGroup shutdownGracefully2...");
boss.shutdownGracefully();
worker.shutdownGracefully();
logger.info("ShutdownHook execute end...");
}, "Allen-thread"));
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
/**
* 服务注册
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
Class<?> beanClass = bean.getClass();
// 找到bean上带有 RpcService 注解的类
RpcService rpcService = beanClass.getAnnotation(RpcService.class);
if (rpcService != null) {
// 可能会有多个接口,默认选择第一个接口
String serviceName = beanClass.getInterfaces()[0].getName();
if (!rpcService.serviceInterface().equals(void.class)){
serviceName = rpcService.serviceInterface().getName();
}
String serviceVersion = rpcService.serviceVersion();
try {
// 服务注册
int servicePort = rpcProperties.getPort();
// 获取注册中心 ioc
RegistryService registryService = RegistryFactory.get(rpcProperties.getRegisterType());
ServiceMeta serviceMeta = new ServiceMeta();
// 服务提供方地址
serviceMeta.setServiceAddr("127.0.0.1");
serviceMeta.setServicePort(servicePort);
serviceMeta.setServiceVersion(serviceVersion);
serviceMeta.setServiceName(serviceName);
registryService.register(serviceMeta);
// 缓存
rpcServiceMap.put(RpcServiceNameBuilder.buildServiceKey(serviceMeta.getServiceName(),serviceMeta.getServiceVersion()), bean);
logger.info("register server {} version {}",serviceName,serviceVersion);
} catch (Exception e) {
logger.error("failed to register service {}", serviceVersion, e);
}
}
return bean;
}
@Override
public void setEnvironment(Environment environment) {
RpcProperties properties = RpcProperties.getInstance();
PropertiesUtils.init(properties,environment);
rpcProperties = properties;
logger.info("读取配置文件成功");
}
}
rpc架构
代理层
object = Proxy.newProxyInstance(
aClass.getClassLoader(),
new Class<?>[]{aClass},
new RpcInvokerProxy(rpcReference.serviceVersion(),rpcReference.timeout(),rpcReference.faultTolerant(),
rpcReference.loadBalancer(),rpcReference.retryCount()));
注册中心
redis,zk
public interface RegistryService {
/**
* 服务注册
* @param serviceMeta
* @throws Exception
*/
void register(ServiceMeta serviceMeta) throws Exception;
/**
* 服务注销
* @param serviceMeta
* @throws Exception
*/
void unRegister(ServiceMeta serviceMeta) throws Exception;
/**
* 获取 serviceName 下的所有服务
* @param serviceName
* @return
*/
List<ServiceMeta> discoveries(String serviceName);
/**
* 关闭
* @throws IOException
*/
void destroy() throws IOException;
}
路由层
负载均衡(轮询/一致性哈希等)
router
public interface LoadBalancer<T> {
/**
* 选择负载均衡策略
* @param params 入参,可自定义拿到入参后自行处理负载策略
* @param serviceName 服务key
* @return 当前服务节点以及其他节点,用于给容错使用
*/
ServiceMetaRes select(Object[] params,String serviceName);
}
故障转移
}catch (Throwable e){
String errorMsg = e.toString();
// todo 这里的容错机制可拓展,留作业自行更改
switch (faultTolerantType){
// 快速失败
case FailFast:
log.warn("rpc 调用失败,触发 FailFast 策略,异常信息: {}",errorMsg);
return rpcResponse.getException();
// 故障转移
case Failover:
log.warn("rpc 调用失败,第{}次重试,异常信息:{}",count,errorMsg);
count++;
if (!ObjectUtils.isEmpty(otherServiceMeta)){
final ServiceMeta next = otherServiceMeta.iterator().next();
curServiceMeta = next;
otherServiceMeta.remove(next);
}else {
final String msg = String.format("rpc 调用失败,无服务可用 serviceName: {%s}, 异常信息: {%s}", serviceName, errorMsg);
log.warn(msg);
throw new RuntimeException(msg);
}
break;
// 忽视这次错误
case Failsafe:
return null;
}
}
协议层
编码,解码
socketChannel.pipeline()
.addLast(new RpcEncoder())
.addLast(new RpcDecoder())
拦截器层
拦截器链
public class FilterChain {
private List<Filter> filters = new ArrayList<>();
public void addFilter(Filter filter){
filters.add(filter);
}
public void addFilter(List<Object> filters){
for (Object filter : filters) {
addFilter((Filter) filter);
}
}
public void doFilter(FilterData data){
for (Filter filter : filters) {
filter.doFilter(data);
}
}
}
客户端拦截器
ClientAfterFilter,ClientBeforeFilter
final FilterData filterData = new FilterData(request);
try {
FilterConfig.getClientBeforeFilterChain().doFilter(filterData);
}catch (Throwable e){
throw e;
}
protocol.setBody(request);
服务端连接器
ServiceAfterFilter,ServiceBeforeFilter
.addLast(new ServiceBeforeFilterHandler())
.addLast(new ServiceAfterFilterHandler());
**SPI **
org.xhystudy.rpc.spi.ExtensionLoader
/**
* 根据spi机制初加载bean的信息放入map
* @param clazz
* @throws IOException
* @throws ClassNotFoundException
*/
public void loadExtension(Class clazz) throws IOException, ClassNotFoundException {
if (clazz == null) {
throw new IllegalArgumentException("class 没找到");
}
ClassLoader classLoader = this.getClass().getClassLoader();
Map<String, Class> classMap = new HashMap<>();
// 从系统SPI以及用户SPI中找bean
for (String prefix : prefixs) {
String spiFilePath = prefix + clazz.getName();
Enumeration<URL> enumeration = classLoader.getResources(spiFilePath);
while (enumeration.hasMoreElements()) {
URL url = enumeration.nextElement();
InputStreamReader inputStreamReader = null;
inputStreamReader = new InputStreamReader(url.openStream());
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String line;
while ((line = bufferedReader.readLine()) != null) {
String[] lineArr = line.split("=");
String key = lineArr[0];
String name = lineArr[1];
final Class<?> aClass = Class.forName(name);
extensionClassCache.put(key, aClass);
classMap.put(key, aClass);
logger.info("加载bean key:{} , value:{}",key,name);
}
}
}
extensionClassCaches.put(clazz.getName(),classMap);
}
请求处理器
.addLast(new RpcRequestHandler())
public static void submitRequest(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> protocol){
final RpcRequest request = protocol.getBody();
String key = request.getClassName() + request.getMethodName() + request.getServiceVersion();
ThreadPoolExecutor poll = fastPoll;
if (slowTaskMap.containsKey(key) && slowTaskMap.get(key).intValue() >= 10){
poll = slowPoll;
}
poll.submit(()->{
RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();
final MsgHeader header = protocol.getHeader();
RpcResponse response = new RpcResponse();
long startTime = System.currentTimeMillis();
try {
final Object result = submit(ctx, protocol);
response.setData(result);
response.setDataClass(result == null ? null : result.getClass());
header.setStatus((byte) MsgStatus.SUCCESS.ordinal());
} catch (Exception e) {
// 执行业务失败则将异常返回
header.setStatus((byte) MsgStatus.FAILED.ordinal());
response.setException(e);
logger.error("process request {} error", header.getRequestId(), e);
}finally {
long cost = System.currentTimeMillis() - startTime;
System.out.println("cost time:" + cost);
if(cost > 1000){
final AtomicInteger timeOutCount = slowTaskMap.putIfAbsent(key, new AtomicInteger(1));
if (timeOutCount!=null){
timeOutCount.incrementAndGet();
}
}
}
resProtocol.setHeader(header);
resProtocol.setBody(response);
logger.info("执行成功: {},{},{},{}",Thread.currentThread().getName(),request.getClassName(),request.getMethodName(),request.getServiceVersion());
ctx.fireChannelRead(resProtocol);
});
}