First acquaintance with RabbitMQ of message middleware (1)

First acquaintance with RabbitMQ of message middleware (1)

This is the first day I participated in the Wenwen Challenge. For details of the event, please view: Wenwen Challenge

Author: threedayman Hengsheng LIGHT Cloud Community

What is RabbitMQ

RabbitMQ is the most widely deployed open source message broker. RabbitMQ is lightweight and easy to deploy. Support multiple message protocols.

Why use RabbitMQ

Common usage scenarios include decoupling, asynchronous, peak cutting and valley filling. Below we use examples to experience the benefits of using MQ in their respective scenarios.

Decoupling

Suppose there is system A, which depends on system B, system C, and system D. The dependency relationship has been hard-coded in the code, and the structure is as shown in the figure below.

Assuming that there is a new requirement at this time, and the system A needs to call the system E to perform some new business operations, then the programmers of the system A will inevitably operate and deal with the requirements for accessing the system E. In the same way, if you want to remove the dependency of a certain system, such as system C, you also need to deal with the development of system A.

So at this time, if we introduce MQ to see what changes it will bring.

System A sends messages to MQ, and systems B, C, and D subscribe to the corresponding messages for business processing. Then let s take a look at the previous scenario. Assuming that a dependency system E needs to be added, only the developers of system E need to make the corresponding subscription consumption. Similarly, if you want to cancel the dependency of system C, you only need to cancel the subscription of system C. news.

asynchronous

Assuming that the operation of system A takes 30ms, system A will also synchronously call system B (300ms), system C (600ms), and system D (200ms), then the response time of this request will reach 1130ms. Too long response time will bring bad user experience to customers.

After the introduction of MQ, let's see what will change

System A sends the message to MQ (7ms) and then returns, and systems B, C, and D respectively monitor MQ for business processing. Then we can see that after the introduction of MQ for asynchronous processing, the overall response time has dropped from 1130ms to 37ms for the long time-consuming synchronization dependency just now.

Peak shaving and valley filling

Suppose we have a business peak period of request volume can reach 7000/s and business trough traffic is only 100/s, but our mysql database can only withstand 2000/s requests.

In this case, the maximum load capacity of mqsql will be exceeded during the peak period and it will be directly suspended, but the resources of mqsql will not be used rationally during the low peak period.

After the introduction of MQ, let's see what will change

At this point, the system can pull messages according to its maximum consumption capacity of 2000/s, and can smoothly pass the peak period of the business, and at the same time delay some of the messages to the trough period for processing. It is unlikely that the database will be suspended due to high traffic, and the overall service will not be available.

How to use RabbitMQ

This section mainly focuses on several commonly used examples written by RabbitMQ's java client. If you are familiar with using RabbitMQ, you can skip this section. To view the complete RabbitMQ instructions, please visit the official documentation .

Hello world

Let us experience RabbitMQ through a Hello world example. First introduce the terms used in this example

  • Producer: Producer, used to send messages.
  • Queue: The message queue is used to store messages. The message is delivered to the message queue via the producer, and finally delivered to the consumer for consumption. The message queue receives the limitations of machine memory and hard disk resources.
  • Consumer: Consumer, used to receive and process messages.

In this example, we will produce Hello World messages, receive and print out the messages through consumers.

The key steps of the producer Send are shown in the notes

public class Send { //queue name private final static String QUEUE_NAME = "hello" ; public static void main (String[] argv) throws Exception { //Create a connection and channel between the server and the server ConnectionFactory factory = new ConnectionFactory(); //Please set the actual deployment node ip factory.setHost( "localhost" ); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //Declare a queue to send messages channel.queueDeclare(QUEUE_NAME, false , false , false , null ); String message = "Hello World!" ; //Publish the message channel.basicPublish( "" , QUEUE_NAME, null , message.getBytes(StandardCharsets.UTF_8)); System.out.println( "[x] Sent'" + message + "'" ); } } } Copy code

Complete Send Code View

The key steps of consumer Recv are shown in the notes

public class Recv { //Queue name private final static String QUEUE_NAME = "hello" ; public static void main (String[] argv) throws Exception { //Create a connection with the server connection, channel ConnectionFactory factory = new ConnectionFactory(); factory.setHost( "localhost" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //Declare the queue to be consumed channel.queueDeclare(QUEUE_NAME, false , false , false , null ); System.out.println( "[*] Waiting for messages. To exit press CTRL+C" ); //Process messages through this class DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8" ); System.out.println( "[x] Received'" + message + "'" ); }; channel.basicConsume(QUEUE_NAME, true , deliverCallback, consumerTag -> {}); } } Copy code

Complete Recv code review

Work Queues

In this example, we will introduce the distribution of time-consuming tasks to multiple workers through RabbitMQ. RabbitMQ will deliver messages to consumers through round-robin, which makes it easy for us to expand our consumption capacity.

For the key steps of Producer NewTask, see the notes

public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue" ; public static void main (String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost( "localhost" ); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //Set the queue as a persistent channel.queueDeclare(TASK_QUEUE_NAME, true , false , false , null ); String message = String.join( "" , argv); //Set the message to be persistent channel.basicPublish( "" , TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes( "UTF-8" )); System.out.println( "[x] Sent'" + message + "'" ); } } } Copy code

Complete NewTask code review

See the notes for the key steps of consumer Woker

public class Worker { private static final String TASK_QUEUE_NAME = "task_queue" ; public static void main (String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost( "localhost" ); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //Set the queue to persistent channel.queueDeclare(TASK_QUEUE_NAME, true , false , false , null ); System.out.println( "[*] Waiting for messages. To exit press CTRL+C" ); //A consumer can process at most one unconfirmed message at the same time channel.basicQos( 1 ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8" ); System.out.println( "[x] Received'" + message + "'" ); try { doWork(message); } finally { System.out.println( "[x] Done" ); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false ); } }; channel.basicConsume(TASK_QUEUE_NAME, false , deliverCallback, consumerTag -> {}); } //Simulate time-consuming tasks, one. Represents time-consuming 1S private static void doWork (String task) { for ( char ch: task.toCharArray()) { if (ch == '.' ) { try { Thread.sleep( 1000 ); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } } Copy code

Complete Worker Code View

Publish/Subscribe

We have already introduced RabbitMQ's core message model, producers, consumers, and queues. In this section, we will come into contact with another message model exchange **, which is responsible for receiving messages from producers and delivering them to the queue. . There are mainly the following types of exchage**

  • direct
  • topic
  • headers
  • fanout

In this example, we will use the fanout type as an explanation. From the name, we can probably guess that this type of exchange will broadcast the received message to its bound queue.

The key steps of Producer EmitLog are shown in the notes

public class EmitLog { private static final String EXCHANGE_NAME = "logs" ; public static void main (String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost( "localhost" ); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //Create an exchange and specify the type channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); String message = argv.length < 1 ? "Info: Hello World!" : String.join( "" , argv); //This is different from the previous message. Specifying a specific exchange does not specify a specific queue channel.basicPublish(EXCHANGE_NAME, "" , null , message.getBytes( "UTF-8" )); System.out.println( "[x] Sent'" + message + "'" ); } } } Copy code

EmitLog complete code review

For the key steps of Consumer ReceiveLogs, see the notes

public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs" ; public static void main (String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost( "localhost" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //Create a fanout type exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); //Get a unique, non-persistent, automatically deleted queue String queueName = channel.queueDeclare().getQueue(); //Bind the resume relationship between exchage and queue channel.queueBind(queueName, EXCHANGE_NAME, "" ); System.out.println( "[*] Waiting for messages. To exit press CTRL+C" ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8" ); System.out.println( "[x] Received'" + message + "'" ); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> {}); } } Copy code

ReceiveLogs complete code review

Routing

In the previous example, exchange broadcasts the received information to the bound queue. In this example, we will add some specific bindings to enable exchange to deliver different messages to different queues through routingKey (full match) in. For example, the daily log distinguishes the error log into a separate queue.

Producer EmitLogDirect key steps, see notes

public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs" ; public static void main (String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost( "localhost" ); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //Declare a direct type exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null , message.getBytes( "UTF-8" )); System.out.println( "[x] Sent'" + severity + "':'" + message + "'" ); } } private static String getSeverity (String[] strings) { if (strings.length < 1 ) return "info" ; return strings[ 0 ]; } private static String getMessage (String[] strings) { if (strings.length < 2 ) return "Hello World!" ; return joinStrings(strings, "" , 1 ); } private static String joinStrings (String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0 ) return "" ; if (length <= startIndex) return "" ; StringBuilder words = new StringBuilder(strings[startIndex]); for ( int i = startIndex + 1 ; i <length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } Copy code

EmitLogDirect complete code review

For the key steps of Consumer ReceiveLogsDirect, see notes

public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs" ; public static void main (String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost( "localhost" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1 ) { System.err.println( "Usage: ReceiveLogsDirect [info] [warning] [error]" ); System.exit( 1 ); } for (String severity: argv) { //Establish the relationship between exchange and queue and set routingKey channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println( "[*] Waiting for messages. To exit press CTRL+C" ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8" ); System.out.println( "[x] Received'" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'" ); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> { }); } } Copy code

ReceiveLogsDirect complete code review

Topics

Provide richer routing rules from exchange to queue. The rules are separated by routingKey. The maximum limit is 255bytes. Different from the previous full matching routingKey, the routingKey of the topic type exchange mainly adds two features.

  • *Represents a word**. **
  • **#** stands for 0 or one word.

Producer EmitLogTopic key steps, see notes

public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost( "localhost" ); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic" ); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null , message.getBytes( "UTF-8" )); System.out.println( "[x] Sent'" + routingKey + "':'" + message + "'" ); } } private static String getRouting (String[] strings) { if (strings.length < 1 ) return "anonymous.info" ; return strings[ 0 ]; } private static String getMessage (String[] strings) { if (strings.length < 2 ) return "Hello World!" ; return joinStrings(strings, "" , 1 ); } private static String joinStrings (String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0 ) return "" ; if (length <startIndex) return "" ; StringBuilder words = new StringBuilder(strings[startIndex]); for ( int i = startIndex + 1 ; i <length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } Copy code

EmitLogTopic complete code review

For the key steps of Consumer ReceiveLogsTopic, see the notes

public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost( "localhost" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic" ); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1 ) { System.err.println( "Usage: ReceiveLogsTopic [binding_key]..." ); System.exit( 1 ); } for (String bindingKey: argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println( "[*] Waiting for messages. To exit press CTRL+C" ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8" ); System.out.println( "[x] Received'" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'" ); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> {}); } } Copy code

ReceiveLogsTopic complete code review

What challenges does the introduction of RabbitMQ bring

Seeing this, do you want to try to introduce RabbitMQ into the project to optimize the current usage scenarios, so do we deploy a RabbitMQ service and then send a message without worry? In fact, when introducing a middleware, there are some problems at the same time. If we don't understand these problems deeply or comprehensively, then congratulations you will enter the pit-digger sequence. In order to become a reliable programmer, we must fully understand the challenges that the introduction of middleware brings to our project, so that we can calmly deal with it in future applications. The following is a list of common types of problems in message middleware

  • Message is lost
  • Duplicate message
  • Message accumulation
  • RabbitMQ's availability guarantee

In the following articles, we will explain the solutions to the above problems one by one. RabbitMQ message reliability transmission (two)

Reference documents

https://www.rabbitmq.com/RabbitMQ official document

tips: The author's personal experience is limited, please correct me for deficiencies.