本文共 19184 字,大约阅读时间需要 63 分钟。
1. Broker 所在的机器会启动多个 FilterServer 过滤进程
2. Consumer 启动之后,会向 FilterServer 上传一个过滤的 Java 类
3. Consumer 从 FilterServer 拉消息,FilterServer 将请求转发给 Broker,FilterServer 从 Broker 收到消息后,按照 Consumer 上传的 Java 过滤程序做过滤,过滤完成后返回给 Consumer。
filtersrv 的项目结构如下:
工程结构与 namesrv 主体基本一致,启动类是 FiltersrvStartup.java,核心控制类是 FiltersrvController,启动服务时同样是调用 remoting 工程中的 NettyRemotingServer.java 类,源码如下:
而处理各种请求消息的执行器是DefaultRequestProcessor.java
这些基本步骤与namesrv基本一致。
Startup中的主要方法源码:
System.setProperty(RemotingCommand.RemotingVersionKey, Integer.toString(MQVersion.CurrentVersion)); // Socket发送缓冲区大小 if (null == System.getProperty(NettySystemConfig.SystemPropertySocketSndbufSize)) { NettySystemConfig.SocketSndbufSize = 65535; } // Socket接收缓冲区大小 if (null == System.getProperty(NettySystemConfig.SystemPropertySocketRcvbufSize)) { NettySystemConfig.SocketRcvbufSize = 1024; } try { // 检测包冲突 PackageConflictDetect.detectFastjson(); // 解析命令行 Options options = ServerUtil.buildCommandlineOptions(new Options()); final CommandLine commandLine = ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } // 初始化配置文件 final FiltersrvConfig filtersrvConfig = new FiltersrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); Properties properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, filtersrvConfig); System.out.println("load config properties file OK, " + file); in.close(); String port = properties.getProperty("listenPort"); if (port != null) { filtersrvConfig.setConnectWhichBroker(String.format("127.0.0.1:%s", port)); } } } // 强制设置为0,自动分配端口号 nettyServerConfig.setListenPort(0); nettyServerConfig.setServerAsyncSemaphoreValue(filtersrvConfig.getFsServerAsyncSemaphoreValue()); nettyServerConfig.setServerCallbackExecutorThreads(filtersrvConfig .getFsServerCallbackExecutorThreads()); nettyServerConfig.setServerWorkerThreads(filtersrvConfig.getFsServerWorkerThreads()); // 打印默认配置 if (commandLine.hasOption('p')) { MixAll.printObjectProperties(null, filtersrvConfig); MixAll.printObjectProperties(null, nettyServerConfig); System.exit(0); } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig); if (null == 
filtersrvConfig.getRocketmqHome()) { System.out.println("Please set the " + MixAll.ROCKETMQ_HOME_ENV + " variable in your environment to match the location of the RocketMQ installation"); System.exit(-2); } // 初始化Logback LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(filtersrvConfig.getRocketmqHome() + "/conf/logback_filtersrv.xml"); log = LoggerFactory.getLogger(LoggerName.FiltersrvLoggerName); // 初始化服务控制对象 final FiltersrvController controller = new FiltersrvController(filtersrvConfig, nettyServerConfig); boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet()); if (!this.hasShutdown) { this.hasShutdown = true; long begineTime = System.currentTimeMillis(); controller.shutdown(); long consumingTimeTotal = System.currentTimeMillis() - begineTime; log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal); } } } }, "ShutdownHook")); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null;
FiltersrvController中主要的属性和方法:
/** Filter Server configuration. */
private final FiltersrvConfig filtersrvConfig;

/** Transport-layer configuration. */
private final NettyServerConfig nettyServerConfig;

/** Server-side remoting object; created in {@link #initialize()}. */
private RemotingServer remotingServer;

/** Thread pool used to process incoming network requests; created in {@link #initialize()}. */
private ExecutorService remotingExecutor;

/** Cache of filter classes (per the original author's note, it keeps the filter classes in memory). */
private final FilterClassManager filterClassManager;

/** Wrapper around the API used to talk to the broker. */
private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI();

/** Pull consumer configured in {@link #initialize()} and started in {@link #start()}. */
private final DefaultMQPullConsumer defaultMQPullConsumer =
        new DefaultMQPullConsumer(MixAll.FILTERSRV_CONSUMER_GROUP);

/** Broker name taken from the first successful registration response; null until then. */
private volatile String brokerName;

/** Single-threaded scheduler that runs the periodic broker registration. */
private final ScheduledExecutorService scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread"));

/**
 * Runtime statistics manager of the filter server. The original author notes that the body of
 * its start method is empty in the current version.
 */
private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager();


/**
 * Stores both configuration objects and creates the filter class manager bound to this controller.
 */
public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) {
    this.filtersrvConfig = filtersrvConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.filterClassManager = new FilterClassManager(this);
}


/**
 * Builds the remoting server and its worker pool, registers the request processor, schedules
 * the periodic registration to the broker and configures the pull consumer.
 *
 * @return always {@code true}
 */
public boolean initialize() {
    // Log the server configuration
    MixAll.printObjectProperties(log, this.filtersrvConfig);

    // Transport layer
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig);

    // Worker thread pool
    final int workerThreads = nettyServerConfig.getServerWorkerThreads();
    final ThreadFactoryImpl workerThreadFactory = new ThreadFactoryImpl("RemotingExecutorThread_");
    this.remotingExecutor = Executors.newFixedThreadPool(workerThreads, workerThreadFactory);

    this.registerProcessor();

    // Register this filter server to the broker: first run after 3s, then every 10s
    final Runnable registerTask = new Runnable() {
        @Override
        public void run() {
            FiltersrvController.this.registerFilterServerToBroker();
        }
    };
    this.scheduledExecutorService.scheduleAtFixedRate(registerTask, 3, 10, TimeUnit.SECONDS);

    // Pull consumer timeouts are set 1000 below their current (default) values
    final DefaultMQPullConsumer consumer = this.defaultMQPullConsumer;
    consumer.setBrokerSuspendMaxTimeMillis(consumer.getBrokerSuspendMaxTimeMillis() - 1000);
    consumer.setConsumerTimeoutMillisWhenSuspend(consumer.getConsumerTimeoutMillisWhenSuspend() - 1000);
    consumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr());
    consumer.setInstanceName(String.valueOf(UtilAll.getPid()));

    return true;
}


/**
 * @return the address of this filter server as {@code ip:port}, built from the configured
 *         filter server IP and the remoting server's local listen port
 */
public String localAddr() {
    return String.format("%s:%d",
        this.filtersrvConfig.getFilterServerIP(),
        this.remotingServer.localListenPort());
}


/**
 * Registers this filter server to the configured broker. On success the returned broker id is
 * set as the pull consumer's default broker id and the broker name is remembered if not yet
 * known. On any exception the process exits with status -1.
 *
 * @author yangcheng
 */
public void registerFilterServerToBroker() {
    try {
        final RegisterFilterServerResponseHeader header =
                this.filterServerOuterAPI.registerFilterServerToBroker(
                    this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());

        this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
            .setDefaultBrokerId(header.getBrokerId());

        if (this.brokerName == null) {
            this.brokerName = header.getBrokerName();
        }

        log.info("register filter server<{}> to broker<{}> OK, Return: {} {}",
            this.localAddr(),
            this.filtersrvConfig.getConnectWhichBroker(),
            header.getBrokerName(),
            header.getBrokerId());
    } catch (Exception e) {
        log.warn("register filter server Exception", e);

        // Registration failed: terminate this process
        log.warn("access broker failed, kill oneself");
        System.exit(-1);
    }
}


/**
 * Installs a {@code DefaultRequestProcessor} as the default processor of the remoting server,
 * executed on the remoting executor.
 */
private void registerProcessor() {
    final DefaultRequestProcessor processor = new DefaultRequestProcessor(this);
    this.remotingServer.registerDefaultProcessor(processor, this.remotingExecutor);
}


/**
 * Starts the pull consumer, the remoting server, the broker API wrapper, the filter class
 * manager and the stats manager, in that order.
 *
 * @throws Exception if any component fails to start
 */
public void start() throws Exception {
    final DefaultMQPullConsumer consumer = this.defaultMQPullConsumer;

    consumer.start();
    this.remotingServer.start();
    this.filterServerOuterAPI.start();
    consumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);
    this.filterClassManager.start();
    this.filterServerStatsManager.start();
}
DefaultRequestProcessor
/** Controller this processor belongs to. */
private final FiltersrvController filtersrvController;


public DefaultRequestProcessor(FiltersrvController filtersrvController) {
    this.filtersrvController = filtersrvController;
}


/**
 * Dispatches an incoming request by its request code.
 *
 * @return the response to write back, or {@code null} when the code is not handled here or
 *         the response is written asynchronously (pull requests)
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws Exception {
    if (log.isDebugEnabled()) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    switch (request.getCode()) {
    // A consumer registers its filter class with the filter server
    case RequestCode.REGISTER_MESSAGE_FILTER_CLASS:
        return registerMessageFilterClass(ctx, request);
    // A consumer pulls messages through the filter server
    case RequestCode.PULL_MESSAGE:
        return pullMessageForward(ctx, request);
    }

    return null;
}


/**
 * Writes the response back to the consumer. When {@code msgList} is not null, the messages are
 * encoded into the response body and the per-group statistics are updated.
 *
 * @param group    consumer group, used for statistics
 * @param topic    topic, used for statistics
 * @param ctx      channel context the response is written to
 * @param response response command to send
 * @param msgList  messages to put into the body, or {@code null} for a body-less response
 * @author yangcheng
 */
private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx,
        final RemotingCommand response, final List<MessageExt> msgList) {
    if (null != msgList) {
        ByteBuffer[] msgBufferList = new ByteBuffer[msgList.size()];
        int bodyTotalSize = 0;
        int encodedCount = 0;
        for (int i = 0; i < msgList.size(); i++) {
            try {
                ByteBuffer encoded = messageToByteBuffer(msgList.get(i));
                bodyTotalSize += encoded.capacity();
                msgBufferList[i] = encoded;
                encodedCount++;
            } catch (Exception e) {
                // The slot stays null and is skipped below
                log.error("messageToByteBuffer UnsupportedEncodingException", e);
            }
        }

        // NOTE(review): the body is sized by capacity(), so this assumes messageToByteBuffer
        // returns completely filled buffers - confirm against its implementation.
        ByteBuffer body = ByteBuffer.allocate(bodyTotalSize);
        for (ByteBuffer bb : msgBufferList) {
            // A message that failed to encode must not abort the whole response with an NPE
            if (bb == null) {
                continue;
            }
            bb.flip();
            body.put(bb);
        }

        response.setBody(body.array());

        // Statistics: count only what was actually put into the body
        this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic,
            encodedCount);

        this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic,
            bodyTotalSize);
    }

    try {
        ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    log.error("FilterServer response to " + future.channel().remoteAddress()
                            + " failed", future.cause());
                    log.error(response.toString());
                }
            }
        });
    } catch (Throwable e) {
        log.error("FilterServer process request over, but response failed", e);
        log.error(response.toString());
    }
}


/**
 * Forwards a consumer's pull request to the broker through the controller's pull consumer,
 * applies the consumer's registered filter to the messages found, and writes the result back
 * from the pull callback.
 *
 * @return an error response when no usable filter class is registered for the consumer
 *         group and topic; otherwise {@code null}, because the response is written
 *         asynchronously by the callback
 */
private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx,
        final RemotingCommand request) throws Exception {
    final RemotingCommand response =
            RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader =
            (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

    // The response is returned asynchronously, so the opaque must be set explicitly
    response.setOpaque(request.getOpaque());

    DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
    final FilterClassInfo findFilterClass =
            this.filtersrvController.getFilterClassManager().findFilterClass(
                requestHeader.getConsumerGroup(), requestHeader.getTopic());
    if (null == findFilterClass) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("Find Filter class failed, not registered");
        return response;
    }

    if (null == findFilterClass.getMessageFilter()) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("Find Filter class failed, registered but no class");
        return response;
    }

    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

    // Build the parameters for pulling from the broker
    MessageQueue mq = new MessageQueue();
    mq.setTopic(requestHeader.getTopic());
    mq.setQueueId(requestHeader.getQueueId());
    mq.setBrokerName(this.filtersrvController.getBrokerName());
    long offset = requestHeader.getQueueOffset();
    int maxNums = requestHeader.getMaxMsgNums();

    final PullCallback pullCallback = new PullCallback() {

        @Override
        public void onSuccess(PullResult pullResult) {
            responseHeader.setMaxOffset(pullResult.getMaxOffset());
            responseHeader.setMinOffset(pullResult.getMinOffset());
            responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
            response.setRemark(null);

            switch (pullResult.getPullStatus()) {
            case FOUND:
                response.setCode(ResponseCode.SUCCESS);

                List<MessageExt> msgListOK = new ArrayList<MessageExt>();
                try {
                    for (MessageExt msg : pullResult.getMsgFoundList()) {
                        boolean match = findFilterClass.getMessageFilter().match(msg);
                        if (match) {
                            msgListOK.add(msg);
                        }
                    }

                    // Some messages passed the filter: return them
                    if (!msgListOK.isEmpty()) {
                        returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                            ctx, response, msgListOK);
                        return;
                    }
                    // Everything was filtered out: tell the consumer to retry immediately
                    else {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    }
                }
                // Any exception aborts filtering and is reported to the client
                catch (Throwable e) {
                    final String error =
                            String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
                                requestHeader.getConsumerGroup(), requestHeader.getTopic());
                    log.error(error, e);

                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
                    returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                        ctx, response, null);
                    return;
                }

                break;
            case NO_MATCHED_MSG:
                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                break;
            case NO_NEW_MSG:
                response.setCode(ResponseCode.PULL_NOT_FOUND);
                break;
            case OFFSET_ILLEGAL:
                response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                break;
            default:
                break;
            }

            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
                response, null);
        }


        @Override
        public void onException(Throwable e) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
                response, null);
        }
    };

    pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);

    return null;
}


/**
 * Registers the filter class uploaded by a consumer with the filter class manager.
 *
 * @return a response with code SUCCESS, or SYSTEM_ERROR with a remark describing the failure
 * @throws RemotingCommandException if the request header cannot be decoded
 */
private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final RegisterMessageFilterClassRequestHeader requestHeader =
            (RegisterMessageFilterClassRequestHeader) request
                .decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);

    try {
        boolean ok =
                this.filtersrvController.getFilterClassManager().registerFilterClass(
                    requestHeader.getConsumerGroup(),
                    requestHeader.getTopic(),
                    requestHeader.getClassName(),
                    requestHeader.getClassCRC(),
                    request.getBody()); // The body carries the Java source and must be UTF-8 encoded
        if (!ok) {
            throw new Exception("registerFilterClass error");
        }
    } catch (Exception e) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(RemotingHelper.exceptionSimpleDesc(e));
        return response;
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
推荐一篇写得不错的关于 filterSrv 的博客,我阅读源码时也参考了这篇文章:
转载地址:http://fpcii.baihongyu.com/