SpringBoot整合ActiveMq消息队列
导入依赖包
public class ActiveMQUtil { PooledConnectionFactory pooledConnectionFactory = null; public void init(String brokerUrl){ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl); pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); //设置超时时间 pooledConnectionFactory.setExpiryTimeout(2000); // 设置出现异常的时候,继续重试连接 pooledConnectionFactory.setReconnectOnException(true); // 设置最大连接数 pooledConnectionFactory.setMaxConnections(5); } // 获取连接 public Connection getConnection(){ Connection connection = null; try { connection = pooledConnectionFactory.createConnection(); } catch (JMSException e) { e.printStackTrace(); } return connection; } }
写工具类
public class ActiveMQUtil { PooledConnectionFactory pooledConnectionFactory = null; public void init(String brokerUrl){ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl); pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); //设置超时时间 pooledConnectionFactory.setExpiryTimeout(2000); // 设置出现异常的时候,继续重试连接 pooledConnectionFactory.setReconnectOnException(true); // 设置最大连接数 pooledConnectionFactory.setMaxConnections(5); } // 获取连接 public Connection getConnection(){ Connection connection = null; try { connection = pooledConnectionFactory.createConnection(); } catch (JMSException e) { e.printStackTrace(); } return connection; } }
写配置类,直接注解注入
@Configuration public class ActiveMQConfig { @Value("${spring.activemq.broker-url:disabled}") String brokerURL ; @Value("${activemq.listener.enable:disabled}") String listenerEnable; // 获取activeMQUtil @Bean public ActiveMQUtil getActiveMQUtil(){ if ("disabled".equals(brokerURL)){ return null; } ActiveMQUtil activeMQUtil = new ActiveMQUtil(); activeMQUtil.init(brokerURL); return activeMQUtil; } @Bean(name = "jmsQueueListener") public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) { if("disabled".equals(listenerEnable)){ return null; } DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(activeMQConnectionFactory); // 设置事务 factory.setSessionTransacted(false); // 手动签收 factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); // 设置并发数 factory.setConcurrency("5"); // 重连间隔时间 factory.setRecoveryInterval(5000L); return factory; } // 接收消息 @Bean public ActiveMQConnectionFactory activeMQConnectionFactory (){ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); return activeMQConnectionFactory; }
发消息的controller:实现发送一个success及orderid的message到PAYMENT_TO_ORDER队列里面,***取到message,拿到里面的success
@GetMapping("sendPayment") @ResponseBody public String sendPayment(String orderId){ paymentInfoService.sendPaymentToOrder(orderId,"success"); return "success"; }
发消息的service
@Override public void sendPaymentToOrder(String orderId,String result) { Connection connection = activeMQUtil.getConnection();//建立连接 try { Session session = connection.createSession(true, Session.SESSION_TRANSACTED);//创建会话 MessageProducer producer = session.createProducer(session.createQueue("PAYMENT_TO_ORDER"));//定义队列名称 MapMessage mapMessage = new ActiveMQMapMessage(); mapMessage.setString("orderId",orderId);//订单编号 mapMessage.setString("result",result);//结果 producer.send(mapMessage);//发送消息 session.commit();//提交会话 connection.close();//关闭连接 } catch (JMSException e) { e.printStackTrace(); } }
收消息的***
@JmsListener(destination = "SKU_DEDUCT_QUEUE",containerFactory = "jmsQueueListener" ) public void consumeWareDeduct(MapMessage mapMessage) throws JMSException { // 更新订单状态 String orderId = mapMessage.getString("orderId"); String status = mapMessage.getString("status"); if("DEDUCTED".equals(status)){ orderService.updateStatus(orderId,ProcessStatus.WAITING_DELEVER); }else{ orderService.updateStatus(orderId,ProcessStatus.STOCK_EXCEPTION); } }