We introduced Kafka's basic concepts and workflow above. If you only need to develop Kafka applications, or just run Kafka in production, a deep understanding of its internals is not strictly necessary. However, understanding how Kafka works internally helps explain its behavior and makes problems much faster to diagnose.
1. Kafka cluster membership
Kafka depends on ZooKeeper. ZooKeeper runs as a cluster, and Kafka brokers likewise form a cluster, which raises the question of how multiple producers and multiple consumers are coordinated. Maintaining these cluster relationships is also ZooKeeper's job. Kafka's storage structure in ZooKeeper is shown in the following figure:
A Kafka cluster contains multiple hosts (brokers). Every broker has a broker.id, a unique identifier that distinguishes it from the others; it can be set manually in the configuration file or generated automatically.

When a broker starts, it registers itself in ZooKeeper by creating an ephemeral (temporary) node under /brokers/ids with its broker.id.
- If you try to start another broker with the same ID, you get an error: the new broker attempts to register, but fails because a broker with the same ID already exists in ZooKeeper.
- When a broker crashes, is cut off by a network partition, or pauses for a long garbage collection, it becomes disconnected from ZooKeeper. The ephemeral node the broker created at startup is then removed from ZooKeeper, and the Kafka components watching the broker list are notified that the broker is gone.
- When a broker shuts down, its corresponding node also disappears, but its ID continues to exist in other data structures, such as a topic's replica list, which we discuss below. If, after a broker has shut down completely, you start a brand-new broker with the same ID, it immediately joins the cluster in the old broker's place, with the same partitions and topics.
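The registration behavior above can be sketched with a toy stand-in for ZooKeeper's ephemeral-node registry. This is an illustrative simulation, not Kafka or ZooKeeper source code; the path and session names are made up for the example.

```python
# Illustrative sketch: brokers register ephemeral nodes under a ZooKeeper-like
# path; a duplicate broker.id fails, and a dead session frees the node.

class FakeZooKeeper:
    """A tiny stand-in for ZooKeeper's ephemeral-node registry."""
    def __init__(self):
        self.ephemeral = {}  # path -> owning session

    def create_ephemeral(self, path, session):
        if path in self.ephemeral:
            raise RuntimeError(f"NodeExistsException: {path}")
        self.ephemeral[path] = session

    def session_expired(self, session):
        # Ephemeral nodes vanish when their owning session dies.
        for path in [p for p, s in self.ephemeral.items() if s == session]:
            del self.ephemeral[path]

zk = FakeZooKeeper()
zk.create_ephemeral("/brokers/ids/0", session="broker-0")

try:
    zk.create_ephemeral("/brokers/ids/0", session="imposter")  # same broker.id
except RuntimeError as e:
    print(e)  # NodeExistsException: /brokers/ids/0

zk.session_expired("broker-0")  # broker goes down / GC pause too long
zk.create_ephemeral("/brokers/ids/0", session="broker-0-new")  # now succeeds
```

The second registration with the same ID fails exactly as described above, and once the original session expires, a new broker with the same ID can take its place.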
2. Broker Controller
Kafka's broker cluster also contains a controller component (Controller), a core component of Kafka. Its main job is to manage and coordinate the entire Kafka cluster with the help of ZooKeeper. Any broker in the cluster can act as the Controller, but once the Kafka cluster has started, only one broker becomes the Controller.
ZooKeeper-related knowledge may be involved here; please refer to the author's article: Java Engineer's Advanced Road to Zookeeper.
2.1. The role of Controller
Kafka's controller is designed as a multi-threaded component that simulates a state machine. Its roles include:
- The controller is like the manager of a department (the cluster), managing the department's members (the brokers);
- The controller monitors all brokers, watching them come online and go offline;
- When a broker goes down, the controller elects a new partition leader;
- The controller notifies the relevant brokers of the newly elected leader;
It can be further divided into the following five points:
- Topic management : the controller performs the creation and deletion of Kafka topics and the addition of partitions; it holds the ultimate authority over partitions. In other words, when we run the kafka-topics script, most of the work behind the scenes is done by the controller.
- Partition reassignment : this mainly refers to the fine-grained re-allocation of an existing topic's partitions provided by the kafka-reassign-partitions script; this functionality is also implemented by the controller.
- Preferred leader election : the preferred leader election is mainly a mechanism Kafka provides to avoid overloading some brokers.
- Cluster membership management : handling newly added brokers, broker shutdowns, and broker failures.
- Data service : the controller's last major job is providing data services to the other brokers. The most complete cluster metadata lives on the controller, and all other brokers periodically receive metadata update requests from the controller to refresh the cached data in their memory.
When the controller notices that a broker has left the cluster (by watching the relevant ZooKeeper path), it knows that the partitions this broker was leading need a new leader. The controller walks through each affected partition, decides who the new leader should be, and sends a request to every broker hosting the new leader or an existing follower. The request states who the new leader is and who the followers are. The new leader then begins serving producer and consumer requests, while the followers replicate from it.

When the controller notices that a broker has joined the cluster, it uses the broker ID to check whether the newcomer holds replicas of existing partitions. If it does, the controller notifies both the newly added broker and the existing brokers, and the new broker begins replicating messages from the current leaders.
2.2. Controller election
Kafka's current rules for electing the controller are:

- The first broker to start in the Kafka cluster creates an ephemeral node in ZooKeeper at /controller, making itself the controller.
- Other brokers also try to create this node when they start, but since the node already exists, later attempts to create /controller receive a "node already exists" exception.
- These brokers then register a ZooKeeper watch object on the /controller node, so that whenever the node changes, they receive a change notification.
This guarantees that only one controller exists at a time. But a single node inevitably raises a single-point-of-failure problem.

If the controller shuts down or is disconnected from ZooKeeper, the ephemeral node in ZooKeeper disappears. The other brokers in the cluster are notified through their watch objects that the controller has gone offline, and each then tries to become the new controller. The rules are the same as for the initial election: the first broker to successfully create the /controller node in ZooKeeper becomes the new controller, the others receive a "node already exists" exception, and they re-register watch objects on the new controller node.
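The election-and-failover cycle can be sketched as a toy race for an ephemeral node. This is an illustrative simulation only; broker names are made up, and the real mechanism relies on ZooKeeper sessions and watches.

```python
# Illustrative sketch: first broker to create /controller wins; when the node
# disappears, surviving brokers race again. Not real Kafka code.

class ControllerElection:
    def __init__(self):
        self.controller = None   # stands in for the ephemeral /controller node

    def try_become_controller(self, broker_id):
        if self.controller is not None:
            return False         # NodeExists: someone is already the controller
        self.controller = broker_id
        return True

    def controller_down(self):
        self.controller = None   # ephemeral node removed; watchers are notified

zk = ControllerElection()
print(zk.try_become_controller("broker1"))  # True: broker1 is the controller
print(zk.try_become_controller("broker2"))  # False: gets NodeExists

zk.controller_down()                        # broker1 loses its ZooKeeper session
print(zk.try_become_controller("broker3"))  # True: broker3 wins the new race
print(zk.controller)                        # broker3
```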
2.3. Controller's data storage
We mentioned above that the broker controller stores a large amount of Kafka cluster data and provides it as a service. As shown below:
The stored information falls into three main categories:

- Information about all brokers : all partitions on each broker, all partition replicas on each broker, which brokers are currently running, and which brokers are shutting down.
- Information about all topics : including per-partition details, such as which replica is the leader and which replicas are in the ISR set.
- All partitions involved in maintenance tasks : including the lists of partitions currently undergoing preferred leader election or partition reassignment.

Kafka cannot run without ZooKeeper, so this data is also saved in ZooKeeper. Whenever the controller initializes, it reads the corresponding metadata from ZooKeeper and fills its own cache.
2.4. Failover of Controller
As we said before, the first broker to create the /controller node in ZooKeeper becomes the controller.

Suppose broker1 registers first and becomes the controller, and later drops out because of network jitter or some other reason. ZooKeeper detects broker1's disappearance through the watch mechanism, and all surviving brokers then compete to become the controller. Say broker3 registers first this time: the controller recorded in ZooKeeper changes from broker1 to broker3, and broker3 then reads the metadata from ZooKeeper and initializes its own cache with it.
Note: ZooKeeper stores the authoritative data, not a cache; it is the broker that holds the cached copy.
2.5. The design principle of Controller
Before Kafka version 0.11, the controller design was quite cumbersome: a multi-threaded controller simulating a state machine. This design had several problems:

- Controller state changes were executed concurrently by different listeners, requiring complicated synchronization that was error-prone and hard to debug.
- State propagation was not synchronized; a broker could observe multiple states at indeterminate times, causing unnecessary extra data loss.
- The controller created additional I/O threads for topic deletion, causing performance loss.
- The controller's multi-threaded design also meant multiple threads accessing shared data, the most troublesome part of thread synchronization. To keep the data safe, the controller had to use the ReentrantLock synchronization mechanism heavily throughout the code, which further slowed down the entire controller.
From Kafka 0.11 onward, the controller adopted a new design, replacing the multi-threaded scheme with a single thread plus an event queue. As shown below:
The main changes are as follows:
- An Event Executor Thread was added. As the figure shows, both the Event Queue and the Controller context are handled by this event execution thread: all the original operations are modeled as independent events, sent to a dedicated event queue, and consumed by this single thread.
- All previously synchronous ZooKeeper operations were changed to asynchronous ones. The ZooKeeper API offers both synchronous and asynchronous reads and writes. The controller used to operate ZooKeeper synchronously; switching to the asynchronous mode improved efficiency roughly tenfold according to tests.
- Requests are now processed by priority. In the old design, the broker treated all requests sent by the controller fairly, in arrival order. Is fairness bad? In some cases, yes. Suppose the broker is queuing produce requests when the controller sends a StopReplica request. Should it keep working through the produce requests first, even though they are about to become pointless? The sensible order is to give the StopReplica request higher priority so that it can preempt the queue.
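The single-thread-plus-priority-queue idea can be sketched with Python's standard `queue.PriorityQueue`. This is only an illustration of the scheduling principle; event names and priority values are invented for the example.

```python
# Illustrative sketch of the post-0.11 design: a single controller thread drains
# a priority event queue, so a StopReplica event preempts queued lower-priority work.
import queue

events = queue.PriorityQueue()
# (priority, sequence, name): lower number = handled first; sequence keeps FIFO order
events.put((1, 0, "Produce"))
events.put((1, 1, "Produce"))
events.put((0, 2, "StopReplica"))   # urgent: the replica is being stopped

processed = []
while not events.empty():           # the one and only "event executor thread"
    _, _, name = events.get()
    processed.append(name)

print(processed)  # ['StopReplica', 'Produce', 'Produce']
```

Even though StopReplica was enqueued last, it is handled first, which is exactly the preemption behavior described above.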
3. Kafka's replica mechanism
Replication is a core feature of Kafka's architecture. In its own documentation, Kafka describes itself as a distributed, partitioned, replicated commit log service . Replication is so critical because durable storage of messages matters: it keeps Kafka highly available even after the primary node goes down. The replication mechanism (Replication), sometimes called a backup mechanism, generally means that a distributed system stores identical copies of the same data on multiple networked machines.

Kafka organizes data into topics. Each topic is divided into several partitions, and the partitions are placed on one or more brokers. Each partition has multiple replicas, which are likewise stored on brokers; a broker may hold thousands of replicas. The following figure is a schematic of replica replication:

As the figure shows, only two brokers are drawn for simplicity. Each broker holds messages of the topic, with a partition's leader replica on one broker and its follower replica on the other.
3.1. Leader replica
There are two types of replicas: the Leader (leader) replica and the Follower (follower) replicas.

When Kafka creates a partition, it elects one replica, and this elected replica becomes the leader replica.
3.2. Follower replica
All replicas other than the leader are collectively called follower replicas, and followers do not serve external requests. Here is how the leader replica works:
Need to pay attention to the following points:
In Kafka, follower replicas do not serve external requests . No follower replica responds to consumer or producer requests; all requests are handled by the leader replica, which means all requests must be sent to the broker hosting the leader replica. Followers are only used for data pulling: they fetch asynchronously from the leader and write to their own commit logs to stay in sync with the leader ;

When the broker hosting the leader replica goes down, Kafka detects this in real time through ZooKeeper's watch facility and starts a new round of elections, choosing one of the follower replicas as the new leader. If the failed broker restarts, its replica of the partition rejoins as a follower.
3.3. Follower and leader replica synchronization
Another of the leader's tasks is to keep track of which followers' state is consistent with its own. To stay consistent with the leader, followers copy messages from it as they arrive: a follower sends the leader a fetch request for data, the same kind of request a consumer sends to read messages.

The fetch sequence between a follower and the leader looks like this: the follower requests message 1, receives message 1, then requests message 2, and so on; a follower does not send the next request before it has received the leader's response to the previous one . The process is as follows:

The fact that a follower never sends the next request before receiving a response is important: by looking at the latest offset each follower has requested, the leader knows how far each follower's replication has progressed . If a follower has not requested any messages within 10 seconds, or has sent requests but has not caught up within 10 seconds, it is considered out of sync. A replica that is out of sync with the leader cannot become the leader after the leader fails, because it does not hold all of the messages.

Conversely, if the messages a follower has replicated are consistent with the leader replica's messages, that follower is called an in-sync replica. In other words, if the leader goes offline, only an in-sync replica can become the new leader.
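The "in sync within 10 seconds" judgment can be sketched as a simple lag check. This is a simplification: the parameter name `replica.lag.time.max.ms` is the real broker setting (default 10 000 ms), but the function and timestamps here are invented for illustration.

```python
# Illustrative sketch: judge "in sync" by how recently each follower caught up,
# mirroring the broker parameter replica.lag.time.max.ms (default 10_000 ms).
REPLICA_LAG_TIME_MAX_MS = 10_000

def in_sync_replicas(leader_id, last_caught_up_ms, now_ms):
    """Followers that caught up within the lag window, plus the leader itself."""
    isr = {leader_id}
    for follower, ts in last_caught_up_ms.items():
        if now_ms - ts <= REPLICA_LAG_TIME_MAX_MS:
            isr.add(follower)
    return isr

now = 100_000
fetch_times = {"broker2": 95_000, "broker3": 80_000}  # broker3 is 20 s behind
print(sorted(in_sync_replicas("broker1", fetch_times, now)))
# ['broker1', 'broker2']  -- broker3 has fallen out of sync
```

A replica like broker3 here would be removed from the ISR and would not be eligible for a (clean) leader election.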
What are the benefits of having the leader serve all requests?

- Read-your-writes: after you successfully write a message to a partition with the producer API, a consumer can immediately read the message just written;
- Monotonic reads: a consumer will never see a message exist on one read and then find it missing on a later read.
3.4. Synchronous replication and asynchronous replication
Since the leader and follower replicas use a send-and-wait (request/response) mechanism, which looks like synchronous replication, why is the follower's replication from the leader described as asynchronous?

After synchronizing from the leader, the follower saves the message in its local log and then sends a response to the leader saying the save succeeded. With synchronous replication, the leader waits until all follower replicas have written successfully before returning a write-success response to the producer. With asynchronous replication, the leader does not care whether the followers have written successfully: as soon as it has saved the message to its own local log, it returns a write-success response to the producer.
Synchronous replication :

- The producer looks up the leader through ZooKeeper;
- The producer writes the message to the leader;
- After receiving the message, the leader writes it to its local log;
- Followers pull the message from the leader;
- Followers write the message to their local logs;
- Each follower sends an acknowledgement to the leader;
- The leader waits until it has received acknowledgements from all followers;
- The leader returns a write-success response to the producer;
Asynchronous replication :

The difference from synchronous replication is that after the leader writes to its local log, it immediately returns a write-success response to the client, without waiting for all followers to finish replicating.
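The contrast between the two acknowledgement styles above can be sketched in a few lines. This is not Kafka's real replication code; logs are modeled as plain lists and the ack strings are invented labels.

```python
# Illustrative sketch contrasting synchronous and asynchronous replication acks.

def replicate(message, leader_log, follower_logs, synchronous):
    leader_log.append(message)                 # leader writes its local log
    if synchronous:
        for log in follower_logs:              # wait for every follower to write
            log.append(message)
        return "ack-after-all-followers"
    # asynchronous: ack now; followers will pull the message later on their own
    return "ack-after-leader-only"

leader, f1, f2 = [], [], []
ack = replicate("m1", leader, [f1, f2], synchronous=True)
print(ack, len(f1), len(f2))   # ack-after-all-followers 1 1

ack = replicate("m2", leader, [f1, f2], synchronous=False)
print(ack, len(f1), len(f2))   # ack-after-leader-only 1 1  (followers not yet caught up)
```

After the asynchronous write, the leader's log holds two messages while the followers still hold one: the durability gap that synchronous replication closes at the cost of latency.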
3.5. ISR (In-Sync Replicas)
Kafka dynamically maintains a set of in-sync replicas, referred to as the ISR .

The ISR is also a very important concept. As we said before, follower replicas do not serve requests; they just pull data from the leader replica asynchronously. That pulling is essentially copying, a ctrl-c + ctrl-v everyone is familiar with. So does every replica in the ISR set hold exactly as many messages as the leader replica? Not necessarily. The judgment is based on the broker-side parameter replica.lag.time.max.ms: a follower that has kept up with the leader within this window (10 seconds by default) is considered in sync.
3.6. Unclean leader election
Since the ISR is adjusted dynamically, the ISR set can inevitably end up empty. The leader replica always appears in the ISR set, so an empty ISR means the leader replica is down too, and Kafka must elect a new leader. How? Change perspective: we said above that the replicas in the ISR are by definition in sync with the leader, so any replica outside the ISR is not in sync with the leader, and electing such a replica means losing some messages.

If the broker-side parameter unclean.leader.election.enable is turned on, the next leader is elected from among these out-of-sync replicas. This election is called the Unclean leader election .

If you have worked on distributed systems, you will know the CAP theorem: the unclean leader election sacrifices data consistency to preserve Kafka's high availability. You can decide whether to enable unclean leader election according to your actual business scenario. It is generally not recommended, because here data consistency matters more than availability.
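The trade-off can be sketched as a toy election function. This is illustrative only; replica names are invented, and real Kafka factors in controller state and replica liveness that this ignores.

```python
# Illustrative sketch: electing a new leader when the old one dies. With
# unclean.leader.election.enable=false, an empty ISR means no leader (the
# partition is unavailable); with it true, an out-of-sync replica may win
# and previously committed messages can be lost.

def elect_leader(isr, all_replicas, unclean_enabled):
    live_isr = [r for r in isr if r != "dead-leader"]
    if live_isr:
        return live_isr[0]                  # clean election: stay consistent
    if unclean_enabled:
        return all_replicas[0]              # availability over consistency
    return None                             # partition stays offline

replicas = ["laggy-follower"]               # survivors, none of them in the ISR
print(elect_leader(isr=["dead-leader"], all_replicas=replicas, unclean_enabled=False))  # None
print(elect_leader(isr=["dead-leader"], all_replicas=replicas, unclean_enabled=True))   # laggy-follower
```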
4. Kafka's request processing flow
Most of a broker's work is processing requests sent to partition leaders by clients, partition replicas, and the controller. These requests follow a request/response pattern. The first request/response style most people encounter is probably the HTTP request. HTTP requests can in fact be synchronous or asynchronous; an ordinary HTTP request is synchronous, and the defining feature of the synchronous style is that requests are handled strictly one after another.

Note: HTTP is only used as an example here; Kafka's communication is based on TCP sockets .

In short, synchronous requests are processed sequentially, while the execution order of asynchronous requests is indeterminate, because asynchrony requires creating multiple execution threads, each running on its own schedule. So what are the drawbacks of the two approaches?

- The biggest drawback of the synchronous style is poor throughput and very low resource utilization : since requests can only be processed in order, each request must wait for the previous one to finish. This approach only suits systems where requests arrive very infrequently.
- The drawback of the asynchronous style is that creating a thread for every request is extremely expensive , and in some scenarios it can even overwhelm the entire service.
4.1. Reactor model

So is Kafka synchronous or asynchronous? Neither: Kafka uses the Reactor model .

So what is the Reactor model? Simply put, the Reactor pattern is an implementation of event-driven architecture, especially well suited to scenarios where many clients send requests to the server concurrently, as shown in the following figure:

Kafka's broker has a SocketServer component, which plays the role of the dispatcher. SocketServer accepts client requests over TCP socket connections. All request messages include a header containing the following information:
- Request type --- (that is, API Key)
- Request version --- (broker can handle client requests of different versions and make different responses according to the client version)
- Correlation ID --- a unique number, used to identify the request message, and will also appear in the response message and error log (used to diagnose problems)
- Client ID --- Used to identify the client who sent the request
The broker runs an Acceptor thread on each port it listens on. This thread creates connections and hands them over to the Processor (network thread pool) . The number of Processor threads is configured with the broker-side parameter num.network.threads, which defaults to 3, so each broker starts three network threads.

The Acceptor thread distributes incoming requests to the network thread pool by polling, so in practice each network thread is equally likely to be assigned a pending request. The network threads place requests on a shared request queue, and later take response messages from their response queues and send them to the clients. The request-response handling inside the Processor network thread pool is fairly involved; here is its processing flow chart:

After the Processor network threads receive messages from clients and other brokers, they put the messages on the request queue. Note that this is a shared request queue: because the network thread pool is multi-threaded, the request queue is an area shared by multiple threads . The IO thread pool then takes requests off this queue and handles them according to their type; for example, a produce request is written to the log, and a fetch request is read from the page cache or disk.

Note: the IO thread pool size is configured with the broker-side parameter num.io.threads; the default is 8, which means 8 IO processing threads are automatically created after each broker starts.
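The flow above can be sketched as a single-process simulation: network threads push requests into one shared queue, IO threads drain it, and responses are routed back through per-network-thread response queues. This is only a model of the data flow; real Kafka uses actual thread pools sized by `num.network.threads` and `num.io.threads`.

```python
# Illustrative sketch of the broker's request path: shared request queue in,
# per-network-thread response queues out. Deterministic single-process model.
from collections import deque

request_queue = deque()                 # shared by all network threads
response_queues = {0: deque(), 1: deque(), 2: deque()}  # one per network thread

def network_thread_receive(thread_id, request):
    request_queue.append((thread_id, request))   # hand off to the IO pool

def io_thread_step():
    thread_id, request = request_queue.popleft()
    response = f"handled:{request}"              # e.g. write log / read page cache
    response_queues[thread_id].append(response)  # route reply to its own socket

network_thread_receive(0, "ProduceRequest")
network_thread_receive(2, "FetchRequest")
io_thread_step()
io_thread_step()
print(list(response_queues[0]), list(response_queues[2]))
# ['handled:ProduceRequest'] ['handled:FetchRequest']
```

Tagging each request with its originating network thread is what lets the response find its way back to the right client connection, which is why the response queues are per-thread while the request queue is shared.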
4.2. Production request
When the producer writes a message to Kafka, how does it ensure the message is not lost? Through the ACK response mechanism. When writing data, the producer can set a parameter that determines how Kafka must confirm receipt; the parameter's value can be 0, 1, or all.

Simply put, different settings define "write success" differently. With acks=0, the producer does not wait for any confirmation; with acks=1, the write counts as successful once the partition leader has received the message; with acks=all, it counts as successful only after all in-sync replicas have received it.

After the message is written to the partition leader, if acks is set to all, the request is parked in a buffer called Purgatory, and the response is not sent to the client until the leader sees that the follower replicas have finished copying the message.
4.3. Get request
The broker handles fetch requests much like produce requests: the client asks the broker for messages at a specific offset within a specific partition. If the offset exists, Kafka uses zero-copy technology to send the messages to the client: messages go straight from the file to the network channel without passing through any intermediate buffer, which yields far better performance.

The client can set upper and lower bounds on the data a fetch request returns. The upper bound is the amount of memory the client has allocated for receiving messages; this limit matters because, if it is too large, it can exhaust the client's memory. The lower bound can be understood as "wait until enough data has accumulated to be worth sending", which trades extra latency for efficiency . As shown below:

As the figure shows, between pulling messages and responding there is a period of waiting for messages to accumulate. You can think of this accumulation as having a timeout: when the wait expires, the broker responds with whatever data it has rather than raising an error. The maximum wait time can be configured with the consumer-side parameter fetch.max.wait.ms.
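The accumulation rule can be sketched as a single predicate. The parameter names `fetch.min.bytes` and `fetch.max.wait.ms` are real consumer settings; the function and the default values shown are a simplification for illustration.

```python
# Illustrative sketch: the broker replies once enough bytes are buffered
# (fetch.min.bytes) or the wait cap (fetch.max.wait.ms) expires, whichever first.

def should_respond(buffered_bytes, waited_ms, fetch_min_bytes=1024, fetch_max_wait_ms=500):
    return buffered_bytes >= fetch_min_bytes or waited_ms >= fetch_max_wait_ms

print(should_respond(buffered_bytes=2048, waited_ms=10))   # True: enough data
print(should_respond(buffered_bytes=100,  waited_ms=120))  # False: keep accumulating
print(should_respond(buffered_bytes=100,  waited_ms=500))  # True: timeout reached
```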
4.4. Metadata request
Both produce requests and fetch requests must be sent to the partition's leader replica. If a broker receives a request for a particular partition whose leader lives on another broker, the client that sent the request receives a "not leader for partition" error response; the same error occurs whenever a request for a partition reaches a broker that does not host its leader. It is the Kafka client's job to send requests to the correct broker.

To do this, the client sends a metadata request listing the topics it is interested in; the server's response describes each topic's partitions, leader replica, and follower replicas . Metadata requests can be sent to any broker, because every broker caches this information.

Under normal circumstances, the client caches this metadata and sends produce and fetch requests directly to the target broker. The cache must be refreshed from time to time; the refresh interval is controlled by the client-side parameter metadata.max.age.ms.
5. Kafka rebalancing process
Every consumer group has a group coordinator (Coordinator), and the rebalancing process is completed with the Coordinator's help.

Group Coordinator (Coordinator): the group coordinator is a broker that receives the heartbeat messages of all consumers in a consumer group . In the earliest versions this metadata was stored in ZooKeeper; currently it is stored on the broker. Every consumer group stays in sync with its group coordinator: the coordinator serves JoinGroup requests and provides metadata about the consumer group, such as assignments and offsets.

The group coordinator also tracks the heartbeats of all consumers. Another role within the consumer group is the leader; take care to distinguish it from the leader replica and the Kafka controller. The consumer leader is the member responsible for making assignment decisions in the group. A very important behavior of the consumer group is therefore electing its leader, and reading and writing metadata about assignments and partitions through the coordinator .

Consumer leader: every consumer group has a leader. If a consumer stops sending heartbeats, the coordinator triggers a rebalance.
Conditions under which rebalancing occurs :
- Any topic subscribed by the consumer changes;
- Changes in the number of consumers;
- The number of partitions has changed;
- If you subscribe to a topic that has not yet been created, rebalancing occurs when the topic is created. If the topics you subscribe to are deleted, rebalancing will also occur;
- The consumer is considered DEAD by the group coordinator. This may be caused by a consumer crash or by the consumer being stuck for a long time, meaning it has not sent a heartbeat to the group coordinator within a reasonable window; this also triggers a rebalance.
5.1. Rebalance state transformation
Kafka designed a consumer group state machine (State Machine) to help the coordinator complete the rebalancing process. The consumer group state machine has five states: Empty, Dead, PreparingRebalance, CompletingRebalance, and Stable .
|State|Meaning|
|---|---|
|Empty|There are no members in the group, but the group may have committed offsets that have not yet expired.|
|Dead|Likewise no members in the group, but the group's metadata has been removed on the coordinator side. The coordinator keeps all group information currently registered with it; this metadata is essentially that registration information.|
|PreparingRebalance|The consumer group is preparing to rebalance; at this point all members must rejoin the group.|
|CompletingRebalance|All members have joined and each member is waiting for the assignment plan. In older versions this state was called AwaitingSync, which is equivalent.|
|Stable|The consumer group's steady state: rebalancing is complete and the members can consume data normally.|
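The state machine above can be sketched as a transition table. The event names here are invented shorthands; the real coordinator reacts to JoinGroup/SyncGroup/LeaveGroup requests and heartbeat expiry, but the shape of the transitions is the same.

```python
# Illustrative sketch of the consumer-group state machine described above.
# Event names are simplified stand-ins for the coordinator's real triggers.

TRANSITIONS = {
    ("Empty", "member_joins"): "PreparingRebalance",
    ("PreparingRebalance", "all_members_joined"): "CompletingRebalance",
    ("CompletingRebalance", "assignment_distributed"): "Stable",
    ("Stable", "member_joins"): "PreparingRebalance",      # triggers a rebalance
    ("Stable", "member_leaves"): "PreparingRebalance",
    ("PreparingRebalance", "all_members_left"): "Empty",
    ("Empty", "offsets_expired"): "Dead",
}

def run(events, state="Empty"):
    for event in events:
        state = TRANSITIONS[(state, event)]
    return state

print(run(["member_joins", "all_members_joined", "assignment_distributed"]))  # Stable
print(run(["member_joins", "all_members_left", "offsets_expired"]))           # Dead
```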
After understanding these states, let's use several paths to trace how the consumer group's state rotates:

A consumer group starts in the Empty state. When a rebalance begins, it moves to PreparingRebalance and waits for consumers to join. Once new consumers have joined, the group moves to CompletingRebalance and waits for the assignment. Whenever a consumer joins or leaves, a rebalance is triggered and the group returns to the PreparingRebalance state; once the assignment strategy has been decided and distributed, the flow chart looks like this:

Building on the figure above: once the group reaches the Stable state, any consumer joining, leaving, or missing its heartbeat deadline triggers a rebalance, and the group returns to the PreparingRebalance state. The flow chart then looks like this:

Building further: if, while the group is in the PreparingRebalance state, every consumer unfortunately leaves, the offsets the group committed may still be retained for a while. Once that offset data expires or is flushed, the group enters the Dead state. Its flow chart looks like this:

Finally, consider rebalancing when the leader of the offsets topic partition changes: whether the group is in PreparingRebalance, CompletingRebalance, or Stable, it moves directly to the Dead state. All the paths together look like this:

Note: a log message like "Required xx expired offsets in xxx milliseconds" generally indicates that Kafka has probably deleted that group's offset data. Only a group in the Empty state performs expired-offset deletion.
5.2. Rebalancing from the consumer's perspective

From the consumer's point of view, rebalancing has two steps: joining the group and waiting for the leader's assignment plan . These two steps correspond to two requests: JoinGroup and SyncGroup .

When a new consumer joins the group, it sends a JoinGroup request to the coordinator. In this request, every member submits the topics it subscribes to; as described in the group coordinator section above, this lets the coordinator collect enough metadata to select the consumer group's leader. Normally, the first consumer to send a JoinGroup request automatically becomes the leader. The leader's task is to collect the subscription information of all members and, based on it, produce a concrete partition assignment plan. As shown in the figure:

After all consumers have joined and submitted their metadata to the leader, the leader works out the assignment plan and sends it to the coordinator in a SyncGroup request; the coordinator is responsible for distributing the consumption strategy within the group. The following figure shows the SyncGroup request flow:

Once all members have successfully received the assignment plan, the consumer group enters the Stable state and normal consumption begins.
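The two-step flow above can be sketched in miniature: the first joiner becomes the group leader, the leader computes an assignment, and SyncGroup fans it out. The round-robin assignment strategy here is a naive stand-in chosen purely for illustration; consumer and partition names are invented.

```python
# Illustrative sketch of JoinGroup (leader selection) and SyncGroup (plan fan-out).

def join_group(members):
    """The first member to send JoinGroup becomes the group leader."""
    return members[0]

def make_assignment(members, partitions):
    """Naive round-robin assignment, standing in for the leader's real strategy."""
    plan = {m: [] for m in members}
    for i, p in enumerate(partitions):
        plan[members[i % len(members)]].append(p)
    return plan

members = ["consumer-A", "consumer-B"]       # in JoinGroup arrival order
leader = join_group(members)
plan = make_assignment(members, partitions=["t0-p0", "t0-p1", "t0-p2"])
print(leader)   # consumer-A
print(plan)     # {'consumer-A': ['t0-p0', 't0-p2'], 'consumer-B': ['t0-p1']}
```

In real Kafka the plan is sent to the coordinator in the SyncGroup request, and the coordinator returns to each member only its own slice of the plan.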
5.3. Rebalancing from the perspective of the coordinator
From the point of view of the coordinator, rebalancing mainly has the following trigger conditions:
- New member joins the group
- Group members leave voluntarily
- Group member crashes and leaves
- Group members submit displacement
5.3.1. New members join the group
In the scenario under discussion, the consumer group is in the Stable state with assignments already in place. If a new member joins the group at this point, the rebalancing process looks like this:
5.3.2. Group members leave voluntarily
A group member leaving the consumer group voluntarily means the consumer instance calls the close() method to actively notify the coordinator that it wants to quit, which produces a LeaveGroup request. As shown below:
5.3.3. Group members crash and leave
A group member crash means the consumer instance fails badly, going down or becoming unresponsive for a period of time, so that the coordinator stops receiving its heartbeats and considers the member crashed. A crash is passive, so the coordinator usually needs to wait a while before noticing; this interval is governed by the consumer parameter session.timeout.ms.
5.3.4. Group members submit displacement
We will not diagram this process. Roughly: after a consumer sends the JoinGroup request, the members of the group must commit their respective offsets within the specified time window, after which the normal JoinGroup/SyncGroup request sequence begins.