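Here is a routine example of producing messages and of subscribing to and consuming them.
RocketMQ's features and messaging modes go well beyond what is shown here.
To dig deeper, study the official code samples, or look at the real-world examples from my own work on my GitHub.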
1: Producer
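// Imports assume the Apache RocketMQ 4.x client; older releases use the com.alibaba.rocketmq packages.
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

// The class name is illustrative; the original listing shows only the main method.
public class QuickStartProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("unique_group_name_quickstart");
        producer.setNamesrvAddr("172.30.248.219:9876"); // name server address
        producer.setInstanceName("QuickStartProducer");
        producer.start();
        for (int i = 0; i < 100; i++) {
            try {
                Message msg = new Message("TopicTest", // topic
                        "TagA", // tag
                        ("Hello RocketMQ By Dy" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) // body
                );
                // Synchronous send: blocks until the broker returns a result.
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                // Pause briefly before moving on to the next message.
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}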
2: Consumer
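// Imports assume the Apache RocketMQ 4.x client; older releases use the com.alibaba.rocketmq packages.
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

// The class name is illustrative; the original listing shows only the main method.
public class QuickStartConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_group_name_quickstart");
        // A new consumer group starts from the earliest available offset.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("172.30.248.219:9876");
        consumer.setInstanceName("QuickStartConsumer");
        // The tag expression must match the tag set by the producer ("TagA"), otherwise nothing is delivered.
        consumer.subscribe("TopicTest", "TagA");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                for (MessageExt msg : msgs) {
                    // Decode with UTF-8, the charset the producer used for the body.
                    System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}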