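Here is a basic example of producing messages and of subscribing to and consuming them.
RocketMQ's features and messaging modes go well beyond what is shown here;
for more, see the official code examples, or the examples from my day-to-day work on my GitHub.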
1: Producer
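// Imports assume the Apache RocketMQ client (org.apache.rocketmq.*); older releases use com.alibaba.rocketmq.*.
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("unique_group_name_quickstart");
        producer.setNamesrvAddr("172.30.248.219:9876"); // NameServer address; replace with your own
        producer.setInstanceName("QuickStartProducer");
        producer.start();
        for (int i = 0; i < 100; i++) {
            try {
                Message msg = new Message("TopicTest", // topic
                        "TagA", // tag
                        ("Hello RocketMQ By Dy" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) // body
                );
                SendResult sendResult = producer.send(msg); // synchronous send
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000); // back off briefly before the next message
            }
        }
        producer.shutdown();
    }
}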
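The producer turns each message body into bytes with RemotingHelper.DEFAULT_CHARSET, which is "UTF-8", so whoever reads the message has to decode it with the same charset. The snippet below is a minimal, RocketMQ-free sketch of that round trip; the class name BodyCharsetSketch and its encode/decode helpers are made up for illustration and are not part of the RocketMQ API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only: not part of RocketMQ.
final class BodyCharsetSketch {

    // What the producer side does: text -> bytes, with an explicit charset.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // What the consumer side should do: bytes -> text, with the same charset.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "Hello RocketMQ By Dy" + 0;
        byte[] body = encode(original);
        // The round trip is lossless because both sides name the charset explicitly.
        System.out.println(decode(body).equals(original));
    }
}
```

Calling new String(bytes) without a charset falls back to the JVM's default charset, which can differ between machines, so naming the charset on both sides is the safer habit.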
2: Consumer
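// Imports assume the Apache RocketMQ client (org.apache.rocketmq.*); older releases use com.alibaba.rocketmq.*.
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_group_name_quickstart");
        // A new consumer group starts from the earliest available offset.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("172.30.248.219:9876");
        consumer.setInstanceName("QuickStartConsumer");
        // The tag must match the one the producer sets ("TagA"), otherwise nothing is delivered.
        consumer.subscribe("TopicTest", "TagA");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                for (MessageExt msg : msgs) {
                    // Decode with the same charset the producer used (UTF-8).
                    System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}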
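The second argument of consumer.subscribe is a tag expression: a single tag, several tags joined with "||", or "*" for all tags. A message is delivered only if its tag matches the expression, so a consumer subscribed to a tag the producer never sends will sit idle. The snippet below is a simplified, RocketMQ-free model of that matching rule, handy for sanity-checking an expression; the class TagFilterSketch and its matches method are hypothetical and are not how the broker or client actually implements filtering.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified model of tag-based subscription matching; NOT RocketMQ's real code.
final class TagFilterSketch {

    // Returns true when a message tagged `messageTag` would pass a subscription
    // expression such as "TagA", "TagA || TagB", or "*".
    static boolean matches(String subExpression, String messageTag) {
        if (subExpression == null
                || subExpression.trim().isEmpty()
                || "*".equals(subExpression.trim())) {
            return true; // wildcard: subscribe to every tag
        }
        // Split on "||" and trim each tag name.
        Set<String> wanted = Arrays.stream(subExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return messageTag != null && wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("TagA", "TagA"));         // same tag
        System.out.println(matches("TagA3", "TagA"));        // mismatched tag
        System.out.println(matches("TagA || TagB", "TagB")); // one of several tags
        System.out.println(matches("*", "TagA"));            // wildcard
    }
}
```

If a consumer starts cleanly but never prints a message, comparing the producer's tag with the subscription expression is a good first check.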