RabbitMQ

/ 工具和中间件 / 2 条评论 / 1232浏览

RabbitMQ

RabbitMQ是一个由erlang语言开发的AMQP(Advanved Message Queue Protocol)的开源实现。

安装

"D:\RabbitMQ Server\rabbitmq_server-3.6.5\sbin\rabbitmq-plugins.bat" enable rabbitmq_management

运行机制

AMQP中消息的路由过程和Java开发者熟悉的JMS存在一些差别,AMQP中增加了Exchange和Binding的角色。生产者把消息发布到Exchange 上,消息最终到达队列并被消费者接收,而Binding决定交换器的消息应该发送到那个队列。

RabbitMQ

Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
 
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
 
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别
 
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走
 
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange 和Queue的绑定可以是多对多的关系。
 
Connection
网络连接,比如一个TCP连接。
 
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
 
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

RabbitMQ的四种模式

direct点对点

  <description>rabbitmq</description>
   <dependencies>
   	<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
	 </dependency>
	<dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>4.3.1</version>
    </dependency>	
   </dependencies>
public class TestProducer {
    public static final String EXCHANGE_NAME =  "direct_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtil.checkServer();

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置RabbitMQ相关信息
        factory.setHost("127.0.0.1");
        //3.创建一个新的连接
        Connection connection = factory.newConnection();
        //4.创建一个通道
        Channel channel = connection.createChannel();

        //5.发送消息到队列中
        for(int i=0 ; i<100; i++){
            String meg = "fanount 消息" + i;
            channel.basicPublish("", EXCHANGE_NAME, null, meg.getBytes("UTF-8"));
            System.out.println("发送消息:"+meg);
        }
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}
public class TestCustomer {
    public final static String EXCHANGE_NAME="direct_queue";
    //为当前消费者取随机名
    public static final String name = "consumer-"+ RandomUtil.randomString(5);

    public static void main(String[] args) throws IOException, TimeoutException {
        //0.判断服务是否启动
        RabbitMQUtil.checkServer();
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置RabbitMQ相关信息
        factory.setHost("127.0.0.1");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建通道
        Channel channel = connection.createChannel();
        //声明要关注的队列
        channel.queueDeclare(EXCHANGE_NAME, false, false, true, null);

        System.out.println(name + " 等到接收消息");
        //DefaultConsumer类实现了Consumer接口,通过传入下一个频道
        //告诉服务器我们需要那个频道的消息 如果频道中有消息,就会执行回调函数handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(name + " 接收到消息 '" + message + "'");
            }
        };
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
        channel.basicConsume(EXCHANGE_NAME, true, consumer);
    }
}

fanout发布/订阅

必须要先启动消费者监听消息,然后再启动生产者 否则收不到消息

public class TestProducer {
    public static final String EXCHANGE_NAME =  "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtil.checkServer();

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置RabbitMQ相关信息
        factory.setHost("127.0.0.1");
        //3.创建一个新的连接
        Connection connection = factory.newConnection();
        //4.创建一个通道
        Channel channel = connection.createChannel();
        //交换机声明(参数为:交换机名称;交换机类型)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //5.发送消息到队列中
        for(int i=0 ; i<100; i++){
            String meg = "fanount 消息" + i;
            channel.basicPublish(EXCHANGE_NAME, "", null, meg.getBytes("UTF-8"));
            System.out.println("发送消息:"+meg);
        }
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}
public class TestCustomer {
    public final static String EXCHANGE_NAME="fanout_exchange";
    //为当前消费者取随机名
    public static final String name = "consumer-"+ RandomUtil.randomString(5);

    public static void main(String[] args) throws IOException, TimeoutException {
       //0.判断服务是否启动
        RabbitMQUtil.checkServer();
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置RabbitMQ相关信息
        factory.setHost("127.0.0.1");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建通道
        Channel channel = connection.createChannel();
        //交换机声明(参数为:交换机名称;交换机类型)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //获取一个临时队列
        String queue = channel.queueDeclare().getQueue();
        //队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略)
        channel.queueBind(queue, EXCHANGE_NAME, "");

        System.out.println(name + " 等到接收消息");
        //DefaultConsumer类实现了Consumer接口,通过传入下一个频道
        //告诉服务器我们需要那个频道的消息 如果频道中有消息,就会执行回调函数handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(name + " 接收到消息 '" + message + "'");
            }
        };
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
        channel.basicConsume(queue, true, consumer);
    }

}

消息的路由键匹配topic

必须要先启动消费者监听消息,然后再启动生产者 否则收不到消息

/**
 * @author langao_q
 * @create 2020-03-24 14:53
 * 四个路由:"usa.news", "usa.weather", "europe.news", "europe.weather"
 */
public class TestProducer {
    public static final String EXCHANGE_NAME =  "topic_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtil.checkServer();
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.配置RabbitMQ相关信息
        factory.setHost("127.0.0.1");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建通道
        Channel channel = connection.createChannel();
        //交换机声明(参数为:交换机名称;交换机类型)
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        //5.发送消息
        String[] routing_keys = new String[] { "usa.news", "usa.weather","europe.news", "europe.weather" };
        String[] messages = new String[] { "美国新闻", "美国天气","欧洲新闻", "欧洲天气" };
        for(int i=0; i< routing_keys.length; i++){
            String routingKey = routing_keys[i];
            String message = messages[i];
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            System.out.printf("发送消息到路由:%s, 内容是: %s%n ", routingKey,message);
        }
        //6.关闭通道和连接
        channel.close();
        connection.close();
    }
}
public class TestCustomer1 {
    public final static String EXCHANGE_NAME="topic_queue";
    //为当前消费者取随机名
    public static final String name = "consumer-"+ RandomUtil.randomString(5);

    public static void main(String[] args) throws IOException, TimeoutException {
       //0.判断服务是否启动
        RabbitMQUtil.checkServer();
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.配置RabbitMQ信息
        factory.setHost("127.0.0.1");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建通道
        Channel channel = connection.createChannel();
        //交换机声明(参数为:交换机名称;交换机类型)
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //5.获取一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        //6.接收usa信息
        channel.queueBind(queueName, EXCHANGE_NAME, "usa.*");
        System.out.println(name +" 等待接受消息");
        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
        // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(name + " 接收到消息 '" + message + "'");
            }
        };
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
        channel.basicConsume(queueName, true, consumer);
    }
}
public class TestCustomer1 {
    public final static String EXCHANGE_NAME="topic_queue";
    //为当前消费者取随机名
    public static final String name = "consumer-"+ RandomUtil.randomString(5);

    public static void main(String[] args) throws IOException, TimeoutException {
       //0.判断服务是否启动
        RabbitMQUtil.checkServer();
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.配置RabbitMQ信息
        factory.setHost("127.0.0.1");
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建通道
        Channel channel = connection.createChannel();
        //交换机声明(参数为:交换机名称;交换机类型)
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //5.获取一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        //6.接收usa信息
        channel.queueBind(queueName, EXCHANGE_NAME, "*.news");
        System.out.println(name +" 等待接受消息");
        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
        // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(name + " 接收到消息 '" + message + "'");
            }
        };
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
        channel.basicConsume(queueName, true, consumer);
    }
}

springboot中应用

AmqpAdmin:rabbitmq管理器(交换机和队列规则)
RabbitTemplate:rabbitmq的操作模板

rabitmq中交换机和队列创建

@SpringBootTest
class RabbitmqApplicationTests {
    @Autowired
    private AmqpAdmin amqpAdmin;

    @Test
    void contextLoads() {
        //1.创建交换机(DirectExchange、FanoutExchange、TopicExchange)
        amqpAdmin.declareExchange(new DirectExchange("imwj.exchange"));
        //2.创建队列(队列名称、是否持久化)
        amqpAdmin.declareQueue(new Queue("imwj.queue", true));
        //3.创建绑定规则(队列名称、绑定类型QUEUE/EXCHANGE、交换机名称、路由键、参数)
        amqpAdmin.declareBinding(new Binding("imwj.queue", Binding.DestinationType.QUEUE,"imwj.exchange", "usa.news", null));
    }
}

发送消息

@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //发送消息
    @Test
    void fun1() {
        rabbitTemplate.convertAndSend("imwj.exchange", "usa.news", "aa");
    }
}

监听并接收消息

@RabbitListener注解,其中queues是数组类型的
注意:启动类要开启@EnableRabbit注解

@Service
public class RabbitService {

    @RabbitListener(queues = "imwj.queue")
    public void receive(String str){
        System.out.println("收到消息:" + str);
    }
}