DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(Constant.operationLogGroup); try { consumer.setNamesrvAddr(Constant.rocketQueneAddr); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe(Constant.operationLogTopic, Constant.operationLogTag); } catch (MQClientException e) { logger.error("consume operation log MQ error", e); } cometutil = Comet4jUtil.getInstance(CHANNEL); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { byte[] bytes = msgs.get(0).getBody(); try { cometutil.sendMesToAllConnsWithString(CHANNEL, new String(bytes, "UTF-8")); } catch (UnsupportedEncodingException e) { } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); try { consumer.start(); logger.info("operationLogController's MQ consumer started."); } catch (MQClientException e) { logger.error("consume operation log MQ start error", e); }