序
因为公司的需要效劳都是用的阿里云相关的产品,最近自己工作中也涉及到了消息队列这一块的业务,索性自己也来从零开端对接阿里云的消息队列效劳。
准备
本着学习的前提,寻找是否免费的或者做活动的效劳,能白嫖的就白嫖,果然被我找到了。
进入阿里云官方首页,找到精选活动->阿里云使用中心 点击进入
2.进入页面搜索消息队列
3. 详细队列的相关配置步骤可参考官方文档:快速入门概述 - 消息队列RabbitMQ版 - 阿里云
4. 原本Rocket版、Kafka版都想学习的,但只要rabbit版的免费,但也够意思了毕竟不要钱(注:虽然免费但后面还留了一个很大的坑等着踩呢)
开端
1. 创建一个springboot项目 命名为:rabbitmq-aliyun
2.在application.yml中配置相关mq常量(常量可从前面自己创建的阿里云mq控制台获取)- server:
- port: 8080
- aliyun:
- rabbitmq:
- accessKey: 密匙key
- accessKeySecret: 密匙密码
- username: 静态用户名
- password: 静态密码
- vHost: 虚拟机名称
- exchange: 交换机名称
- exType: 交换机类型
- queue: 队列名称
- BindingKey: 路由key
- host: 介入点(公网接入点)
复制代码注:本地测试必需使用公网接入点 ,但是我们使用的免费rabbitMq效劳并没有公网接入点,只要VPC接入点
所以自己依照官方教程一步一步实现,总是连接超时,代码检查了无数遍也没有找到问题所在。(官方教程也没有标明用哪一个接入点地址,进了这个大坑)
最后只能需求官方客户协助:
本着,不花钱的原则,但是使用VPC接入点 还得购置 阿里云ecs效劳,岂不是还得花更多的钱。
最后只能晋级效劳,并且选择支持公网
所以说,白嫖也并没有完全白嫖,还得花钱,要么买ecs效劳,要么升配队列效劳
3.创建配置数据映射对象 RabbitMqConfigDTO.class- @Configuration
- @ConfigurationProperties("aliyun.rabbitmq")
- @Data
- public class RabbitMqConfigDTO {
- /**
- * 账户密匙key
- */
- private String accessKey;
- /**
- * 账户密匙
- */
- private String accessKeySecret;
- /**
- * 静态用户名
- */
- private String username;
- /**
- * 静态用户名密码
- */
- private String password;
- /**
- * 虚拟机名称
- */
- private String vHost;
- /**
- * 交换机名
- */
- private String exchange;
- /**
- * 交换机类型
- */
- private String exType;
- /**
- * 队列名
- */
- private String queue;
- /**
- * 绑定规则key
- */
- private String BindingKey;
- /**
- * 接入点地址
- */
- private String host;
- }
复制代码 4. 创建spring工具类 SpringContextHolder.class 用于获取bean对象- public class SpringContextHolder implements ApplicationContextAware {
- @Autowired
- private static ApplicationContext applicationContext;
- public SpringContextHolder() {
- }
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- SpringContextHolder.applicationContext = applicationContext;
- }
- public static ApplicationContext getApplicationContext() {
- assertApplicationContext();
- return applicationContext;
- }
- public static <T> T getBean(String beanName) {
- assertApplicationContext();
- return (T) applicationContext.getBean(beanName);
- }
- public static <T> T getBean(Class<T> requiredType) {
- assertApplicationContext();
- return applicationContext.getBean(requiredType);
- }
- private static void assertApplicationContext() {
- if (applicationContext == null) {
- throw new RuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!");
- }
- }
复制代码 5. 创建rabbitMq工具类 RabbitMqUtil.class- @Slf4j
- @Component
- public class RabbitMqUtil {
-
- @Autowired
- private RabbitMqConfigDTO rabbitMqConfigDTO;
- //第三步 建一个静态的本类
- private static RabbitMqUtil rabbitMqUtil;
- //第四步 初始化
- @PostConstruct
- public void init() {
- rabbitMqUtil = this;
- }
-
- /**
- * 创建队列连接
- * @return
- */
- public static Connection getRabbitConnection(){
- ConnectionFactory factory = new ConnectionFactory();
- //公网接入点
- factory.setHost(rabbitMqUtil.rabbitMqConfigDTO.getHost());
- //静态用户名
- factory.setUsername(rabbitMqUtil.rabbitMqConfigDTO.getUsername());
- //静态密码
- factory.setPassword(rabbitMqUtil.rabbitMqConfigDTO.getPassword());
- //自动恢复
- factory.setAutomaticRecoveryEnabled(true);
- //网络恢复时间
- factory.setNetworkRecoveryInterval(5000);
- //虚拟机名称
- factory.setVirtualHost(rabbitMqUtil.rabbitMqConfigDTO.getVHost());
- //端口
- factory.setPort(5672);
- //连接超时时间
- factory.setConnectionTimeout(30*100);
- //设置握手超时时间
- factory.setHandshakeTimeout(300000000);
- factory.setShutdownTimeout(0);
- //创建连接
- Connection connection = null;
- try {
- connection =factory.newConnection();
- }catch (Exception e){
- log.error("rabbitMq连接异常", e);
- }
- return connection;
- }
- /**
- * 创建队列通道
- * @param connection
- * @return
- */
- public static Channel getRabbitChannel(Connection connection){
- Channel channel = null;
- try {
- channel = connection.createChannel();
- String exchange = rabbitMqUtil.rabbitMqConfigDTO.getExchange();
- channel.exchangeDeclare(exchange,rabbitMqUtil.rabbitMqConfigDTO.getExType(), true, false,false, null);
- channel.queueDeclare(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>());
- channel.queueBind(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), rabbitMqUtil.rabbitMqConfigDTO.getExchange(), rabbitMqUtil.rabbitMqConfigDTO.getBindingKey());
- }catch (Exception e){
- log.error("创建rabbitMq通道异常", e);
- }
- return channel;
- }
- }
复制代码 6.创建server接口类- public interface RabbitMqService {
- /**
- * 发送mq消息
- * @return
- */
- String sendMessage() throws IOException, TimeoutException;
- /**
- * 消费消息
- * @return
- * @throws IOException
- * @throws TimeoutException
- */
- String consumeMessage() throws IOException, TimeoutException;
- }
复制代码 7.创建实现类- @Service
- public class RabbitMqServiceImpl implements RabbitMqService {
- @Autowired
- private RabbitMqConfigDTO rabbitMqConfigDTO;
- @Override
- public String sendMessage() throws IOException {
- Connection connection = RabbitMqUtil.getRabbitConnection();
- Channel channel = RabbitMqUtil.getRabbitChannel(connection);
- //开端发送消息
- for(int i=0; i< 10 ; i++){
- AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
- channel.basicPublish(rabbitMqConfigDTO.getExchange(), "BindingKey", true, props,
- ("消息发送Body" + i).getBytes(StandardCharsets.UTF_8));
- }
- connection.close();
- return "消息发送胜利";
- }
- @Override
- public String consumeMessage() throws IOException, TimeoutException {
- Connection connection = RabbitMqUtil.getRabbitConnection();
- Channel channel = RabbitMqUtil.getRabbitChannel(connection);
- String exchange = rabbitMqConfigDTO.getExchange();
- channel.exchangeDeclare(exchange,rabbitMqConfigDTO.getExType(), true, false,false, null);
- channel.queueDeclare(rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>());
- channel.queueBind(rabbitMqConfigDTO.getQueue(), rabbitMqConfigDTO.getExchange(), rabbitMqConfigDTO.getBindingKey());
- // 开端消费消息。
- channel.basicConsume(rabbitMqConfigDTO.getQueue(), false, "ConsumerTag", new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body)
- throws IOException {
- //接收到的消息,停止业务逻辑处置。
- System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- });
- connection.close();
- return "消费胜利";
- }
-
- }
复制代码 8.创建控制层- @RestController
- public class RabbitMqController {
- @Autowired
- private RabbitMqService rabbitMqService;
- @GetMapping("/sendMessage")
- public String sendMessage() throws IOException, TimeoutException {
- return rabbitMqService.sendMessage();
- }
- @GetMapping("/consumeMessage")
- public String consumeMessage() throws IOException, TimeoutException {
- return rabbitMqService.consumeMessage();
- }
- }
复制代码 9.项目整体构造
10.完成启动项目
11.点击获取源码
测试
2. 进入控制台查看
此时可以看到堆积10条消息,说明消息发送胜利了
3. 消费消息
4.再次进入控制台查看
堆积的消息已变为0说明消息已经被全部消费了
后序
自己遇到不懂的,比如前面本地测试需要公网接入点的问题,百度了不少博客,也没有找到sprigboot 整合 阿里云 消息队列RabbitMQ版的博客已经处置方案,所以博主按自己的实际操作记录下来,希望对同样遇到问题的小伙伴有所协助。
|
|