[Netty] ChannelOutboundBuffer source code analysis

The previous article analyzed the source code of RecvByteBufAllocator and explained how Netty receives data sent by the peer, and how AdaptiveRecvByteBufAllocator dynamically sizes the ByteBuf it allocates, avoiding both wasted memory from over-allocating a Java ByteBuffer and frequent expansion from under-allocating.

This article will analyze how Netty sends data out.

Prerequisite knowledge

Data types supported for sending: Netty only supports sending two data types: ByteBuf and FileRegion. The former can be thought of as a ByteBuffer and is used for ordinary byte transmission; the latter is used for file transfer, where Netty relies on FileRegion to achieve zero-copy.

write and flush: write() does not send any data; it merely stores the data temporarily in the ChannelOutboundBuffer. flush() is what actually transmits the data to the peer through the Socket. writeAndFlush() simply executes the two methods in sequence.
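A minimal sketch of the difference (msg1, msg2 and msg3 stand for ByteBuf instances prepared elsewhere; this is illustrative, not Netty source):

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

final class WriteVsFlush {
    static void send(ChannelHandlerContext ctx, ByteBuf msg1, ByteBuf msg2, ByteBuf msg3) {
        // write() only stores the messages in the ChannelOutboundBuffer...
        ctx.write(msg1);
        ctx.write(msg2);
        // ...nothing reaches the Socket until flush() is called.
        ctx.flush();

        // Equivalent shortcut for a single message: store it and flush immediately.
        ctx.writeAndFlush(msg3);
    }
}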

Channel high and low water marks: When a program writes a large amount of data, or when flush() is called but the peer is too slow to receive (or the network is congested) and the TCP send buffer fills up, messages accumulate in the ChannelOutboundBuffer and can eventually cause an out-of-memory error. To protect the program, Netty defines a "high and low water mark" for each Channel. When the backlog of messages exceeds the high water mark, Netty sets the Channel to the "unwritable" state and triggers the channelWritabilityChanged callback; you can then call Channel.isWritable() to decide whether to keep writing. The water marks are configured through ChannelConfig.setWriteBufferHighWaterMark() and ChannelConfig.setWriteBufferLowWaterMark().
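A minimal sketch of how this might be used, assuming a custom handler and illustrative water mark values; the setters are the ChannelConfig methods named above:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BackpressureAwareHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Illustrative values: mark the Channel unwritable above 10 MB of
        // pending data, and writable again once it drops below 5 MB.
        ctx.channel().config().setWriteBufferHighWaterMark(10 * 1024 * 1024);
        ctx.channel().config().setWriteBufferLowWaterMark(5 * 1024 * 1024);
        ctx.fireChannelActive();
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isWritable()) {
            // Backlog has fallen below the low water mark: safe to resume writing.
        } else {
            // Backlog exceeded the high water mark: pause producing data.
        }
        ctx.fireChannelWritabilityChanged();
    }
}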

Subscribing to the OP_WRITE event: Since write() is initiated by the user, why does the Channel ever need to subscribe to the OP_WRITE event? Because the TCP send buffer may be full. In that case the write is abandoned for the moment, OP_WRITE is registered, and writing resumes once the Selector notifies us that the Channel is writable again.
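A minimal plain-NIO sketch of the same idea (a conceptual illustration, not Netty's code): when a write makes no progress, register interest in OP_WRITE; when the Selector reports the channel writable, finish the write and deregister.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

final class OpWriteExample {

    // Try to write; if the TCP send buffer is full, subscribe to OP_WRITE.
    static void tryWrite(SelectionKey key, ByteBuffer data) throws IOException {
        SocketChannel ch = (SocketChannel) key.channel();
        ch.write(data);
        if (data.hasRemaining()) {
            // Could not write everything: wait for the Selector to tell us the
            // channel is writable again.
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        }
    }

    // Called when the Selector reports the channel as writable.
    static void onWritable(SelectionKey key, ByteBuffer data) throws IOException {
        SocketChannel ch = (SocketChannel) key.channel();
        ch.write(data);
        if (!data.hasRemaining()) {
            // All data sent: stop listening for OP_WRITE to avoid busy looping.
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
    }
}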

ByteBuf can be converted into ByteBuffer: Java's native SocketChannel only accepts ByteBuffer for writing. When you write a ByteBuf through Netty, it first converts the ByteBuf into a ByteBuffer and then writes that. The conversion method is ByteBuf.internalNioBuffer().
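A minimal sketch of that conversion (illustrative only; internalNioBuffer() exposes the readable region of the ByteBuf as a shared java.nio.ByteBuffer view, which is exactly what a SocketChannel can consume):

import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

final class ByteBufToNio {
    static void writeTo(SocketChannel channel) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8);
        // View the readable region of the ByteBuf as a java.nio.ByteBuffer.
        ByteBuffer nioBuffer = buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes());
        // The native channel only understands ByteBuffer.
        channel.write(nioBuffer);
        buf.release();
    }
}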

Know the memory layout of Java objects in the JVM: When write(msg) is called, the msg is wrapped in an Entry node and appended to the tail of the linked list. One of its attributes, pendingSize, records the memory occupied by the message. Besides the space taken by the msg data itself, this also includes the space taken by the Entry object, so 96 bytes are added by default; why 96 will be explained later. For now, recall that an object header occupies at most 16 bytes, an object reference occupies 4 to 8 bytes, a long occupies 8 bytes, an int occupies 4 bytes, and a boolean occupies 1 byte. In addition, the JVM requires the size of a Java object to be a multiple of 8 bytes, so padding bytes may be added.
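As a quick worked example of these rules (the class below is hypothetical and exists only to illustrate the arithmetic; exact sizes depend on JVM flags such as compressed references):

// Hypothetical class used only to illustrate the sizing rules described above.
class LayoutExample {
    long a;     // 8 bytes
    int b;      // 4 bytes
    boolean c;  // 1 byte
    Object d;   // 4 to 8 bytes (object reference)
}
// Rough size on a 64-bit JVM: header (up to 16) + 8 + 4 + 1 + (4 to 8) = roughly 29 to 37 bytes,
// which the JVM pads up to the next multiple of 8, giving about 32 to 40 bytes in total.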

ChannelHandlerContext.writeAndFlush() analysis

The following are simple examples of sending a ByteBuf and FileRegion respectively:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // Send a "hello"
    ctx.writeAndFlush(Unpooled.wrappedBuffer("hello".getBytes()));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // Transmit an a.txt file
    RandomAccessFile accessFile = new RandomAccessFile("/disk/a.txt", "r");
    DefaultFileRegion region = new DefaultFileRegion(accessFile.getChannel(), 0, accessFile.length());
    ctx.writeAndFlush(region);
}

Let me first walk through the overall flow of writeAndFlush(); the actual sending details are covered in the next section.

Event propagation: Calling ctx.writeAndFlush() searches from the current handler towards the head of the Pipeline for the next handler able to handle the write event, whereas ctx.channel().writeAndFlush() starts the search from the Pipeline's TailContext. The propagation path differs slightly, but by default the write ends up being handled by HeadContext. The source code is as follows:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    // Ensure that the message being sent is not null
    ObjectUtil.checkNotNull(msg, "msg");
    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }

    // Look for the next outbound handler that can handle the write event; HeadContext is found by default.
    final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    // If the current thread is the EventLoop thread, execute directly; otherwise submit a task for serial execution.
    if (executor.inEventLoop()) {
        if (flush) {
            // writeAndFlush() was called, so flush is true and HeadContext.invokeWriteAndFlush() is eventually called.
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}

After the HeadContext is found, its invokeWriteAndFlush() method is called, which simply invokes write and then flush in one method:

void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        // First call write() through the handler
        invokeWrite0(msg, promise);
        // Then call flush() through the handler
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

Look at invokeWrite0() first. It calls HeadContext.write(), and because the write operation has to interact with the underlying JDK API, the work is handed over to Channel.Unsafe:

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    // Needs to interact with the underlying JDK API, so hand it over to Unsafe for execution.
    unsafe.write(msg, promise);
}

Next, AbstractChannel.AbstractUnsafe.write() is called. It first filters the outgoing data, since only ByteBuf and FileRegion are supported. It then estimates how much memory the data occupies because, as mentioned earlier, once the backlog of messages exceeds the Channel's high water mark, the Channel is set to the "unwritable" state to prevent memory overflow. After these two steps, the message is added to the output buffer ChannelOutboundBuffer.

@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // The outboundBuffer is created together with the Channel and is normally not null; this is a safety check.
        try {
            ReferenceCountUtil.release(msg);
        } finally {
            safeSetFailure(promise, newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
        }
        return;
    }

    int size;
    try {
        // Filter the message: make sure it is a ByteBuf or FileRegion, other objects are not supported.
        msg = filterOutboundMessage(msg);
        /*
        Estimate the memory occupied by the message.
        Purpose: write() does not write the message to the Socket; it is kept in memory until flush().
        To prevent messages from piling up, Netty defines high and low water marks; when the total size of
        pending messages reaches the high water mark, the Channel is set to the unwritable state to protect
        the program and avoid memory overflow.
        See: io.netty.channel.DefaultMessageSizeEstimator.HandleImpl.size()
        For FileRegion it returns 0 directly, because zero-copy is used and the file is never read into the JVM process.
        */
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        try {
            ReferenceCountUtil.release(msg);
        } finally {
            safeSetFailure(promise, t);
        }
        return;
    }
    // write() only stores the message in the outboundBuffer; it does not actually send it.
    outboundBuffer.addMessage(msg, size, promise);
}

Note that filterOutboundMessage(), besides filtering messages, also tries to convert a heap ByteBuf into a direct ByteBuf. To improve transmission efficiency, Netty uses direct memory for all data read from or written to the Socket, avoiding an extra memory copy during IO operations.

// Filter outbound messages; only ByteBuf and FileRegion are supported.
@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }
        // To avoid memory copying, all data read from or written to the Socket uses off-heap memory.
        return newDirectBuffer(buf);
    }

    // File transfer
    if (msg instanceof FileRegion) {
        return msg;
    }

    // Unsupported data type, throw an exception
    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

newDirectBuffer() does not guarantee that the conversion succeeds. If the ByteBufAllocator in use is not pooled and io.netty.threadLocalDirectBufferSize is not enabled, converting would mean allocating an unpooled direct ByteBuf, which is very expensive, so Netty gives up on the conversion:

// Attempt to convert a heap ByteBuf into a direct ByteBuf; if the conversion would be too expensive, give up.
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
    final int readableBytes = buf.readableBytes();
    if (readableBytes == 0) {
        // If there are no readable bytes, release the buffer and return the shared empty buffer.
        ReferenceCountUtil.safeRelease(buf);
        return Unpooled.EMPTY_BUFFER;
    }

    // Get the ByteBufAllocator bound to the Channel
    final ByteBufAllocator alloc = alloc();
    if (alloc.isDirectBufferPooled()) {
        // The allocator is pooled and can allocate direct memory:
        // create a direct ByteBuf of the required size, copy the data in and release the original buf.
        ByteBuf directBuf = alloc.directBuffer(readableBytes);
        directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
        ReferenceCountUtil.safeRelease(buf);
        return directBuf;
    }

    /*
    If io.netty.threadLocalDirectBufferSize is set, Netty maintains a lightweight ByteBuf object pool
    backed by a Stack stored in the thread's FastThreadLocal. After such a ByteBuf has been written to
    the Socket it is automatically released, i.e. pushed back onto the thread-bound Stack for reuse.
    */
    final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
    if (directBuf != null) {
        directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
        ReferenceCountUtil.safeRelease(buf);
        return directBuf;
    }

    // Allocating an unpooled direct ByteBuf is very expensive (tests show it is more than 10x slower
    // than allocating heap memory), so simply give up the conversion here.
    return buf;
}

If io.netty.threadLocalDirectBufferSize is set, Netty caches a specified number of ByteBuf objects per thread so that they can be reused. The implementation stores a Stack in a FastThreadLocal: when a buffer is needed, one is pop()ed off the stack, and when it has been used it is push()ed back.
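A simplified sketch of that per-thread stack idea (an illustration of the pattern only, not Netty's actual thread-local direct-buffer implementation; the class and method names are made up):

import java.util.ArrayDeque;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.FastThreadLocal;

final class ThreadLocalBufPool {

    // Each thread keeps its own small stack of reusable direct buffers.
    private static final FastThreadLocal<ArrayDeque<ByteBuf>> STACK =
            new FastThreadLocal<ArrayDeque<ByteBuf>>() {
                @Override
                protected ArrayDeque<ByteBuf> initialValue() {
                    return new ArrayDeque<ByteBuf>();
                }
            };

    // "pop() one out" when a buffer is needed.
    static ByteBuf acquire(int capacity) {
        ByteBuf cached = STACK.get().pollFirst();
        return cached != null ? cached.clear() : Unpooled.directBuffer(capacity);
    }

    // "push() it back" once the data has been written out, so it can be reused.
    static void release(ByteBuf buf) {
        STACK.get().offerFirst(buf);
    }
}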

Also note MessageSizeEstimator, which is responsible for estimating the memory occupied by the data to be sent. The logic is very simple: for a FileRegion it returns 0, because FileRegion relies on zero-copy transfer and the file is never loaded into the JVM process. See io.netty.channel.DefaultMessageSizeEstimator.HandleImpl.size():

// Estimate the memory occupied by the message; the logic is very simple.
@Override
public int size(Object msg) {
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    if (msg instanceof ByteBufHolder) {
        return ((ByteBufHolder) msg).content().readableBytes();
    }
    // FileRegion uses zero-copy and the file is never loaded into the JVM,
    // so the occupied memory is 0 and it does not affect the Channel's water mark.
    if (msg instanceof FileRegion) {
        return 0;
    }
    return unknownSize;
}

The ChannelOutboundBuffer code is analyzed in detail in the next section. For now, just remember that write() only stores the data temporarily in the ChannelOutboundBuffer and does not actually send it.

Once the message is stored in the ChannelOutboundBuffer, the write operation is complete and invokeFlush0() is called immediately. It is again forwarded to Unsafe, which calls AbstractChannel.AbstractUnsafe.flush(). This does two things: it first marks the Entries to be sent in the ChannelOutboundBuffer as flushed, and then converts the Entry data into Java ByteBuffers and uses the SocketChannel to actually transmit them.

@Override
public final void flush() {
    assertEventLoop();

    // Get the ChannelOutboundBuffer bound to the SocketChannel
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    // First mark the unflushed nodes as flushed
    outboundBuffer.addFlush();
    // Start sending data
    flush0();
}

flush0() starts sending the data. It first checks whether the Channel is active; if it is not, the flush() operation fails and the Entries are removed. If the Channel is healthy, it calls doWrite() to send the data.

protected void flush0() {
    if (inFlush0) {
        // Avoid re-entering while a previous flush0() is still executing
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        // Nothing to flush
        return;
    }

    inFlush0 = true;

    // The connection has been deactivated.
    if (!isActive()) {
        try {
            if (!outboundBuffer.isEmpty()) {
                if (isOpen()) {
                    /*
                    The channel is open and may become active later.
                    1. Release the msg
                    2. Trigger the failure notification
                    3. Recycle the Entry
                    4. Decrement the number of pending bytes
                    */
                    outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                } else {
                    /*
                    The channel is closed. Similar to the branch above, except that the
                    channelWritabilityChanged() callback is not triggered.
                    */
                    outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                }
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        // The connection is healthy, perform the real write operation.
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        handleWriteError(t);
    } finally {
        inFlush0 = false;
    }
}

doWrite() is the core of data transmission and is implemented by subclasses; here we look at NioSocketChannel.doWrite(). It obtains the native Java SocketChannel, converts the queued ByteBufs into ByteBuffers, and then sends the data in a loop. The amount of data sent per iteration is limited by two conditions:

  1. The number of ByteBuffers is limited (1024 by default).
  2. The total number of bytes is limited by the TCP send buffer size (ChannelOption.SO_SNDBUF).

If the ChannelOutboundBuffer has a large backlog, it may not be possible to send everything in one pass, so the write loop runs up to 16 times by default. Looping too many times could block the IO thread and prevent other Channel events from being processed.
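The loop count is configurable. A minimal sketch (the value 32 is only an example; the option and setter are standard Netty configuration APIs):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;

final class WriteSpinConfig {
    static void configure(ServerBootstrap bootstrap, Channel channel) {
        // Raise the per-flush write loop count from the default 16 to 32 (illustrative value).
        bootstrap.childOption(ChannelOption.WRITE_SPIN_COUNT, 32);
        // Or adjust it on an already-created channel.
        channel.config().setWriteSpinCount(32);
    }
}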

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    // Number of write loops, 16 by default. The output buffer may contain a large backlog of
    // messages; the loop count is limited to avoid blocking the IO thread.
    int writeSpinCount = config().getWriteSpinCount();
    do {
        if (in.isEmpty()) {
            // Nothing left to write, stop listening for the OP_WRITE event.
            clearOpWrite();
            return;
        }

        // The maximum size of the send buffer, set via the TCP parameter ChannelOption.SO_SNDBUF.
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        /*
        Convert the Entries to be flushed into an array of native Java ByteBuffers.
        Because both the total count and the total number of bytes are limited, it may not be
        possible to send all the data at once.
        Note that only ByteBuf is handled here; FileRegion is handled elsewhere.
        */
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        // Effectively nioBuffers.length, set by the previous call.
        int nioBufferCnt = in.nioBufferCount();

        switch (nioBufferCnt) {
            case 0:
                // All ByteBufs have been handled, but there may be FileRegions left to process.
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // Only a single ByteBuffer needs to be sent
                ByteBuffer buffer = nioBuffers[0];
                // Number of bytes we attempt to send
                int attemptedBytes = buffer.remaining();
                // Native SocketChannel.write(ByteBuffer) sends the data
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    // The TCP buffer is full: subscribe to OP_WRITE and continue once the channel is writable.
                    incompleteWrite(true);
                    return;
                }
                // Dynamically adjust the size of the send buffer
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                // Remove the Entry nodes that have been fully sent
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                // Multiple ByteBuffers need to be sent
                // Total number of bytes we attempt to send
                long attemptedBytes = in.nioBufferSize();
                // Native SocketChannel.write() sends the data; the return value is the number of bytes actually written
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    // Zero bytes written, probably because the TCP buffer is full:
                    // subscribe to OP_WRITE and continue once the channel is writable.
                    incompleteWrite(true);
                    return;
                }
                // Dynamically adjust the send buffer (ChannelOption.SO_SNDBUF) based on the bytes actually written
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);
                /*
                Sent data is removed based on the number of bytes actually written, not the number of ByteBufs.
                Starting from flushedEntry, the size of each ByteBuf is computed and the entries are removed one by one.
                A ByteBuf may have been only partially sent, in which case its readerIndex is adjusted.
                */
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);

    /*
    Reaching this point means the data was not fully written within the loop. When doWrite0(in) could not
    finish a FileRegion because the send buffer was full, it returns a large value that makes writeSpinCount
    negative; in that case OP_WRITE is subscribed and processing continues when the Channel becomes writable.
    Otherwise (writeSpinCount reached 0) a flush task is scheduled instead.
    */
    incompleteWrite(writeSpinCount < 0);
}

In addition, NioSocketChannel.doWrite() only sends ByteBuf; sending a FileRegion is delegated to the parent class AbstractNioByteChannel.doWrite0().

/*
NioSocketChannel is only responsible for sending ByteBuf;
the sending of FileRegion is handled here.
*/
protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
    Object msg = in.current();
    if (msg == null) {
        // Return directly so incompleteWrite(...) is not called.
        return 0;
    }
    // Send the data
    return doWriteInternal(in, in.current());
}

private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (!buf.isReadable()) {
            // No readable data left, remove the node directly
            in.remove();
            return 0;
        }

        // Underlying Java write()
        final int localFlushedAmount = doWriteBytes(buf);
        if (localFlushedAmount > 0) {
            in.progress(localFlushedAmount);
            if (!buf.isReadable()) {
                in.remove();
            }
            return 1;
        }
    } else if (msg instanceof FileRegion) {
        FileRegion region = (FileRegion) msg;
        // Transferred bytes >= total bytes means the file has been fully transferred; remove the node.
        if (region.transferred() >= region.count()) {
            in.remove();
            return 0;
        }

        // Calls region.transferTo(javaChannel(), position) to transfer the file
        long localFlushedAmount = doWriteFileRegion(region);
        if (localFlushedAmount > 0) {
            // Number of bytes actually sent
            in.progress(localFlushedAmount);
            if (region.transferred() >= region.count()) {
                // The FileRegion has been fully sent, remove the node
                in.remove();
            }
            return 1;
        }
    } else {
        // Should not reach here.
        throw new Error();
    }
    /*
    Normally we do not get here. It means no progress was made (the send buffer is full), so
    Integer.MAX_VALUE is returned to make writeSpinCount negative; OP_WRITE is then subscribed
    and processing continues when the Channel becomes writable.
    */
    return WRITE_STATUS_SNDBUF_FULL;
}

Note that the flush() operation can end in one of three situations:

  1. The data was sent normally.
  2. The data was not fully sent within the maximum number of loop iterations; to avoid blocking the IO thread, the rest is handled later.
  3. The TCP send buffer is full and no data can be sent.

The latter two cases count as an "incomplete write", so incompleteWrite(setOpWrite) is called to continue processing later. In the third case Netty must subscribe to the OP_WRITE event and wait for the Selector to notify it that the Channel is writable before sending more data. The setOpWrite parameter indicates whether to listen for OP_WRITE:

/**
 * Incomplete write.
 * @param setOpWrite whether to subscribe to the OP_WRITE event
 */
protected final void incompleteWrite(boolean setOpWrite) {
    // setOpWrite is true, which usually means the TCP buffer is full. Subscribe to the
    // OP_WRITE event and wait until the Channel is writable before continuing.
    if (setOpWrite) {
        // Subscribe to the OP_WRITE event
        setOpWrite();
    } else {
        // Unsubscribe from the OP_WRITE event
        clearOpWrite();
        // Submit a flush task to be executed later, to avoid blocking the IO thread.
        eventLoop().execute(flushTask);
    }
}

That completes the overall flow of writeAndFlush(). ChannelOutboundBuffer itself has not been analyzed yet; see the next section.

ChannelOutboundBuffer source code analysis

ChannelOutboundBuffer is Netty's data sending buffer, which is created along with SocketChannel.

First look at the attributes:

/*
The extra number of bytes counted when a ByteBuf is wrapped into an Entry: besides the ByteBuf data itself,
the Entry object also occupies space.
Why 96, and why is it configurable?
1. 96 is the maximum value computed by Netty for a 64-bit JVM.
2. If the program runs on a 32-bit JVM, or compressed object references are enabled, the value can be
   adjusted to match the actual layout.
Breakdown of the up-to-96 bytes an Entry occupies on a 64-bit JVM:
- object header: 16 bytes
- 6 object reference fields: at least 4*6=24 bytes, at most 8*6=48 bytes
- 2 long fields: 2*8=16 bytes
- 2 int fields: 2*4=8 bytes
- 1 boolean field: 1 byte
- padding: the JVM requires object sizes to be multiples of 8 bytes, so up to 7 bytes here
Total: at most 96 bytes.
*/
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
        SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

/*
When sending data, the ByteBufs to be sent must be converted into ByteBuffers. Since write is a very
frequent operation, the array is reused per thread instead of being created each time.
*/
private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
    @Override
    protected ByteBuffer[] initialValue() throws Exception {
        // The default size is 1024; it is expanded if necessary.
        return new ByteBuffer[1024];
    }
};

// The bound SocketChannel
private final Channel channel;

// The first Entry that has been flushed and is waiting to be sent.
private Entry flushedEntry;

// The first Entry that has been written but not yet flushed; flush() starts from here.
private Entry unflushedEntry;

// The tail of the linked list
private Entry tailEntry;

// The number of flushed nodes. When sending data, flushed nodes are traversed starting from flushedEntry.
private int flushed;

// Number of NIO ByteBuffers filled in a single loop iteration
private int nioBufferCount;

// Total size of the NIO ByteBuffers filled in a single loop iteration
private long nioBufferSize;

// Whether the flush failed
private boolean inFail;

// Updater for the totalPendingSize field, modified via CAS.
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
        AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");

// Total memory occupied by the messages held in the output buffer. This value is compared against the
// high and low water marks to decide the Channel's writable state.
@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;

// Updater for the unwritable field, modified via CAS.
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

// Whether the channel is writable: 0 means writable, 1 means unwritable.
// Modified when the output buffer memory crosses the high or low water mark.
@SuppressWarnings("UnusedDeclaration")
private volatile int unwritable;

// Task triggered when the channel's writability changes, i.e. when the backlog crosses the high or low water mark.
private volatile Runnable fireChannelWritabilityChangedTask;

ChannelOutboundBuffer itself is a singly linked list of Entry nodes, with three node pointers:

  • flushedEntry: points to the first node that has been flushed and is waiting to be sent.
  • unflushedEntry: points to the first node that has been written but not yet flushed.
  • tailEntry: points to the tail of the list.
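A simplified sketch of how the three pointers partition the list (my own illustration; e1 to e5 are arbitrary Entry nodes):

    e1 ---> e2 ---> e3 ---> e4 ---> e5 ---> null
    ^               ^               ^
flushedEntry  unflushedEntry     tailEntry

    e1, e2 : flushed, waiting to be written to the Socket
    e3..e5 : added by write() but not yet flushed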

As mentioned in the previous section, the write(msg) operation only stores the data temporarily in the ChannelOutboundBuffer. The core method is addMessage(), which mainly does two things:

  1. Wrap msg in an Entry node and append it to the tail of the list.
  2. Check whether the total number of pending bytes in the output buffer has reached the high water mark; if it has, set the Channel to the "unwritable" state and trigger the channelWritabilityChanged callback.
/**
 * Temporarily store the message in the ChannelOutboundBuffer; a notification is sent if storing succeeds.
 * @param msg     the data to be sent: ByteBuf / FileRegion
 * @param size    the memory size occupied by the data
 * @param promise notified when the write succeeds
 */
public void addMessage(Object msg, int size, ChannelPromise promise) {
    // Wrap msg into an Entry and append it to the tail of the list.
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        // The tail is not empty, so link the new entry after it
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry; // tailEntry points to the newly added node
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // Accumulate the number of pending bytes; if the high water mark is exceeded,
    // the Channel's writable state is changed and the callback is triggered.
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

Look at Entry.newInstance() first. It wraps msg in an Entry node, which is then appended to the tail of the list. Entry has a pendingSize attribute that records the memory occupied by the message. Note that besides the space taken by the msg data itself, it also adds the space occupied by the Entry object; to follow that calculation, you need to understand the memory layout of Java objects (object header, references, alignment).
/**
 * Create an Entry node, taking one from the object pool.
 * @param msg     the message itself
 * @param size    the memory size of the message as estimated by MessageSizeEstimator
 * @param total   the size of the message itself (the difference lies in how FileRegion is handled)
 * @param promise notified when write() completes
 */
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
    /*
    Every write() needs an Entry. Since write() is a very frequent operation, Entries are taken from an
    object pool instead of being created and destroyed each time.
    */
    Entry entry = RECYCLER.get();
    entry.msg = msg;
    /*
    Why add CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD to the occupied memory?
    Besides the space occupied by the ByteBuf, the Entry itself takes space. On a 64-bit JVM:
    - object header: 16 bytes
    - 6 object reference fields: at least 4*6=24 bytes, at most 8*6=48 bytes
    - 2 long fields: 2*8=16 bytes
    - 2 int fields: 2*4=8 bytes
    - 1 boolean field: 1 byte
    - padding: the JVM requires object sizes to be multiples of 8 bytes, up to 7 bytes here
    The total is at most 96 bytes, which is why CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD defaults to 96.
    */
    entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
    entry.total = total;
    entry.promise = promise;
    return entry;
}

After the Entry has been added to the linked list, incrementPendingOutboundBytes() accumulates the total number of pending bytes and checks whether the high water mark has been exceeded:

/**
 * Accumulate the number of pending bytes; if the high water mark is exceeded, the Channel's writable
 * state is changed and the callback is triggered.
 * @param size        the number of bytes of memory occupied by the message
 * @param invokeLater whether to trigger the callback later
 */
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }

    // Accumulate the total size of pending messages using CAS
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    // The pending messages have reached the high water mark: the backlog is too large
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        // Change the writable state and trigger the callback
        setUnwritable(invokeLater);
    }
}

setUnwritable() is triggered when the total number of pending bytes exceeds the high water mark. It uses a spin + CAS loop to change unwritable from 0 to 1 and then fires the callback:

// Set the Channel as unwritable, using CAS
private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0) {
                // The CAS succeeded and the state actually changed: trigger the callback
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

That is all the work ChannelOutboundBuffer does for the write operation; now let's look at flush.

When the flush operation is performed, outboundBuffer.addFlush() is called first. It marks the unflushed nodes as flushed, which really just means moving the flushedEntry and unflushedEntry pointers. Along the way it checks whether each Entry has been cancelled; cancelled nodes are skipped and the memory they occupy is subtracted from the pending total.

// Only marks the nodes as flushed; no data is actually sent. Cancelled nodes are skipped.
public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed++;
            // Set the entry's promise to the uncancellable state
            if (!entry.promise.setUncancellable()) {
                // Setting failed, which means the promise was already cancelled:
                // release the message and decrement the number of pending bytes.
                int pending = entry.cancel();
                // Decrement the total number of pending bytes in the buffer. If it drops below the
                // low water mark, reset the Channel to the "writable" state and trigger the callback.
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null); // keep walking forward over the nodes to be flushed

        // All nodes have been marked as flushed, so clear the pointer
        unflushedEntry = null;
    }
}

After the nodes have been marked, doWrite() is called to start writing data. First, the ChannelOutboundBuffer must convert the flushed nodes into native Java ByteBuffers via nioBuffers(). Because the OS limits how many bytes a single SocketChannel.write() call can send (generally Integer.MAX_VALUE), the conversion takes two parameters:

  • maxCount: the maximum number of ByteBuffers to convert, 1024 by default.
  • maxBytes: the maximum number of bytes, by default the configured TCP send buffer size (ChannelOption.SO_SNDBUF).
/**
 * Convert the Entries to be flushed into an array of native Java ByteBuffers. Because both the total
 * count and the total number of bytes are limited, it may not be possible to send all the data at once.
 * @param maxCount the maximum number of ByteBuffers to fill in a single call
 * @param maxBytes the maximum number of bytes, i.e. the TCP send buffer size (ChannelOption.SO_SNDBUF)
 */
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
    assert maxCount > 0;
    assert maxBytes > 0;
    long nioBufferSize = 0;
    int nioBufferCount = 0;
    // Since the write operation is very frequent, the ByteBuffer[] is reused per thread
    // (each thread has its own ByteBuffer[1024]) instead of being created and destroyed each time.
    final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
    Entry entry = flushedEntry;
    // Make sure the Entry node has been flushed and its msg is a ByteBuf
    while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
        // Make sure the node has not been cancelled; cancelled nodes are skipped.
        if (!entry.cancelled) {
            ByteBuf buf = (ByteBuf) entry.msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;

            // The readable bytes are the bytes to be written; make sure there are more than 0
            if (readableBytes > 0) {
                if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
                    // Adding this buffer would exceed maxBytes, exit the loop
                    break;
                }
                nioBufferSize += readableBytes;
                int count = entry.count;
                if (count == -1) {
                    // -1 means the count has not been set yet: the number of ByteBuffers backing this ByteBuf
                    entry.count = count = buf.nioBufferCount();
                }
                // Is more array space needed?
                int neededSpace = min(maxCount, nioBufferCount + count);
                // If the number of ByteBuffers exceeds the default of 1024, grow the array
                if (neededSpace > nioBuffers.length) {
                    // Double the capacity until the array is large enough, and store it back into the FastThreadLocal.
                    nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                    NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                }
                if (count == 1) {
                    ByteBuffer nioBuf = entry.buf;
                    if (nioBuf == null) {
                        // Convert the ByteBuf into a ByteBuffer and cache it in the Entry
                        entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                    }
                    // Store the ByteBuffer
                    nioBuffers[nioBufferCount++] = nioBuf;
                } else {
                    // One ByteBuf is backed by multiple ByteBuffers: fill them in a loop
                    nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
                }
                if (nioBufferCount >= maxCount) {
                    // The number of ByteBuffers has reached maxCount, exit the loop
                    break;
                }
            }
        }
        entry = entry.next;
    }
    this.nioBufferCount = nioBufferCount;
    this.nioBufferSize = nioBufferSize;

    return nioBuffers;
}

Note the NIO_BUFFERS property here. It is a FastThreadLocal, so each thread has its own reusable ByteBuffer[] with a default length of 1024. Why reuse it? Because in a network IO framework flush is an extremely frequent operation; avoiding a fresh ByteBuffer[] allocation on every call improves performance and reduces GC pressure.

If a ByteBuf is backed by many ByteBuffers, the default array of 1024 may not be enough, in which case expandNioBufferArray() is called to grow it:

// Expand the array
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
    int newCapacity = array.length;
    do {
        // Double the capacity
        newCapacity <<= 1;

        if (newCapacity < 0) {
            // int overflow
            throw new IllegalStateException();
        }
    } while (neededSpace > newCapacity);

    ByteBuffer[] newArray = new ByteBuffer[newCapacity];
    // Copy the existing elements over
    System.arraycopy(array, 0, newArray, 0, size);

    return newArray;
}

After the ByteBufs to be sent have been converted into ByteBuffers, NioSocketChannel calls the underlying JDK SocketChannel.write() to actually send the data.

After the data has been sent, the corresponding nodes need to be removed from the ChannelOutboundBuffer. Nodes are added at the tail of the list and removed from the head. ChannelOutboundBuffer removes nodes based on the number of bytes actually sent, so a ByteBuf may have had only part of its data sent; in that case the node is not removed, only its readerIndex is advanced, and the remaining data is sent next time. For example, if three flushed ByteBufs hold 100, 200 and 300 bytes and SocketChannel.write() reports 250 bytes written, the first entry is removed, the second entry's readerIndex advances by 150 and the entry stays, and the third is untouched.

/**
 * Remove ByteBufs based on the number of bytes written to the TCP buffer.
 * @param writtenBytes the number of bytes actually written
 */
public void removeBytes(long writtenBytes) {
    for (;;) {
        // Start from flushedEntry
        Object msg = current();
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;
            break;
        }

        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;

        // If the data of this ByteBuf <= writtenBytes, remove the Entry node entirely
        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            remove();
        } else { // readableBytes > writtenBytes
            // This ByteBuf was only partially sent: advance its readerIndex and continue next time.
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    // Reset NIO_BUFFERS
    clearNioBuffers();
}

At this point, the core flow of Netty's data transmission has been fully analyzed.

Summary

To avoid writing into the TCP buffer on every single write, Netty's Channel provides two operations, write and flush, backed by the core class ChannelOutboundBuffer: write only stores the data temporarily in the buffer, while flush actually sends it. To keep an excessive backlog from causing an OOM, Netty also provides high and low water marks; when the pending data reaches the high water mark, the Channel is set to "unwritable" and a callback is triggered, so the user can check that state to decide whether to keep writing.

ChannelOutboundBuffer itself is a singly linked list responsible for managing the temporarily stored messages. When data needs to be sent, it is also responsible for converting ByteBuf into ByteBuffer, because the underlying JDK SocketChannel can only write ByteBuffer.

After the data has been sent, ChannelOutboundBuffer is also responsible for removing Entry nodes according to the number of bytes actually sent. A ByteBuf may have had only part of its data sent; in that special case ChannelOutboundBuffer does not remove the node but simply advances its readerIndex and sends the remaining data next time.