SpringBoot and RabbitMQ Series One: Basics

SpringBoot and RabbitMQ Series One: Basics

1 Introduction

You know that message middleware will be used in the system development process

Asynchronous processing of messages
,
Decoupling between systems
,
System traffic peak clipping
. In the process of using message middleware, we need to understand the following scenarios:

  • How to integrate with our development framework SpringBoot
  • How to send a message
  • How to send complex messages
  • How to ensure the reliability of sending messages
  • How to consume news
  • How to ensure the reliability of consumer news
  • How to ensure the scalability of consumers
  • How to use consumers to peak traffic

Start the writing of this article based on these scenarios, this article is a message middleware

RabbitMQ
As an example

2. Integration with SpringBoot

2.1 Add dependency

Want to pass

SpringBoot
Framework integration
RabbitMQ
It s a relatively easy thing, just add the corresponding in the pom file
starter
Can

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> Copy code

2.2 Add MQ service configuration

Spring:
  RabbitMQ:
    Host:  localhost
    Port:  5672
    username:  Guest
    password:  Guest
    Virtual-Host:  Boot-Example
duplicated code

2.3 Inject message template

@Autowired
private  RabbitTemplate rabbitTemplate;
copy code

2.4 Sending a message

public  void  the sendMessage ()  {
    rabbitTemplate.convertAndSend ( "Test" "Test" "MQ Produce A Send Message" );
}
copy the code

2.5 Consumer news

@Component
@Slf 4j
public  class  MqConsumer  {
    
    @RabbitListener (id =  "consumerMessage1" , queues =  "test" )
    public  void  consumeMessage1 (Message message, Channel channel, String content)  {
        log.info( "receive message1 {}" , content);     
    }
Copy code

3. How to send complex messages

In the process of system development, complex object interactions with other systems are often in the form of json, which requires us to set up message serialization and deserialization converters

3.1 The producer sets the message converter

@Bean
public  RabbitTemplate  rabbitTemplate (RabbitTemplateConfigurer Configurer, the ConnectionFactory The connectionFactory)  {
    RabbitTemplate rabbitTemplate =  new new  RabbitTemplate ();
    configurer.configure (rabbitTemplate, The connectionFactory);
    rabbitTemplate.setMessageConverter ( new new  Jackson2JsonMessageConverter ());
    return  rabbitTemplate;
}
copy the code

3.2 Consumer settings message converter

@Bean
public  RabbitListenerContainerFactory rabbitListenerContainerFactory (the ConnectionFactory The connectionFactory) {<?>
    SimpleRabbitListenerContainerFactory Factory =  new new  SimpleRabbitListenerContainerFactory ();
    factory.setConnectionFactory (The connectionFactory);
    factory.setMessageConverter ( new new  Jackson2JsonMessageConverter ());
    return  Factory;
}
copy the code

3.3 Consumer designated listening container factory

@RabbitListener (Queues =  "Test3" , ContainerFactory =  "rabbitListenerContainerFactory" )
public  void  consumeComplexMessage (the Order Order)  {
    log.info ( "the receive Complex Message: {}" , Order);
}
copy the code

4. Reliability of sending messages

4.1 Why to ensure the reliability of sending messages

Regarding the reliability of sending messages, let s take a look at the official documentation:

A RabbitMQ node can lose persistent messages if it fails before said messages are written to disk. For instance, consider this scenario:

  1. a client publishes a persistent message to a durable queue
  2. a client consumes the message from the queue (noting that the message is persistent and the queue durable), but confirms are not active,
  3. the broker node fails and is restarted, and
  4. the client reconnects and starts consuming messages

At this point, the client could reasonably assume that the message will be delivered again. This is not the case: the restart has caused the broker to lose the message. In order to guarantee persistence, a client should use confirms. If the publisher's channel had been in confirm mode, the publisher would not have received an ack for the lost message (since the message hadn't been written to disk yet).

The rough meaning is that the RabbitMQ message server may lose persistent messages due to downtime before writing the message to the disk. If the producer channcel sets the confirmation mode, it can ensure that the message is not lost, because only the message written to the disk will the producer receive the ack notification.

4.2 How to ensure the reliability of sending messages

4.2.1 Add configuration

RabbitMQ:
    Publisher of the type-Confirm The-:  Correlated
    Publisher-returns A:  to true
copy the code

4.2.2 Specify callback function

achieve

RabbitTemplate.ConfirmCallback
,
RabbitTemplate.ReturnCallback
interface

@Configuration
@Slf 4j
public  class  MqConfig  implements  RabbitTemplate . ConfirmCallbackRabbitTemplate . ReturnCallback  {

    /**
     * Basic.ack returned by the message server
     *
     *  @param  correlationData associated data object
     *  @param  ack ack
     *  @param  cause abnormal information
     */

    @ Override
    public  void  confirm (CorrelationData correlationData,  boolean  ack, String cause)  {
        log.info("receive ack confirm:{} from broker server" , ack);
    }

    /**
     * The basic.return returned by the message server
     *
     *  @param  message message object
     *  @param  replyCode response code
     *  @param  replyText response text
     *  @param  exchange Switch
     *  @param  routingKey routing key
     */

    @Override
    public  void  returnedMessage (Message message,  int  replyCode, String replyText, String exchange, String routingKey)  {
        log.error("receive return message: {} from broker server, reply code: {}, reply text: {},"  +
                "exchange: {}, routing key: {}" , message.toString(), replyCode, replyText, exchange, routingKey);
    }

    @Bean
    public  RabbitTemplate  rabbitTemplate (RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory)  {
        RabbitTemplate rabbitTemplate =  new  RabbitTemplate();
        configurer.configure(rabbitTemplate, connectionFactory);
        rabbitTemplate.setReturnCallback( this );
        rabbitTemplate.setConfirmCallback( this );
        rabbitTemplate. setMessageConverter( new Jackson2JsonMessageConverter ());
        return  rabbitTemplate;
    }
}
copy the code

4.2.3 confirm and returnedMessage

At first glance, the comfirm() callback function can achieve the reliability of sending messages. The returnedMessage() callback function seems a bit redundant. This view can be established under certain conditions. Let s take a look at the official documentation:

For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack

The message will be sent to the exchange first, and the exchange will route the message to the corresponding queue according to the relevant rules. For messages that cannot be routed by the exchange, the message server will first return basic.return, and then return basic.ack, that is, it will first call back returnedMessage() Function, and then call back the confirm() function. Under normal circumstances, the message server will return basic.ack after persisting the message and the ack parameter of the comfirm() callback function is true. However, in the case that the exchange cannot route the message, the ack in the confirm() callback function parameter returns true. In this case, relying only on the comfirm() callback function cannot guarantee the reliability of the message. It needs to be combined with the returnedMessage() callback function; if To ensure that the exchange and the queue can be routed successfully, the confirm() callback function can ensure the reliability of sending messages.

5. The reliability of the message

5.1 Why to ensure the reliability of consumer messages

Regarding the reliability of consumer news, let s take a look at the description of the official document:

When a node delivers a message to a consumer, it has to decide whether the message should be considered handled (or at least received) by the consumer. Since multiple things (client connections, consumer apps, and so on) can fail, this decision is a data safety concern. Messaging protocols usually provide a confirmation mechanism that allows consumers to acknowledge deliveries to the node they are connected to. Whether the mechanism is used is decided at the time consumer subscribes.

Depending on the acknowledgement mode used, RabbitMQ can consider a message to be successfully delivered either immediately after it is sent out (written to a TCP socket) or when an explicit ("manual") client acknowledgement is received. Manually sent acknowledgements can be positive or negative and use one of the following protocol methods:

By default, the message server will immediately delete the message after it is delivered to the consumer. In this mode, the message may be lost due to special scenarios such as connection disconnection, channel disconnection, and consumer abnormality, and the reliability of the message cannot be guaranteed. To ensure the reliability of consumption messages, it is necessary to introduce a confirmation mechanism, that is, the message server does not delete the message immediately after dispatching the message, and only deletes the message after receiving the consumer's ack. If the consumer disconnects from the message server , The message server needs to resend the message to other consumers for consumption.

5.2 How to ensure the reliability of consumer messages

Examples of official documents:

boolean  autoAck =  false ;
//Set autoAck to false
channel.basicConsume(queueName, autoAck,  "a-consumer-tag" ,
     new  DefaultConsumer(channel) {
         @Override
         public  void  handleDelivery (String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties ,
                                    byte [] body)

             throws  IOException
         
{
             long  deliveryTag = envelope.getDeliveryTag();
             //negatively acknowledge, the message will
             //be discarded
             channel.basicReject(deliveryTag,  false);
         }
     });
Copy code

The key part is that the second parameter of channel.basicConsume() needs to be set to false

SpringBoot integration:

The default value is the automatic mode, which can meet the reliability of the message. Explanation of the enumeration value of the ack mode

If you use SpringBoot, you don't need to worry about the reliability of consumer messages.

6. How to ensure the scalability of consumers

When there are multiple consumers in a queue, the message server will push messages to consumers in a polling manner. Based on this, when the message server has a backlog of messages, it can increase the consumption capacity by adding machines, and consumers naturally have the ability to scale horizontally.

7. How to use consumers to peak traffic

Under normal circumstances, the message server will deliver the message to the consumer if there is a message. If the delivery process is unlimited, it will increase the load on the consumer system. In severe cases, the consumer system will not be able to provide services to the consumer. How to achieve traffic peak reduction? The answer is to set QoS

Because messages are sent (pushed) to clients asynchronously, there is usually more than one message "in flight" on a channel at any given moment. In addition, manual acknowledgements from clients are also inherently asynchronous in nature. So there's a sliding window of delivery tags that are unacknowledged. Developers would often prefer to cap the size of this window to avoid the unbounded buffer problem on the consumer end. This is done by setting a "prefetch count" value using the basic.qos method. The value defines the max number of unacknowledged deliveries that are permitted on a channel. Once the number reaches the configured count, RabbitMQ will stop delivering more messages on the channel unless at least one of the outstanding ones is acknowledged.

The rough meaning is to set the threshold through basic.qos. When the number of unconfirmed messages held by the consumer exceeds the threshold, the message server will no longer distribute the message to the consumer, thus achieving traffic peak reduction

8. References