ActiveMQ的生产者和消费者的简单例子

作者: Arvin Chen 分类: Java 来源: Break易站(www.breakyizhan.com)
  •   Java 消息队列

    ActiveMQ的安装和启动已经说明了如何安装和启动AcitveMQ, Java 消息队列 讲消息队列就不得不提JMS 。JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。实现ActiveMQ的步骤如下:

    1. 启动ActiveMQ
    2. Producer发送数据到ActiveMQ
    3. Consumer接收ActiveMQ的消息

    启动ActiveMQ在ActiveMQ的安装和启动已经说明了。

    Producer.java运行后如下(ActiveMQ多出来10个待消费的消息):

    ActiveMQ的生产者和消费者的简单例子

    Consumer.java运行后如下(ActiveMQ显示有一个消费者订阅,并且已经消费了10个消息):

    ActiveMQ的生产者和消费者的简单例子

     

    Producer.java如下:

    package com.breakyizhan.activemq.simpletest;
    
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
     
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
     
    /**
     * 
     * @Title 消息生产者
     * @Description 消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地
     * @author Breakyizhan
     * @date 2019-06-05
     */
    public class Producer {
    	private static final int SEND_NUMBER = 10;
     
    	public static void main(String[] args) {
    		// ActiveMQConnectionFactory连接工厂 客户用来创建连接的对象
    		ConnectionFactory connectionFactory;
    		// JMS Connection 封装了客户与 JMS 提供者之间的一个虚拟的连接。
    		Connection connection = null;
    		// JMS Session 是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。
    		// 会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。
    		Session session;
    		// 目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。
    		Destination destination;
    		// 【消息生产者】
    		MessageProducer producer;
    		// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
    		connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,
    				"tcp://localhost:61616");
    		try {
    			connection = connectionFactory.createConnection();
    			connection.start();
    			// 获取操作连接,Session.AUTO_ACKNOWLEDGE。当客户成功的从 receive 方法返回的时候,
    			// 或者从 MessageListener.onMessage 方法成功返回的时候,会话自动确认客户收到的消息。
    			session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    			// 获取session注意参数值FirstQueue是一个服务器的queue,须在在ActiveMq的console配置
    			destination = session.createQueue("FirstQueue");
    			producer = session.createProducer(destination);
    			// NON_PERSISTENT。不要求 JMS provider 持久保存消息。
    			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    			sendMessage(session, producer);
    			session.commit();
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			try {
    				if (null != connection)
    					connection.close();
    			} catch (Throwable ignore) {
    			}
    		}
    	}
     
    	public static void sendMessage(Session session, MessageProducer producer) throws Exception {
    		for (int i = 1; i <= SEND_NUMBER; i++) {
    			TextMessage message = session.createTextMessage("ActiveMq 生产者的消息" + i);
    			System.out.println("ActiveMq [生产者]消息:" + i);
    			producer.send(message);
    		}
    	}
    }
    

    Consumer.java如下:

    package com.breakyizhan.activemq.simpletest;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
     
    /**
     * @Title 消息消费者
     * @Description 消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。
     * @author Breakyizhan
     * @date 2019-06-05
     */
    public class Consumer {
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory;
    		Connection connection = null;
    		Session session;
    		Destination destination;
    		// 【消息消费者】
    		MessageConsumer consumer;
    		connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,
    				"tcp://localhost:61616");
    		try {
    			connection = connectionFactory.createConnection();
    			connection.start();
    			session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
    			// 获取session注意参数值FirstQueue是一个服务器的queue,须在在ActiveMq的console配置
    			destination = session.createQueue("FirstQueue");
    			consumer = session.createConsumer(destination);
    			while (true) {
    				// 消息的消费可以采用以下两种方法之一:
    				// • 同步消费。通过调用消费者的 receive 方法从目的地中显式提取消息。receive 方法可以一直阻塞到消息到达。
    				// • 异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作
    				TextMessage message = (TextMessage) consumer.receive(500000);
    				if (null != message) {
    					System.out.println("[消费者]消息:" + message.getText());
    				} else {
    					break;
    				}
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			try {
    				if (null != connection)
    					connection.close();
    			} catch (Throwable ignore) {
    			}
    		}
    	}
    
    }
  •   Java 消息队列
  •   本文标题:ActiveMQ的生产者和消费者的简单例子 - Break易站
    转载请保留页面地址:https://www.breakyizhan.com/java/15233.html
      微信返利机器人
      免费:淘宝,京东,拼多多优惠券
      腾讯,爱奇艺,优酷的VIP视频免费解析,免费看
      即刻扫描二维码,添加微信机器人!

    发表笔记

    电子邮件地址不会被公开。 必填项已用*标注