伙伴云客服论坛»论坛 S区 S零代码 查看内容

0 评论

0 收藏

分享

Springboot 整合 阿里云消息队列RabbitMQ版效劳



因为公司的需要效劳都是用的阿里云相关的产品,最近自己工作中也涉及到了消息队列这一块的业务,索性自己也来从零开端对接阿里云的消息队列效劳。
准备

本着学习的前提,寻找是否免费的或者做活动的效劳,能白嫖的就白嫖,果然被我找到了。
    进入阿里云官方首页,找到精选活动->阿里云使用中心 点击进入
Springboot 整合 阿里云消息队列RabbitMQ版效劳-1.jpg

        2.进入页面搜索消息队列
Springboot 整合 阿里云消息队列RabbitMQ版效劳-2.jpg


        3.  详细队列的相关配置步骤可参考官方文档:快速入门概述 - 消息队列RabbitMQ版 - 阿里云
        4. 原本Rocket版、Kafka版都想学习的,但只要rabbit版的免费,但也够意思了毕竟不要钱(虽然免费但后面还留了一个很大的坑等着踩呢
开端

        1. 创建一个springboot项目 命名为:rabbitmq-aliyun
        2.在application.yml中配置相关mq常量(常量可从前面自己创建的阿里云mq控制台获取)
  1. server:
  2.   port: 8080
  3. aliyun:
  4.   rabbitmq:
  5.     accessKey: 密匙key
  6.     accessKeySecret: 密匙密码
  7.     username: 静态用户名
  8.     password:  静态密码
  9.     vHost: 虚拟机名称
  10.     exchange: 交换机名称
  11.     exType: 交换机类型
  12.     queue: 队列名称
  13.     BindingKey:  路由key
  14.     host: 介入点(公网接入点)
复制代码
:本地测试必需使用公网接入点  ,但是我们使用的免费rabbitMq效劳并没有公网接入点,只要VPC接入点
Springboot 整合 阿里云消息队列RabbitMQ版效劳-3.jpg


所以自己依照官方教程一步一步实现,总是连接超时,代码检查了无数遍也没有找到问题所在。(官方教程也没有标明用哪一个接入点地址,进了这个大坑)
最后只能需求官方客户协助:
Springboot 整合 阿里云消息队列RabbitMQ版效劳-4.jpg


本着,不花钱的原则,但是使用VPC接入点 还得购置 阿里云ecs效劳,岂不是还得花更多的钱。
最后只能晋级效劳,并且选择支持公网
Springboot 整合 阿里云消息队列RabbitMQ版效劳-5.jpg


Springboot 整合 阿里云消息队列RabbitMQ版效劳-6.jpg


所以说,白嫖也并没有完全白嫖,还得花钱,要么买ecs效劳,要么升配队列效劳
3.创建配置数据映射对象 RabbitMqConfigDTO.class
  1. @Configuration
  2. @ConfigurationProperties("aliyun.rabbitmq")
  3. @Data
  4. public class RabbitMqConfigDTO {
  5.     /**
  6.      * 账户密匙key
  7.      */
  8.     private String accessKey;
  9.     /**
  10.      * 账户密匙
  11.      */
  12.     private String accessKeySecret;
  13.     /**
  14.      *  静态用户名
  15.      */
  16.     private String username;
  17.     /**
  18.      * 静态用户名密码
  19.      */
  20.     private String password;
  21.     /**
  22.      * 虚拟机名称
  23.      */
  24.     private String vHost;
  25.     /**
  26.      * 交换机名
  27.      */
  28.     private String exchange;
  29.     /**
  30.      * 交换机类型
  31.      */
  32.     private String exType;
  33.     /**
  34.      * 队列名
  35.      */
  36.     private String queue;
  37.     /**
  38.      * 绑定规则key
  39.      */
  40.     private String BindingKey;
  41.     /**
  42.      * 接入点地址
  43.      */
  44.     private String host;
  45. }
复制代码
       4. 创建spring工具类 SpringContextHolder.class 用于获取bean对象
  1. public class SpringContextHolder implements ApplicationContextAware {
  2.     @Autowired
  3.     private static ApplicationContext applicationContext;
  4.     public SpringContextHolder() {
  5.     }
  6.     @Override
  7.     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  8.         SpringContextHolder.applicationContext = applicationContext;
  9.     }
  10.     public static ApplicationContext getApplicationContext() {
  11.         assertApplicationContext();
  12.         return applicationContext;
  13.     }
  14.     public static <T> T getBean(String beanName) {
  15.         assertApplicationContext();
  16.         return (T) applicationContext.getBean(beanName);
  17.     }
  18.     public static <T> T getBean(Class<T> requiredType) {
  19.         assertApplicationContext();
  20.         return applicationContext.getBean(requiredType);
  21.     }
  22.     private static void assertApplicationContext() {
  23.         if (applicationContext == null) {
  24.             throw new RuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!");
  25.         }
  26.     }
复制代码
       5. 创建rabbitMq工具类  RabbitMqUtil.class
  1. @Slf4j
  2. @Component
  3. public class RabbitMqUtil {
  4.    
  5.     @Autowired
  6.     private RabbitMqConfigDTO rabbitMqConfigDTO;
  7.     //第三步 建一个静态的本类
  8.     private static RabbitMqUtil rabbitMqUtil;
  9.     //第四步 初始化
  10.     @PostConstruct
  11.     public void init() {
  12.         rabbitMqUtil = this;
  13.     }
  14.    
  15.     /**
  16.      * 创建队列连接
  17.      * @return
  18.      */
  19.     public static Connection getRabbitConnection(){
  20.         ConnectionFactory factory = new ConnectionFactory();
  21.         //公网接入点
  22.         factory.setHost(rabbitMqUtil.rabbitMqConfigDTO.getHost());
  23.         //静态用户名
  24.         factory.setUsername(rabbitMqUtil.rabbitMqConfigDTO.getUsername());
  25.         //静态密码
  26.         factory.setPassword(rabbitMqUtil.rabbitMqConfigDTO.getPassword());
  27.         //自动恢复
  28.         factory.setAutomaticRecoveryEnabled(true);
  29.         //网络恢复时间
  30.         factory.setNetworkRecoveryInterval(5000);
  31.         //虚拟机名称
  32.         factory.setVirtualHost(rabbitMqUtil.rabbitMqConfigDTO.getVHost());
  33.         //端口
  34.         factory.setPort(5672);
  35.         //连接超时时间
  36.         factory.setConnectionTimeout(30*100);
  37.         //设置握手超时时间
  38.         factory.setHandshakeTimeout(300000000);
  39.         factory.setShutdownTimeout(0);
  40.         //创建连接
  41.         Connection connection = null;
  42.         try {
  43.             connection =factory.newConnection();
  44.         }catch (Exception e){
  45.             log.error("rabbitMq连接异常", e);
  46.         }
  47.         return connection;
  48.     }
  49.     /**
  50.      * 创建队列通道
  51.      * @param connection
  52.      * @return
  53.      */
  54.     public static Channel getRabbitChannel(Connection connection){
  55.         Channel channel = null;
  56.         try {
  57.             channel = connection.createChannel();
  58.             String exchange = rabbitMqUtil.rabbitMqConfigDTO.getExchange();
  59.             channel.exchangeDeclare(exchange,rabbitMqUtil.rabbitMqConfigDTO.getExType(), true, false,false, null);
  60.             channel.queueDeclare(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>());
  61.             channel.queueBind(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), rabbitMqUtil.rabbitMqConfigDTO.getExchange(), rabbitMqUtil.rabbitMqConfigDTO.getBindingKey());
  62.         }catch (Exception e){
  63.             log.error("创建rabbitMq通道异常", e);
  64.         }
  65.         return channel;
  66.     }
  67. }
复制代码
       6.创建server接口类
  1. public interface RabbitMqService {
  2.     /**
  3.      * 发送mq消息
  4.      * @return
  5.      */
  6.     String sendMessage() throws IOException, TimeoutException;
  7.     /**
  8.      * 消费消息
  9.      * @return
  10.      * @throws IOException
  11.      * @throws TimeoutException
  12.      */
  13.     String consumeMessage() throws IOException, TimeoutException;
  14. }
复制代码
       7.创建实现类
  1. @Service
  2. public class RabbitMqServiceImpl implements RabbitMqService {
  3.     @Autowired
  4.     private RabbitMqConfigDTO rabbitMqConfigDTO;
  5.     @Override
  6.     public String sendMessage() throws IOException {
  7.         Connection connection = RabbitMqUtil.getRabbitConnection();
  8.         Channel channel = RabbitMqUtil.getRabbitChannel(connection);
  9.         //开端发送消息
  10.         for(int i=0; i< 10 ; i++){
  11.             AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
  12.             channel.basicPublish(rabbitMqConfigDTO.getExchange(), "BindingKey", true, props,
  13.                     ("消息发送Body"  + i).getBytes(StandardCharsets.UTF_8));
  14.         }
  15.         connection.close();
  16.         return "消息发送胜利";
  17.     }
  18.     @Override
  19.     public String consumeMessage() throws IOException, TimeoutException {
  20.         Connection connection = RabbitMqUtil.getRabbitConnection();
  21.         Channel channel = RabbitMqUtil.getRabbitChannel(connection);
  22.         String exchange = rabbitMqConfigDTO.getExchange();
  23.         channel.exchangeDeclare(exchange,rabbitMqConfigDTO.getExType(), true, false,false, null);
  24.         channel.queueDeclare(rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>());
  25.         channel.queueBind(rabbitMqConfigDTO.getQueue(), rabbitMqConfigDTO.getExchange(), rabbitMqConfigDTO.getBindingKey());
  26.         // 开端消费消息。
  27.         channel.basicConsume(rabbitMqConfigDTO.getQueue(), false, "ConsumerTag", new DefaultConsumer(channel) {
  28.             @Override
  29.             public void handleDelivery(String consumerTag, Envelope envelope,
  30.                                        AMQP.BasicProperties properties, byte[] body)
  31.                     throws IOException {
  32.                 //接收到的消息,停止业务逻辑处置。
  33.                 System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
  34.                 channel.basicAck(envelope.getDeliveryTag(), false);
  35.             }
  36.         });
  37.         connection.close();
  38.         return "消费胜利";
  39.     }
  40.    
  41. }
复制代码
        8.创建控制层
  1. @RestController
  2. public class RabbitMqController {
  3.     @Autowired
  4.     private RabbitMqService rabbitMqService;
  5.     @GetMapping("/sendMessage")
  6.     public String sendMessage() throws IOException, TimeoutException {
  7.         return rabbitMqService.sendMessage();
  8.     }
  9.     @GetMapping("/consumeMessage")
  10.     public String consumeMessage() throws IOException, TimeoutException {
  11.         return rabbitMqService.consumeMessage();
  12.     }
  13. }
复制代码
       9.项目整体构造
Springboot 整合 阿里云消息队列RabbitMQ版效劳-7.jpg


     10.完成启动项目
     11.点击获取源码
测试

    发送消息
    Springboot 整合 阿里云消息队列RabbitMQ版效劳-8.jpg

     2. 进入控制台查看
Springboot 整合 阿里云消息队列RabbitMQ版效劳-9.jpg


         此时可以看到堆积10条消息,说明消息发送胜利了
        3. 消费消息
Springboot 整合 阿里云消息队列RabbitMQ版效劳-10.jpg


         4.再次进入控制台查看
Springboot 整合 阿里云消息队列RabbitMQ版效劳-11.jpg


                堆积的消息已变为0说明消息已经被全部消费了
后序

自己遇到不懂的,比如前面本地测试需要公网接入点的问题,百度了不少博客,也没有找到sprigboot 整合 阿里云 消息队列RabbitMQ版的博客已经处置方案,所以博主按自己的实际操作记录下来,希望对同样遇到问题的小伙伴有所协助。

回复

举报 使用道具

相关帖子
全部回复
暂无回帖,快来参与回复吧
本版积分规则 高级模式
B Color Image Link Quote Code Smilies

列表唯1酷Girl
注册会员
主题 16
回复 22
粉丝 0
|网站地图
快速回复 返回顶部 返回列表