ActiveMQ

/ 工具和中间件 / 2 条评论 / 1197浏览

ActiveMQ

ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线,完全支持JMS1.1和J2EE 1.4规范,java语言开发的。
http://activemq.apache.org/

activemq的两种模式

启动与安装

启动:[root@localhost bin]# ./activemq start
关闭:[root@localhost bin]# ./activemq stop
查看状态:[root@localhost bin]# ./activemq status

一个例子Queues

消费者和生产者谁先启动都没有关系,因为消息默认就是持久化的 都可以收到

  <dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.9</version>
    </dependency> 
      <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>4.3.1</version>
    </dependency>
  </dependencies>
public class TestProducer {
    //服务地址,端口默认61616
    private static final String url="tcp://127.0.0.1:61616";
    //这次发送的消息名称
    private static final String topicName="imwj";
    public static void main(String[] args) throws JMSException {
        //0. 先判断端口是否启动了  Active MQ 服务器
        ActiveMQUtil.checkServer();
        //1.创建ConnectiongFactory,绑定地址
        ConnectionFactory factory=new ActiveMQConnectionFactory(url);
        //2.创建Connection
        Connection connection= factory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建会话
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建一个目标 (队列类型)
        Destination destination=session.createQueue(topicName);
        //6.创建一个生产者
        MessageProducer producer=session.createProducer(destination);
        for (int i = 0; i < 100; i++) {
            //7.创建消息
            TextMessage textMessage=session.createTextMessage("队列消息-"+i);
            //8.发送消息
            producer.send(textMessage);
            System.out.println("发送:"+textMessage.getText());
        }
        //7. 关闭连接
        connection.close();
    }
}
public class TestConsumer {
    //服务器地址
    private static final String url = "tcp://127.0.0.1:61616";
    //此次消费者名称
    private static final String topicName = "imwj";
    //消费者是多个,为了区分不同的消费者 随机命名
    private static final String consumerName = "consumer-" + RandomUtil.randomString(5);
    public static void main(String[] args) throws JMSException {
        //0.先判断端口是否启动
        ActiveMQUtil.checkServer();
        System.out.printf("%s 消费者启动了。 %n", consumerName);
        //1.创建ConnectionFactory绑定地址
        ConnectionFactory factory = new ActiveMQConnectionFactory(url);
        //2.创建Connection
        Connection connection = factory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建一个目标(主题类型-点对点queue)
        Destination destination = session.createQueue(topicName);
        //6.创建一个消费者
        MessageConsumer consumer = session.createConsumer(destination);
        //7.创建一个监听器
        consumer.setMessageListener(new MessageListener() {

            public void onMessage(Message arg0) {
                TextMessage textMessage=(TextMessage)arg0;
                try {
                    System.out.println(consumerName +" 接收消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8. 因为不知道什么时候有,所以没法主动关闭,就不关闭了,一直处于监听状态
        //connection.close();
    }
}

一个例子Topics

只需要修改一下session.createQueue(topicName)
必须是消费者先启动,然后监听生产者所发的消息 否则无法收到(因为消息非持久化)

创建一个目标(主题类型-点对点queue)
Destination destination = session.createQueue(topicName);
//修改成:
创建一个目标(主题类型-发布/订阅Topics)
Destination destination=session.createTopic(topicName);