框架学习-rhyRPC

rhy-rpc框架介绍 作者:B站up主:xhyovo 视频地址:[全网最全的手写RPC教程] 手摸手教你写一个RPC-架构-设计-落地实现_哔哩哔哩_bilibili gitee地址:https://gitee.com/XhyQAQ/xhy-rpc 框架设计知识 自定义注解 RpcReferenc

rhy-rpc框架介绍

作者:B站up主:xhyovo

视频地址:[全网最全的手写RPC教程] 手摸手教你写一个RPC-架构-设计-落地实现_哔哩哔哩_bilibili

gitee地址:https://gitee.com/XhyQAQ/xhy-rpc

框架设计知识

自定义注解

RpcReference

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface RpcReference {
....

@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.FIELD) 是 Java 中的注解,分别用于指定注解的保留策略和注解的适用目标。

  1. @Retention(RetentionPolicy.RUNTIME): 这个注解用于指定注解的保留策略,在这个例子中,RetentionPolicy.RUNTIME 表示这个注解将在运行时保留,也就是说,这个注解可以通过反射机制在运行时被读取和使用。这种保留策略允许你在运行时检查注解信息,这对于某些框架或库来说非常有用,比如在编写自定义注解处理器时。
  2. @Target(ElementType.FIELD): 这个注解用于指定注解的适用目标,即可以使用该注解的元素类型。在这个例子中,ElementType.FIELD 表示该注解只能应用在字段(field)上。这意味着只有类的字段可以被这个注解修饰。

综合起来,这两个注解的组合表示被注解标记的字段在运行时保留,并且该注解只能应用于字段上。

RpcService

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
...

该注解用于类上。

EnableConsumerRpc

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import(ConsumerPostProcessor.class)
public @interface EnableConsumerRpc {

}

该注解用于application启动类上

*在框架设计时,框架使用着通过注解来标识自己使用的接口,实现类。程序通过在框架中事先实现BeanPostProcessor接口#postProcessBeforeInitialization方法去找到注解标记的类,比如把标记的该注解的字段用的方法都代理(消费者使用rpc的场景),把标记注解的类找到(服务提供方将服务信息并放入缓存和服务注册)。

BeanPostProcessor

org.springframework.beans.factory.config.BeanPostProcessor 接口是 Spring 框架提供的一个重要接口,用于在容器实例化 bean 时进行一些自定义的处理。它允许开发者在 bean 实例化过程中介入,可以在 bean 初始化前后执行额外的操作。

该接口定义了两个方法:

  1. postProcessBeforeInitialization(Object bean, String beanName): 在容器实例化 bean 之后,在调用 bean 的初始化方法之前被调用。开发者可以在这个方法中对 bean 进行修改、处理或者根据需要做一些准备工作。
  2. postProcessAfterInitialization(Object bean, String beanName): 在容器实例化 bean 之后,在调用 bean 的初始化方法之后被调用。开发者可以在这个方法中对 bean 进行最终的处理,例如添加监听器、注册到其他组件等。

通常情况下,BeanPostProcessor 接口的实现类被称为 Bean 后置处理器。通过实现这个接口,开发者可以拦截所有的 bean 创建过程,并且在需要的时候进行自定义的处理。

*服务端在启动服务时,需要把netty的端口和连接一起启动起来,这个步骤在所有bean实例化完成后进行,实现InitializingBean接口的afterPropertiesSet方法中,并且建立netty连接时要通过异步去做。

InitializingBean

org.springframework.beans.factory.InitializingBean 接口是 Spring 框架中的一个接口,用于在 Bean 初始化阶段执行特定的初始化操作。这个接口只有一个方法需要实现,即 afterPropertiesSet()

**用途:**Bean 在被容器初始化之后执行自定义的初始化逻辑

替代方法

  • 初始化方法注解:你可以使用 @PostConstruct 注解来标注自定义初始化方法,该方法将在依赖注入完成后立即执行。

  • 自定义初始化方法:你也可以通过在配置文件中指定 init-method 属性来调用 Bean 中的自定义初始化方法。

用途:常见的用例包括资源的初始化、连接的建立、依赖的检查等。

InitializingBean 接口提供了一种在 Spring 容器初始化阶段执行自定义初始化逻辑的方式,可以与其他 Spring 生命周期回调方法(如 @PostConstructinit-method 等)一起使用,以实现更加灵活的 Bean 初始化和管理。

EnvironmentAware

org.springframework.context.EnvironmentAware 接口是 Spring 框架中的一个接口,用于让 Bean 获取 Spring 应用程序上下文的环境信息。通过实现这个接口,Bean 可以访问应用程序的配置属性、profiles、活动profiles 等。

以下是对 EnvironmentAware 接口的简要介绍:

  1. 用途

    • 允许 Bean 获取 Spring 应用程序上下文的环境信息,如配置属性、profiles、活动profiles 等。
    • 通过实现此接口,Bean 可以根据环境变量执行不同的逻辑,例如根据配置文件中的属性值来定制 Bean 的行为。
  2. 实现方法

    • setEnvironment(Environment environment): 此方法由实现 EnvironmentAware 接口的 Bean 实现,用于获取 Spring 应用程序上下文的环境信息。在这个方法中,你可以通过 Environment 对象访问配置属性、profiles 等。
  3. Environment 接口

    • Environment 接口代表了 Spring 应用程序上下文的环境,提供了一系列方法来访问配置属性、profiles 等信息。
    • 通过 Environment 接口,你可以获取配置属性、检查 profiles 是否处于活动状态、检索活动的 profiles 等。
  4. 使用场景

    • 当 Bean 需要根据应用程序的环境信息执行不同的逻辑时,可以实现 EnvironmentAware 接口,并在 setEnvironment() 方法中访问 Environment 对象。
    • 常见的用例包括根据配置文件中的属性值加载不同的配置、根据活动 profiles 执行不同的初始化逻辑等。

总的来说,EnvironmentAware 接口提供了一种让 Bean 获取 Spring 应用程序上下文的环境信息的方式,可以帮助 Bean 根据环境变量执行不同的逻辑,从而实现更加灵活和可配置的应用程序。

框架配置文件加载方法:

1、设计一个RpcProperties java对象

2、设计两个注解@PropertiesPrefix和PropertiesField,用于标明配置文件的key。(属性名)

3、写一个PropertiesUtils,写个init方法去根据对象中的配置匹配配置文件。

4、实现EnvironmentAware接口setEnvironment,new一个Properties对象,通过PropertiesUtils读取配置文件读到该对象中

InvocationHandler

java.lang.reflect.InvocationHandler 接口是 Java 核心库中的一个重要接口,通常与 Java 动态代理一起使用。它允许在运行时动态地处理对代理对象方法的调用。

InvocationHandler 接口只定义了一个方法:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable;

在使用动态代理时,开发者需要提供一个实现了 InvocationHandler 接口的类,并在其中实现 invoke 方法。当代理对象的方法被调用时,invoke 方法将被调用,并且可以在其中执行自定义的逻辑。

下面是对 invoke 方法参数的说明:

  • proxy:代理对象。在 invoke 方法中,可以使用 proxy 对象来调用被代理对象的方法,也可以选择不调用。

  • method:被调用的方法对象。通过 method 参数可以获取到被调用方法的信息,如方法名、参数等。

  • args:方法调用时传递的参数数组。args 参数包含了调用方法时传递的所有参数。

invoke 方法需要返回一个 Object 类型的值,这个值通常是被代理方法的返回值。如果被代理方法有返回值,则 invoke 方法应该返回与被代理方法的返回值相同类型的对象;如果被代理方法没有返回值,则 invoke 方法应该返回 null

使用 InvocationHandler 接口,可以实现对代理对象方法的动态拦截、修改或增强。这种机制在 AOP(面向切面编程)、RPC(远程过程调用)等场景中经常被使用。

下面是一个简单的示例,演示了如何使用 InvocationHandler 接口:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class MyInvocationHandler implements InvocationHandler {

    private Object target;

    public MyInvocationHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        System.out.println("Before method invocation");
        Object result = method.invoke(target, args);
        System.out.println("After method invocation");
        return result;
    }
}

在上面的示例中,MyInvocationHandler 类实现了 InvocationHandler 接口,并在 invoke 方法中添加了前置和后置处理逻辑。通过将 MyInvocationHandler动态代理结合使用,可以实现对目标对象方法的调用前后进行额外的操作。

比如object = Proxy.newProxyInstance(....);

核心代码

消费方后置处理器

创建代理对象、读配置文件,初始化bean

@Configuration
public class ConsumerPostProcessor implements BeanPostProcessor, EnvironmentAware, InitializingBean {

    private Logger logger = LoggerFactory.getLogger(ClientLogFilter.class);

    RpcProperties rpcProperties;

    /**
     * 从配置文件中读取配置
     * @param environment
     */
    @Override
    public void setEnvironment(Environment environment) {
        RpcProperties properties = RpcProperties.getInstance();
        PropertiesUtils.init(properties,environment);
        rpcProperties = properties;
        logger.info("读取配置文件成功");

    }

    /**
     * 初始化一些bean
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        SerializationFactory.init();
        RegistryFactory.init();
        LoadBalancerFactory.init();
        FilterConfig.initClientFilter();
    }

    /**
     * 代理层注入
     * @param bean
     * @param beanName
     * @return
     * @throws BeansException
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 获取所有字段
        final Field[] fields = bean.getClass().getDeclaredFields();
        // 遍历所有字段找到 RpcReference 注解的字段
        for (Field field : fields) {
            if(field.isAnnotationPresent(RpcReference.class)){
                final RpcReference rpcReference = field.getAnnotation(RpcReference.class);
                final Class<?> aClass = field.getType();
                field.setAccessible(true);
                Object object = null;
                try {
                    // 创建代理对象
                    object = Proxy.newProxyInstance(
                            aClass.getClassLoader(),
                            new Class<?>[]{aClass},
                            new RpcInvokerProxy(rpcReference.serviceVersion(),rpcReference.timeout(),rpcReference.faultTolerant(),
                                    rpcReference.loadBalancer(),rpcReference.retryCount()));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                try {
                    // 将代理对象设置给字段
                    field.set(bean,object);
                    field.setAccessible(false);
                    logger.info(beanName + " field:" + field.getName() + "注入成功");
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                    logger.info(beanName + " field:" + field.getName() + "注入失败");
                }
            }
        }
        return bean;
    }
}

消费方调用服务的代理

RpcInvokerProxy该类用于消费方在调用rpc服务时创建代理对象

@Slf4j
public class RpcInvokerProxy implements InvocationHandler {

    private String serviceVersion;
    private long timeout;
    private String loadBalancerType;
    private String faultTolerantType;
    private long retryCount;


    public RpcInvokerProxy(String serviceVersion, long timeout,String faultTolerantType,String loadBalancerType,long retryCount) throws Exception {
        this.serviceVersion = serviceVersion;
        this.timeout = timeout;
        this.loadBalancerType = loadBalancerType;
        this.faultTolerantType = faultTolerantType;
        this.retryCount = retryCount;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcProtocol<RpcRequest> protocol = new RpcProtocol<>();
        // 构建消息头
        MsgHeader header = new MsgHeader();
        long requestId = RpcRequestHolder.REQUEST_ID_GEN.incrementAndGet();
        header.setMagic(ProtocolConstants.MAGIC);
        header.setVersion(ProtocolConstants.VERSION);
        header.setRequestId(requestId);
        final byte[] serialization = RpcProperties.getInstance().getSerialization().getBytes();
        header.setSerializationLen(serialization.length);
        header.setSerializations(serialization);
        header.setMsgType((byte) MsgType.REQUEST.ordinal());
        header.setStatus((byte) 0x1);
        protocol.setHeader(header);

        // 构建请求体
        RpcRequest request = new RpcRequest();
        request.setServiceVersion(this.serviceVersion);
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setData(ObjectUtils.isEmpty(args) ? new Object[0] : args);
        request.setDataClass(ObjectUtils.isEmpty(args) ? null : args[0].getClass());
        request.setServiceAttachments(RpcProperties.getInstance().getServiceAttachments());
        request.setClientAttachments(RpcProperties.getInstance().getClientAttachments());
        // 拦截器的上下文
        final FilterData filterData = new FilterData(request);
        try {
            FilterConfig.getClientBeforeFilterChain().doFilter(filterData);
        }catch (Throwable e){
            throw e;
        }
        protocol.setBody(request);

        RpcConsumer rpcConsumer = new RpcConsumer();

        String serviceName = RpcServiceNameBuilder.buildServiceKey(request.getClassName(), request.getServiceVersion());
        Object[] params = {request.getData()};
        // 1.获取负载均衡策略
        final LoadBalancer loadBalancer = LoadBalancerFactory.get(loadBalancerType);

        // 2.根据策略获取对应服务
        final ServiceMetaRes serviceMetaRes = loadBalancer.select(params, serviceName);

        ServiceMeta curServiceMeta = serviceMetaRes.getCurServiceMeta();
        final Collection<ServiceMeta> otherServiceMeta = serviceMetaRes.getOtherServiceMeta();
        long count = 1;
        long retryCount = this.retryCount;
        RpcResponse rpcResponse = null;
        // 重试机制
        while (count <= retryCount ){
            // 处理返回数据
            RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()), timeout);
            // XXXHolder
            RpcRequestHolder.REQUEST_MAP.put(requestId, future);
            try {
                // 发送消息
                rpcConsumer.sendRequest(protocol, curServiceMeta);
                // 等待响应数据返回
                rpcResponse = future.getPromise().get(future.getTimeout(), TimeUnit.MILLISECONDS);
                // 如果有异常并且没有其他服务
                if(rpcResponse.getException()!=null && otherServiceMeta.size() == 0){
                    throw rpcResponse.getException();
                }
                if (rpcResponse.getException()!=null){
                    throw rpcResponse.getException();
                }
                log.info("rpc 调用成功, serviceName: {}",serviceName);
                try {
                    FilterConfig.getClientAfterFilterChain().doFilter(filterData);
                }catch (Throwable e){
                    throw e;
                }
                return rpcResponse.getData();
            }catch (Throwable e){
                String errorMsg = e.toString();
                // todo 这里的容错机制可拓展,留作业自行更改
                switch (faultTolerantType){
                    // 快速失败
                    case FailFast:
                        log.warn("rpc 调用失败,触发 FailFast 策略,异常信息: {}",errorMsg);
                        return rpcResponse.getException();
                    // 故障转移
                    case Failover:
                        log.warn("rpc 调用失败,第{}次重试,异常信息:{}",count,errorMsg);
                        count++;
                        if (!ObjectUtils.isEmpty(otherServiceMeta)){
                            final ServiceMeta next = otherServiceMeta.iterator().next();
                            curServiceMeta = next;
                            otherServiceMeta.remove(next);
                        }else {
                            final String msg = String.format("rpc 调用失败,无服务可用 serviceName: {%s}, 异常信息: {%s}", serviceName, errorMsg);
                            log.warn(msg);
                            throw new RuntimeException(msg);
                        }
                        break;
                    // 忽视这次错误
                    case Failsafe:
                        return null;
                }
            }
        }

        throw new RuntimeException("rpc 调用失败,超过最大重试次数: {}" + retryCount);
    }
}
消费方发送数据

通过netty建立连接发送数据

public class RpcConsumer {

    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup;
    private Logger logger = LoggerFactory.getLogger(RpcConsumer.class);

    public RpcConsumer() {
        bootstrap = new Bootstrap();
        eventLoopGroup = new NioEventLoopGroup(4);
        bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new RpcEncoder())
                                .addLast(new RpcDecoder())
                                .addLast(new RpcResponseHandler());
                    }
                });
    }

    /**
     * 发送请求
     * @param protocol 消息
     * @param serviceMetadata 服务
     * @return 当前服务
     * @throws Exception
     */
    public void sendRequest(RpcProtocol<RpcRequest> protocol, ServiceMeta serviceMetadata) throws Exception {
        if (serviceMetadata != null) {
            // 连接
            ChannelFuture future = bootstrap.connect(serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort()).sync();
            future.addListener((ChannelFutureListener) arg0 -> {
                if (future.isSuccess()) {
                    logger.info("连接 rpc server {} 端口 {} 成功.", serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort());
                } else {
                    logger.error("连接 rpc server {} 端口 {} 失败.", serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort());
                    future.cause().printStackTrace();
                    eventLoopGroup.shutdownGracefully();
                }
            });
            // 写入数据
            future.channel().writeAndFlush(protocol);
        }
    }
}
服务提供方后置处理器
public class ProviderPostProcessor implements InitializingBean,BeanPostProcessor, EnvironmentAware {


    private Logger logger = LoggerFactory.getLogger(ProviderPostProcessor.class);

    RpcProperties rpcProperties;

    // 此处在linux环境下改为0.0.0.0
    private static String serverAddress = "127.0.0.1";

    private final Map<String, Object> rpcServiceMap = new HashMap<>();

    @Override
    public void afterPropertiesSet() throws Exception {

        Thread t = new Thread(() -> {
            try {
                startRpcServer();
            } catch (Exception e) {
                logger.error("start rpc server error.", e);
            }
        });
        t.setDaemon(true);
        t.start();
        SerializationFactory.init();
        RegistryFactory.init();
        LoadBalancerFactory.init();
        FilterConfig.initServiceFilter();
        ThreadPollFactory.setRpcServiceMap(rpcServiceMap);
    }

    private void startRpcServer() throws InterruptedException {
        int serverPort = rpcProperties.getPort();
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline()
                                        .addLast(new RpcEncoder())
                                        .addLast(new RpcDecoder())
                                        .addLast(new ServiceBeforeFilterHandler())
                                        .addLast(new RpcRequestHandler())
                                        .addLast(new ServiceAfterFilterHandler());
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, serverPort).sync();
            logger.info("server addr {} started on port {}", this.serverAddress, serverPort);
            channelFuture.channel().closeFuture().sync();
            Runtime.getRuntime().addShutdownHook(new Thread(() ->
            {
                logger.info("ShutdownHook execute start...");
                logger.info("Netty NioEventLoopGroup shutdownGracefully...");
                logger.info("Netty NioEventLoopGroup shutdownGracefully2...");
                boss.shutdownGracefully();
                worker.shutdownGracefully();
                logger.info("ShutdownHook execute end...");
            }, "Allen-thread"));
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    /**
     * 服务注册
     * @param bean
     * @param beanName
     * @return
     * @throws BeansException
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        Class<?> beanClass = bean.getClass();
        // 找到bean上带有 RpcService 注解的类
        RpcService rpcService = beanClass.getAnnotation(RpcService.class);
        if (rpcService != null) {
            // 可能会有多个接口,默认选择第一个接口
            String serviceName = beanClass.getInterfaces()[0].getName();
            if (!rpcService.serviceInterface().equals(void.class)){
                serviceName = rpcService.serviceInterface().getName();
            }
            String serviceVersion = rpcService.serviceVersion();
            try {
                // 服务注册
                int servicePort = rpcProperties.getPort();
                // 获取注册中心 ioc
                RegistryService registryService = RegistryFactory.get(rpcProperties.getRegisterType());
                ServiceMeta serviceMeta = new ServiceMeta();
                // 服务提供方地址
                serviceMeta.setServiceAddr("127.0.0.1");
                serviceMeta.setServicePort(servicePort);
                serviceMeta.setServiceVersion(serviceVersion);
                serviceMeta.setServiceName(serviceName);
                registryService.register(serviceMeta);
                // 缓存
                rpcServiceMap.put(RpcServiceNameBuilder.buildServiceKey(serviceMeta.getServiceName(),serviceMeta.getServiceVersion()), bean);
                logger.info("register server {} version {}",serviceName,serviceVersion);
            } catch (Exception e) {
                logger.error("failed to register service {}",  serviceVersion, e);
            }
        }
        return bean;
    }

    @Override
    public void setEnvironment(Environment environment) {
        RpcProperties properties = RpcProperties.getInstance();
        PropertiesUtils.init(properties,environment);
        rpcProperties = properties;
        logger.info("读取配置文件成功");
    }
}

rpc架构

代理层
                    object = Proxy.newProxyInstance(
                            aClass.getClassLoader(),
                            new Class<?>[]{aClass},
                            new RpcInvokerProxy(rpcReference.serviceVersion(),rpcReference.timeout(),rpcReference.faultTolerant(),
                                    rpcReference.loadBalancer(),rpcReference.retryCount()));
注册中心

redis,zk

public interface RegistryService {

    /**
     * 服务注册
     * @param serviceMeta
     * @throws Exception
     */
    void register(ServiceMeta serviceMeta) throws Exception;

    /**
     * 服务注销
     * @param serviceMeta
     * @throws Exception
     */
    void unRegister(ServiceMeta serviceMeta) throws Exception;


    /**
     * 获取 serviceName 下的所有服务
     * @param serviceName
     * @return
     */
    List<ServiceMeta> discoveries(String serviceName);
    /**
     * 关闭
     * @throws IOException
     */
    void destroy() throws IOException;

}
路由层

负载均衡(轮询/一致性哈希等)

router

public interface LoadBalancer<T> {

     /**
      * 选择负载均衡策略
      * @param params 入参,可自定义拿到入参后自行处理负载策略
      * @param serviceName 服务key
      * @return 当前服务节点以及其他节点,用于给容错使用
      */
     ServiceMetaRes select(Object[] params,String serviceName);

}
故障转移
            }catch (Throwable e){
                String errorMsg = e.toString();
                // todo 这里的容错机制可拓展,留作业自行更改
                switch (faultTolerantType){
                    // 快速失败
                    case FailFast:
                        log.warn("rpc 调用失败,触发 FailFast 策略,异常信息: {}",errorMsg);
                        return rpcResponse.getException();
                    // 故障转移
                    case Failover:
                        log.warn("rpc 调用失败,第{}次重试,异常信息:{}",count,errorMsg);
                        count++;
                        if (!ObjectUtils.isEmpty(otherServiceMeta)){
                            final ServiceMeta next = otherServiceMeta.iterator().next();
                            curServiceMeta = next;
                            otherServiceMeta.remove(next);
                        }else {
                            final String msg = String.format("rpc 调用失败,无服务可用 serviceName: {%s}, 异常信息: {%s}", serviceName, errorMsg);
                            log.warn(msg);
                            throw new RuntimeException(msg);
                        }
                        break;
                    // 忽视这次错误
                    case Failsafe:
                        return null;
                }
            }
协议层

编码,解码

                                socketChannel.pipeline()
                                        .addLast(new RpcEncoder())
                                        .addLast(new RpcDecoder())
拦截器层

拦截器链

public class FilterChain {


    private List<Filter> filters = new ArrayList<>();

    public void addFilter(Filter filter){
        filters.add(filter);
    }


    public void addFilter(List<Object> filters){
        for (Object filter : filters) {
            addFilter((Filter) filter);
        }
    }
    public void doFilter(FilterData data){
        for (Filter filter : filters) {
            filter.doFilter(data);
        }
    }
}

客户端拦截器

ClientAfterFilter,ClientBeforeFilter

        final FilterData filterData = new FilterData(request);
        try {
            FilterConfig.getClientBeforeFilterChain().doFilter(filterData);
        }catch (Throwable e){
            throw e;
        }
        protocol.setBody(request);

服务端连接器

ServiceAfterFilter,ServiceBeforeFilter

                                        .addLast(new ServiceBeforeFilterHandler())
                                        .addLast(new ServiceAfterFilterHandler());
**SPI **

org.xhystudy.rpc.spi.ExtensionLoader

    /**
     * 根据spi机制初加载bean的信息放入map
     * @param clazz
     * @throws IOException
     * @throws ClassNotFoundException
     */
    public void loadExtension(Class clazz) throws IOException, ClassNotFoundException {
        if (clazz == null) {
            throw new IllegalArgumentException("class 没找到");
        }
        ClassLoader classLoader = this.getClass().getClassLoader();
        Map<String, Class> classMap = new HashMap<>();
        // 从系统SPI以及用户SPI中找bean
        for (String prefix : prefixs) {
            String spiFilePath = prefix + clazz.getName();
            Enumeration<URL> enumeration = classLoader.getResources(spiFilePath);
            while (enumeration.hasMoreElements()) {
                URL url = enumeration.nextElement();
                InputStreamReader inputStreamReader = null;
                inputStreamReader = new InputStreamReader(url.openStream());
                BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    String[] lineArr = line.split("=");
                    String key = lineArr[0];
                    String name = lineArr[1];
                    final Class<?> aClass = Class.forName(name);
                    extensionClassCache.put(key, aClass);
                    classMap.put(key, aClass);
                    logger.info("加载bean key:{} , value:{}",key,name);
                }
            }
        }
        extensionClassCaches.put(clazz.getName(),classMap);
    }
请求处理器
                                        .addLast(new RpcRequestHandler())
public static void submitRequest(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> protocol){

        final RpcRequest request = protocol.getBody();
        String key = request.getClassName() + request.getMethodName() + request.getServiceVersion();
        ThreadPoolExecutor poll = fastPoll;
        if (slowTaskMap.containsKey(key) && slowTaskMap.get(key).intValue() >= 10){
            poll = slowPoll;
        }
        poll.submit(()->{
            RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();
            final MsgHeader header = protocol.getHeader();
            RpcResponse response = new RpcResponse();
            long startTime = System.currentTimeMillis();

            try {
                final Object result = submit(ctx, protocol);
                response.setData(result);
                response.setDataClass(result == null ? null : result.getClass());
                header.setStatus((byte) MsgStatus.SUCCESS.ordinal());
            } catch (Exception e) {
                // 执行业务失败则将异常返回
                header.setStatus((byte) MsgStatus.FAILED.ordinal());
                response.setException(e);
                logger.error("process request {} error", header.getRequestId(), e);
            }finally {
                long cost = System.currentTimeMillis() - startTime;
                System.out.println("cost time:" + cost);
                if(cost > 1000){
                    final AtomicInteger timeOutCount = slowTaskMap.putIfAbsent(key, new AtomicInteger(1));
                    if (timeOutCount!=null){
                        timeOutCount.incrementAndGet();
                    }
                }
            }
            resProtocol.setHeader(header);
            resProtocol.setBody(response);
            logger.info("执行成功: {},{},{},{}",Thread.currentThread().getName(),request.getClassName(),request.getMethodName(),request.getServiceVersion());
            ctx.fireChannelRead(resProtocol);
        });
    }
Comment