RocketMQ Source Code Walkthrough Series, Part 2: filtersrv Source Code
Published: 2019-05-25


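1. The machine hosting the Broker starts multiple FilterServer filtering processes.
2. After a Consumer starts, it uploads a Java filter class to the FilterServer.
3. The Consumer pulls messages from the FilterServer. The FilterServer forwards the request to the Broker, filters the messages it receives from the Broker with the Java filter uploaded by the Consumer, and returns the filtered result to the Consumer.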
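The three steps above can be pictured with a small sketch. It is only an illustration under assumptions: SketchMessage, SketchFilter and FilterFlowSketch are hypothetical names invented here and are not RocketMQ classes; the sketch only shows the idea that the FilterServer sits between Consumer and Broker and applies the Consumer's own match logic to each message.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a message; NOT the RocketMQ MessageExt class. */
final class SketchMessage {
    final String tag;
    final String body;

    SketchMessage(String tag, String body) {
        this.tag = tag;
        this.body = body;
    }
}

/** Hypothetical filter contract, playing the role of the class a Consumer uploads. */
interface SketchFilter {
    boolean match(SketchMessage msg);
}

final class FilterFlowSketch {
    /** Step 3: keep only the messages pulled from the Broker that the filter accepts. */
    static List<SketchMessage> filter(List<SketchMessage> fromBroker, SketchFilter filter) {
        List<SketchMessage> kept = new ArrayList<SketchMessage>();
        for (SketchMessage msg : fromBroker) {
            if (filter.match(msg)) {
                kept.add(msg);
            }
        }
        return kept;
    }
}
```

Because the filtering runs in a separate process on the Broker machine, messages that fail the filter never cross the network to the Consumer.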
 

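The filtersrv project is structured much like namesrv. The startup class is FiltersrvStartup.java and the core controller class is FiltersrvController. As in namesrv, starting the service goes through the NettyRemotingServer.java class from the remoting project; the call appears in FiltersrvController.initialize(), shown further down.

The executor that handles the various request messages is DefaultRequestProcessor.java.

These basic steps are essentially the same as in namesrv.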

 

Source of the main method in the Startup class:

System.setProperty(RemotingCommand.RemotingVersionKey, Integer.toString(MQVersion.CurrentVersion));

// Socket send buffer size
if (null == System.getProperty(NettySystemConfig.SystemPropertySocketSndbufSize)) {
    NettySystemConfig.SocketSndbufSize = 65535;
}

// Socket receive buffer size
if (null == System.getProperty(NettySystemConfig.SystemPropertySocketRcvbufSize)) {
    NettySystemConfig.SocketRcvbufSize = 1024;
}

try {
    // Detect package conflicts
    PackageConflictDetect.detectFastjson();

    // Parse the command line
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    final CommandLine commandLine =
            ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options),
                new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    // Initialize the configuration objects
    final FiltersrvConfig filtersrvConfig = new FiltersrvConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();

    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            Properties properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, filtersrvConfig);
            System.out.println("load config properties file OK, " + file);
            in.close();

            String port = properties.getProperty("listenPort");
            if (port != null) {
                filtersrvConfig.setConnectWhichBroker(String.format("127.0.0.1:%s", port));
            }
        }
    }

    // Force the listen port to 0 so that a port is assigned automatically
    nettyServerConfig.setListenPort(0);
    nettyServerConfig.setServerAsyncSemaphoreValue(filtersrvConfig.getFsServerAsyncSemaphoreValue());
    nettyServerConfig.setServerCallbackExecutorThreads(filtersrvConfig
        .getFsServerCallbackExecutorThreads());
    nettyServerConfig.setServerWorkerThreads(filtersrvConfig.getFsServerWorkerThreads());

    // Print the default configuration
    if (commandLine.hasOption('p')) {
        MixAll.printObjectProperties(null, filtersrvConfig);
        MixAll.printObjectProperties(null, nettyServerConfig);
        System.exit(0);
    }

    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig);

    if (null == filtersrvConfig.getRocketmqHome()) {
        System.out.println("Please set the " + MixAll.ROCKETMQ_HOME_ENV
                + " variable in your environment to match the location of the RocketMQ installation");
        System.exit(-2);
    }

    // Initialize Logback
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(filtersrvConfig.getRocketmqHome() + "/conf/logback_filtersrv.xml");
    log = LoggerFactory.getLogger(LoggerName.FiltersrvLoggerName);

    // Initialize the service controller
    final FiltersrvController controller =
            new FiltersrvController(filtersrvConfig, nettyServerConfig);
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        private volatile boolean hasShutdown = false;
        private AtomicInteger shutdownTimes = new AtomicInteger(0);

        @Override
        public void run() {
            synchronized (this) {
                log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet());
                if (!this.hasShutdown) {
                    this.hasShutdown = true;
                    long begineTime = System.currentTimeMillis();
                    controller.shutdown();
                    long consumingTimeTotal = System.currentTimeMillis() - begineTime;
                    log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal);
                }
            }
        }
    }, "ShutdownHook"));

    return controller;
}
catch (Throwable e) {
    e.printStackTrace();
    System.exit(-1);
}
return null;
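One detail in the startup code is easy to miss: the properties file passed with -c is also read for listenPort, and that port is combined with 127.0.0.1 to decide which Broker the FilterServer connects to. This fits the point that the FilterServer runs on the same machine as the Broker. The following is a minimal sketch of just that derivation, using only java.util.Properties. The class BrokerAddrSketch, its method and the fallback parameter are hypothetical and are not part of RocketMQ.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class BrokerAddrSketch {
    /**
     * Derives the Broker address from properties text.
     * Returns "127.0.0.1:<listenPort>" when listenPort is present,
     * otherwise the caller-supplied fallback (a hypothetical parameter).
     */
    static String connectWhichBroker(String propertiesText, String fallback) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(propertiesText));
        } catch (IOException e) {
            // Not expected for an in-memory reader; rethrown unchecked.
            throw new IllegalArgumentException("unreadable properties text", e);
        }
        String port = properties.getProperty("listenPort");
        if (port != null) {
            return String.format("127.0.0.1:%s", port);
        }
        return fallback;
    }
}
```

For example, BrokerAddrSketch.connectWhichBroker("listenPort=10912", "fallback:1") yields 127.0.0.1:10912, while text without a listenPort key yields the fallback.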

Main fields and methods of FiltersrvController:

/**
 * Filter Server configuration
 */
private final FiltersrvConfig filtersrvConfig;
/**
 * Communication layer configuration
 */
private final NettyServerConfig nettyServerConfig;
/**
 * Server-side communication layer object
 */
private RemotingServer remotingServer;
/**
 * Thread pool that processes server-side network requests
 */
private ExecutorService remotingExecutor;
/**
 * Cache of filter classes; holds the filter classes in memory
 */
private final FilterClassManager filterClassManager;
/**
 * Wrapper around the API used to access the Broker
 */
private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI();

private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(
    MixAll.FILTERSRV_CONSUMER_GROUP);

private volatile String brokerName = null;

// Scheduled-task thread
private final ScheduledExecutorService scheduledExecutorService = Executors
    .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread"));

/**
 * Runtime statistics snapshot manager for the FilterServer; in the current
 * version the body of its start() method is empty
 */
private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager();


public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) {
    this.filtersrvConfig = filtersrvConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.filterClassManager = new FilterClassManager(this);
}


public boolean initialize() {
    // Print the server configuration parameters
    MixAll.printObjectProperties(log, this.filtersrvConfig);

    // Initialize the communication layer
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig);

    // Initialize the thread pool
    this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    // Periodically register this FilterServer with the Broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            FiltersrvController.this.registerFilterServerToBroker();
        }
    }, 3, 10, TimeUnit.SECONDS);

    // Initialize the PullConsumer parameters; they must be smaller than the defaults.
    this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(this.defaultMQPullConsumer
        .getBrokerSuspendMaxTimeMillis() - 1000);
    this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(this.defaultMQPullConsumer
        .getConsumerTimeoutMillisWhenSuspend() - 1000);

    this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr());
    this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid()));

    return true;
}


public String localAddr() {
    return String.format("%s:%d", this.filtersrvConfig.getFilterServerIP(),
        this.remotingServer.localListenPort());
}


/**
 * Register this filter server with the Broker
 *
 * @author: yangcheng
 */
public void registerFilterServerToBroker() {
    try {
        RegisterFilterServerResponseHeader responseHeader =
                this.filterServerOuterAPI.registerFilterServerToBroker(
                    this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
        this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
            .setDefaultBrokerId(responseHeader.getBrokerId());

        if (null == this.brokerName) {
            this.brokerName = responseHeader.getBrokerName();
        }

        log.info("register filter server<{}> to broker<{}> OK, Return: {} {}",
            this.localAddr(),
            this.filtersrvConfig.getConnectWhichBroker(),
            responseHeader.getBrokerName(),
            responseHeader.getBrokerId()
        );
    }
    catch (Exception e) {
        log.warn("register filter server Exception", e);
        // On failure, terminate this process
        log.warn("access broker failed, kill oneself");
        System.exit(-1);
    }
}


private void registerProcessor() {
    this.remotingServer
        .registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}


public void start() throws Exception {
    this.defaultMQPullConsumer.start();
    this.remotingServer.start();
    this.filterServerOuterAPI.start();
    this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
        .setConnectBrokerByUser(true);
    this.filterClassManager.start();
    this.filterServerStatsManager.start();
}
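The controller re-registers itself with the Broker on a fixed schedule (first run after 3 seconds, then every 10 seconds) through a single-threaded ScheduledExecutorService. The sketch below isolates that pattern with only JDK classes. PeriodicRegistrationSketch and runRepeatedly are hypothetical names, and the latch and timeout exist only so the sketch terminates; the original keeps the task running for the lifetime of the process.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class PeriodicRegistrationSketch {
    /**
     * Runs registerAction at a fixed rate until it has completed `times` runs
     * or timeoutMillis elapses. Returns true if all runs completed in time.
     */
    static boolean runRepeatedly(final Runnable registerAction, int times,
            long initialDelayMillis, long periodMillis, long timeoutMillis) {
        final CountDownLatch latch = new CountDownLatch(times);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    // If this throws, scheduleAtFixedRate suppresses all later runs,
                    // so a real task should catch its own exceptions.
                    registerAction.run();
                    latch.countDown();
                }
            }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

The comment about suppressed runs is the reason a try/catch inside the scheduled task matters: registerFilterServerToBroker() catches Exception itself, and then chooses to exit the process rather than keep running without a reachable Broker.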

DefaultRequestProcessor

/**
 * Reference to the controller
 */
private final FiltersrvController filtersrvController;


public DefaultRequestProcessor(FiltersrvController filtersrvController) {
    this.filtersrvController = filtersrvController;
}


@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws Exception {
    if (log.isDebugEnabled()) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    switch (request.getCode()) {
    /**
     * Register a filter class with the Filter Server (initiated by the Consumer)
     */
    case RequestCode.REGISTER_MESSAGE_FILTER_CLASS:
        return registerMessageFilterClass(ctx, request);
    /**
     * Pull (subscribe to) messages
     */
    case RequestCode.PULL_MESSAGE:
        return pullMessageForward(ctx, request);
    }

    return null;
}


/**
 * Send messages to the consumer
 * @param group
 * @param topic
 * @param ctx
 * @param response
 * @param msgList
 * @author: yangcheng
 */
private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx,
        final RemotingCommand response, final List<MessageExt> msgList) {
    if (null != msgList) {
        ByteBuffer[] msgBufferList = new ByteBuffer[msgList.size()];
        int bodyTotalSize = 0;
        for (int i = 0; i < msgList.size(); i++) {
            try {
                msgBufferList[i] = messageToByteBuffer(msgList.get(i));
                bodyTotalSize += msgBufferList[i].capacity();
            }
            catch (Exception e) {
                log.error("messageToByteBuffer UnsupportedEncodingException", e);
            }
        }

        ByteBuffer body = ByteBuffer.allocate(bodyTotalSize);
        for (ByteBuffer bb : msgBufferList) {
            bb.flip();
            body.put(bb);
        }

        response.setBody(body.array());

        // Statistics
        this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic, msgList.size());
        this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic, bodyTotalSize);
    }

    try {
        ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    log.error("FilterServer response to " + future.channel().remoteAddress() + " failed",
                        future.cause());
                    log.error(response.toString());
                }
            }
        });
    }
    catch (Throwable e) {
        log.error("FilterServer process request over, but response failed", e);
        log.error(response.toString());
    }
}


private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request)
        throws Exception {
    final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

    // The response is returned asynchronously, so the opaque must be set explicitly
    response.setOpaque(request.getOpaque());

    DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
    final FilterClassInfo findFilterClass =
            this.filtersrvController.getFilterClassManager().findFilterClass(
                requestHeader.getConsumerGroup(), requestHeader.getTopic());
    if (null == findFilterClass) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("Find Filter class failed, not registered");
        return response;
    }

    if (null == findFilterClass.getMessageFilter()) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("Find Filter class failed, registered but no class");
        return response;
    }

    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

    // Build the parameters for pulling messages from the Broker
    MessageQueue mq = new MessageQueue();
    mq.setTopic(requestHeader.getTopic());
    mq.setQueueId(requestHeader.getQueueId());
    mq.setBrokerName(this.filtersrvController.getBrokerName());
    long offset = requestHeader.getQueueOffset();
    int maxNums = requestHeader.getMaxMsgNums();

    final PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            responseHeader.setMaxOffset(pullResult.getMaxOffset());
            responseHeader.setMinOffset(pullResult.getMinOffset());
            responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
            response.setRemark(null);

            switch (pullResult.getPullStatus()) {
            case FOUND:
                response.setCode(ResponseCode.SUCCESS);

                List<MessageExt> msgListOK = new ArrayList<MessageExt>();
                try {
                    for (MessageExt msg : pullResult.getMsgFoundList()) {
                        boolean match = findFilterClass.getMessageFilter().match(msg);
                        if (match) {
                            msgListOK.add(msg);
                        }
                    }

                    // Some messages matched; return them
                    if (!msgListOK.isEmpty()) {
                        returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
                            response, msgListOK);
                        return;
                    }
                    // Every message was filtered out
                    else {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    }
                }
                // If anything is thrown, stop filtering and return the error to the client
                catch (Throwable e) {
                    final String error =
                            String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
                                requestHeader.getConsumerGroup(), requestHeader.getTopic());
                    log.error(error, e);

                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
                    returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
                        response, null);
                    return;
                }

                break;
            case NO_MATCHED_MSG:
                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                break;
            case NO_NEW_MSG:
                response.setCode(ResponseCode.PULL_NOT_FOUND);
                break;
            case OFFSET_ILLEGAL:
                response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                break;
            default:
                break;
            }

            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
        }


        @Override
        public void onException(Throwable e) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
            return;
        }
    };

    pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);

    return null;
}


private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final RegisterMessageFilterClassRequestHeader requestHeader =
            (RegisterMessageFilterClassRequestHeader) request
                .decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);

    try {
        boolean ok = this.filtersrvController.getFilterClassManager().registerFilterClass(
            requestHeader.getConsumerGroup(),
            requestHeader.getTopic(),
            requestHeader.getClassName(),
            requestHeader.getClassCRC(),
            request.getBody()); // The body carries Java source and must be UTF-8 encoded
        if (!ok) {
            throw new Exception("registerFilterClass error");
        }
    }
    catch (Exception e) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(RemotingHelper.exceptionSimpleDesc(e));
        return response;
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
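The onSuccess callback in pullMessageForward translates the Broker's pull status into the response code sent back to the Consumer, with one special case: a FOUND result whose messages are all rejected by the filter is reported as "retry immediately" rather than success. The sketch below restates that mapping as a pure function. The enums and the class PullStatusMappingSketch are hypothetical; their constant names mirror the ones visible in the source above but they are not the RocketMQ types.

```java
final class PullStatusMappingSketch {
    /** Hypothetical mirror of the pull statuses handled in the callback. */
    enum Status { FOUND, NO_MATCHED_MSG, NO_NEW_MSG, OFFSET_ILLEGAL }

    /** Hypothetical mirror of the response codes set in the callback. */
    enum Code { SUCCESS, PULL_RETRY_IMMEDIATELY, PULL_NOT_FOUND, PULL_OFFSET_MOVED }

    /**
     * Maps a pull status plus the number of messages that passed the filter
     * to the code returned to the Consumer.
     */
    static Code toCode(Status status, int matchedCount) {
        switch (status) {
        case FOUND:
            // Messages were pulled, but the filter may have rejected all of them.
            return matchedCount > 0 ? Code.SUCCESS : Code.PULL_RETRY_IMMEDIATELY;
        case NO_MATCHED_MSG:
            return Code.PULL_RETRY_IMMEDIATELY;
        case NO_NEW_MSG:
            return Code.PULL_NOT_FOUND;
        case OFFSET_ILLEGAL:
            return Code.PULL_OFFSET_MOVED;
        default:
            // Unlike the original, which leaves the code unchanged, this sketch rejects unknown input.
            throw new IllegalArgumentException("unknown status: " + status);
        }
    }
}
```

Telling the Consumer to retry immediately keeps it moving forward through the queue: the next-begin offset has already been advanced in the response header, so the next pull starts after the filtered-out batch.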

Here is a well-written blog post about filtersrv that I also consulted while reading the source:

Reposted from: http://fpcii.baihongyu.com/
