2019-06-20 13:39:28 3979瀏覽
今天千鋒扣丁學(xué)堂Java培訓(xùn)老師給大家分享一篇關(guān)于RocketMQ快速入門基礎(chǔ)知識的詳細(xì)介紹,首先RocketMQ是站在巨人的肩膀上(kafka),又對其進(jìn)行了優(yōu)化讓其更滿足互聯(lián)網(wǎng)公司的特點(diǎn)。它是純Java開發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點(diǎn)。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.1.0-incubating</version> </dependency>
DefaultMQProducer producer = new DefaultMQProducer("producer_demo"); //指定NameServer地址 producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的 /** * Producer對象在使用之前必須要調(diào)用start初始化,初始化一次即可 * 注意:切記不可以在每次發(fā)送消息時(shí),都調(diào)用start方法 */ producer.start(); for (int i = 0; i < 997892; i++) { try { //構(gòu)建消息 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("測試RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); //發(fā)送同步消息 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown();
/** * Consumer Group,非常重要的概念,后續(xù)會(huì)慢慢補(bǔ)充 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo"); //指定NameServer地址,多個(gè)地址以 ; 隔開 consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的 /** * 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi) * 如果非第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi) */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for(MessageExt msg:msgs){ String msgbody = new String(msg.getBody(), "utf-8"); System.out.println(" MessageBody: "+ msgbody);//輸出消息內(nèi)容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費(fèi)成功 } }); consumer.start(); System.out.printf("Consumer Started.%n");
mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.0.jar
java -jar rocketmq-console-ng-1.0.0.jar
一個(gè)好的習(xí)慣是先運(yùn)行Consumer,之后在運(yùn)行Producer,之后通過rocketmq-console-ng管控臺(tái)觀察
運(yùn)行完成之后,的確broker-a的數(shù)據(jù)加上broker-b的數(shù)據(jù)量就等于我們發(fā)送的數(shù)據(jù)量,而且slave的數(shù)量也master的數(shù)量也是一致的,效果如下:
查看發(fā)送這些數(shù)據(jù),2臺(tái)機(jī)器的磁盤情況如下:
【關(guān)注微信公眾號獲取更多學(xué)習(xí)資料】 【掃碼進(jìn)入JavaEE/微服務(wù)VIP免費(fèi)公開課】
查看更多關(guān)于“Java開發(fā)資訊”的相關(guān)文章>>