本文主要研究一下 storm client 的 netty buffer watermark 配置及其在 netty 中的作用。
storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java
/**
 * Netty based messaging: the high watermark of the netty write buffer, in bytes.
 * <p>
 * When the number of bytes queued in netty's write buffer exceeds this value, netty's {@code Channel.isWritable()} starts to
 * return {@code false}. The client then waits until the number of queued bytes falls below the
 * {@linkplain #STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK low water mark}.
 * </p>
 */
@isInteger
@isPositiveNumber
public static final String STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK = "storm.messaging.netty.buffer.high.watermark";
/**
 * Netty based messaging: the low watermark of the netty write buffer, in bytes.
 * <p>
 * Once the number of bytes queued in the write buffer has exceeded the
 * {@linkplain #STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK high water mark} and has then dropped below this value,
 * netty's {@code Channel.isWritable()} starts to return {@code true} again.
 * </p>
 */
@isInteger
@isPositiveNumber
public static final String STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK = "storm.messaging.netty.buffer.low.watermark";
storm-2.0.0/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
/**
 * Builds a Netty messaging client for {@code host:port}, configures its bootstrap from the topology
 * configuration and schedules the first connection attempt.
 *
 * @param topoConf        topology configuration; the buffer size, write-buffer watermarks, retry sleep bounds,
 *                        message batch size and back-pressure wait strategy are read from it
 * @param remoteBpStatus  handed to the {@code StormClientPipelineFactory} together with this client
 * @param eventLoopGroup  event loop group the bootstrap is bound to
 * @param scheduler       timer kept in the {@code scheduler} field
 * @param host            destination host
 * @param port            destination port
 */
Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus,
       EventLoopGroup eventLoopGroup, HashedWheelTimer scheduler, String host,
       int port) {
    this.topoConf = topoConf;
    closing = false;
    this.scheduler = scheduler;

    final int sendBufferBytes = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
    final int lowMarkBytes = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK));
    final int highMarkBytes = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK));

    // The SASL channel counts as ready from the start only when netty authentication is turned off.
    final boolean authEnabled =
        ObjectReader.getBoolean(topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false);
    saslChannelReady.set(!authEnabled);

    LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark: {}, highWatermark: {}",
             host, port, sendBufferBytes, lowMarkBytes, highMarkBytes);

    final int minSleepMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
    final int maxSleepMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
    // NOTE(review): the third argument (-1) presumably means "no retry limit" - confirm against the retry class.
    retryPolicy = new StormBoundedExponentialBackoffRetry(minSleepMs, maxSleepMs, -1);

    this.eventLoopGroup = eventLoopGroup;

    // Prepare the bootstrap; the watermark pair drives Channel.isWritable() for channels created from it.
    bootstrap = new Bootstrap()
        .group(this.eventLoopGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_SNDBUF, sendBufferBytes)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowMarkBytes, highMarkBytes))
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .handler(new StormClientPipelineFactory(this, remoteBpStatus, topoConf));

    dstAddress = new InetSocketAddress(host, port);
    dstAddressPrefixedName = prefixedName(dstAddress);

    // Start the liveness check and kick off the first connection attempt without delay.
    launchChannelAliveThread();
    scheduleConnect(NO_DELAY_MS);

    final int batchBytes = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
    batcher = new MessageBuffer(batchBytes);

    // Use the configured back-pressure wait strategy when one is named, else the progressive default.
    final String waitStrategyClass = (String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY);
    if (waitStrategyClass != null) {
        waitStrategy = ReflectionUtils.newInstance(waitStrategyClass);
    } else {
        waitStrategy = new WaitStrategyProgressive();
    }
    waitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
}
netty-all-4.1.25.Final-sources.jar!/io/netty/channel/WriteBufferWaterMark.java
/**
 * Immutable pair of low and high water marks for a channel's write buffer.
 * <p>
 * If the number of bytes queued in the write buffer exceeds the
 * {@linkplain #high high water mark}, {@link Channel#isWritable()}
 * will start to return {@code false}.
 * <p>
 * If the number of bytes queued in the write buffer exceeded the
 * {@linkplain #high high water mark} and then
 * dropped down below the {@linkplain #low low water mark},
 * {@link Channel#isWritable()} will start to return
 * {@code true} again.
 */
public final class WriteBufferWaterMark {

    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;

    /** Default water marks: 32 KiB low, 64 KiB high (built without validation). */
    public static final WriteBufferWaterMark DEFAULT =
            new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false);

    private final int low;
    private final int high;

    /**
     * Create a new, validated instance.
     *
     * @param low low water mark for write buffer; must be {@code >= 0}
     * @param high high water mark for write buffer; must be {@code >= low}
     * @throws IllegalArgumentException if {@code low < 0} or {@code high < low}
     */
    public WriteBufferWaterMark(int low, int high) {
        this(low, high, true);
    }

    /**
     * This constructor is needed to keep backward-compatibility.
     *
     * @param low low water mark for write buffer
     * @param high high water mark for write buffer
     * @param validate when {@code false} the range checks are skipped and the values are stored as given
     * @throws IllegalArgumentException if {@code validate} is set and {@code low < 0} or {@code high < low}
     */
    WriteBufferWaterMark(int low, int high, boolean validate) {
        if (validate) {
            if (low < 0) {
                throw new IllegalArgumentException("write buffer's low water mark must be >= 0");
            }
            if (high < low) {
                // Message pieces are joined with a single space (the original produced "less than  low").
                throw new IllegalArgumentException(
                        "write buffer's high water mark cannot be less than low water mark ("
                                + low + "): " + high);
            }
        }
        this.low = low;
        this.high = high;
    }

    /**
     * Returns the low water mark for the write buffer.
     */
    public int low() {
        return low;
    }

    /**
     * Returns the high water mark for the write buffer.
     */
    public int high() {
        return high;
    }

    @Override
    public String toString() {
        return "WriteBufferWaterMark(low: " + low + ", high: " + high + ")";
    }
}
netty-all-4.1.25.Final-sources.jar!/io/netty/channel/ChannelOutboundBuffer.java
// Atomic updater used to compare-and-set the "unwritable" field below (see setWritable).
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
// Flag word behind isWritable(): the buffer is writable only while this is 0.
// setWritable clears bit 0 of it once the pending size drops below the low water mark.
private volatile int unwritable;
/**
 * Reports whether this buffer is currently writable, i.e. whether no bit of the {@code unwritable}
 * flag word is set.
 *
 * @return {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes}
 *         did not exceed the write watermark of the {@link Channel} and no
 *         {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
 *         {@code false}
 */
public boolean isWritable() {
    final int flags = unwritable;
    return flags == 0;
}
/**
 * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
 * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
 *
 * @return the number of bytes that still have to be drained, or {@code 0} if the buffer is already writable
 */
public long bytesBeforeWritable() {
    // +1 because writability is only restored once the pending size drops strictly below the low water
    // mark (see decrementPendingOutboundBytes); merely reaching the mark is not enough.
    long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark() + 1;
    // If bytes is not positive we know we are writable, otherwise we have to check writability.
    // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
    // together. totalPendingSize will be updated before isWritable().
    return bytes <= 0 || isWritable() ? 0 : bytes;
}
/**
 * Decrement the pending bytes which will be written at some point.
 * This method is thread-safe!
 *
 * @param size number of bytes to subtract from the pending total
 */
void decrementPendingOutboundBytes(long size) {
    decrementPendingOutboundBytes(size, true, true);
}

/**
 * Subtracts {@code size} from the pending total and, if requested, marks the buffer writable again once the
 * new total is below the channel's low water mark.
 *
 * @param size              number of bytes to subtract; {@code 0} is a no-op
 * @param invokeLater       forwarded to {@code setWritable}
 * @param notifyWritability when {@code false} the low water mark is not consulted at all
 */
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size != 0) {
        final long pendingAfter = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
        if (notifyWritability) {
            if (pendingAfter < channel.config().getWriteBufferLowWaterMark()) {
                setWritable(invokeLater);
            }
        }
    }
}
/**
 * Clears bit 0 of the {@code unwritable} flag word, retrying the compare-and-set until it succeeds.
 * A writability-changed event is fired only when this call took the flag word from non-zero to zero.
 *
 * @param invokeLater forwarded to {@code fireChannelWritabilityChanged}
 */
private void setWritable(boolean invokeLater) {
    int before;
    int after;
    do {
        before = unwritable;
        after = before & ~1;
    } while (!UNWRITABLE_UPDATER.compareAndSet(this, before, after));

    if (before != 0 && after == 0) {
        fireChannelWritabilityChanged(invokeLater);
    }
}
/**
 * Fires a writability-changed event on the channel's pipeline.
 *
 * @param invokeLater when {@code true} the event is fired from a task submitted to the channel's event loop;
 *                    otherwise it is fired directly on the calling thread
 */
private void fireChannelWritabilityChanged(boolean invokeLater) {
    final ChannelPipeline pipeline = channel.pipeline();
    if (!invokeLater) {
        pipeline.fireChannelWritabilityChanged();
        return;
    }

    // The notification task is created on first use and kept in the field for later calls.
    Runnable notifier = fireChannelWritabilityChangedTask;
    if (notifier == null) {
        notifier = new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelWritabilityChanged();
            }
        };
        fireChannelWritabilityChangedTask = notifier;
    }
    channel.eventLoop().execute(notifier);
}
netty-all-4.1.25.Final-sources.jar!/io/netty/channel/ChannelOutboundBuffer.java
// Atomic updater used to compare-and-set the "unwritable" field below (see setUnwritable).
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
// Flag word behind isWritable(): the buffer is writable only while this is 0.
// setUnwritable sets bit 0 of it once the pending size exceeds the high water mark.
private volatile int unwritable;
/**
 * Reports whether this buffer is currently writable, i.e. whether no bit of the {@code unwritable}
 * flag word is set.
 *
 * @return {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes}
 *         did not exceed the write watermark of the {@link Channel} and no
 *         {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
 *         {@code false}
 */
public boolean isWritable() {
    final int flags = unwritable;
    return flags == 0;
}
/**
 * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
 * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
 *
 * @return the number of bytes that can still be written, or {@code 0} if the buffer is already unwritable
 */
public long bytesBeforeUnwritable() {
    // +1 because the buffer only turns unwritable once the pending size rises strictly above the high water
    // mark (see incrementPendingOutboundBytes); merely reaching the mark is not enough.
    long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
    // If bytes is not positive we know we are not writable, otherwise we have to check writability.
    // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
    // together. totalPendingSize will be updated before isWritable().
    return bytes > 0 && isWritable() ? bytes : 0;
}
/**
 * Increment the pending bytes which will be written at some point.
 * This method is thread-safe!
 *
 * @param size number of bytes to add to the pending total
 */
void incrementPendingOutboundBytes(long size) {
    incrementPendingOutboundBytes(size, true);
}

/**
 * Adds {@code size} to the pending total and marks the buffer unwritable once the new total exceeds the
 * channel's high water mark.
 *
 * @param size        number of bytes to add; {@code 0} is a no-op
 * @param invokeLater forwarded to {@code setUnwritable}
 */
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size != 0) {
        final long pendingAfter = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (pendingAfter > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }
}
/**
 * Sets bit 0 of the {@code unwritable} flag word, retrying the compare-and-set until it succeeds.
 * A writability-changed event is fired only when this call took the flag word from zero to non-zero.
 *
 * @param invokeLater forwarded to {@code fireChannelWritabilityChanged}
 */
private void setUnwritable(boolean invokeLater) {
    int before;
    int after;
    do {
        before = unwritable;
        after = before | 1;
    } while (!UNWRITABLE_UPDATER.compareAndSet(this, before, after));

    if (before == 0 && after != 0) {
        fireChannelWritabilityChanged(invokeLater);
    }
}
/**
 * Fires a writability-changed event on the channel's pipeline.
 *
 * @param invokeLater when {@code true} the event is fired from a task submitted to the channel's event loop;
 *                    otherwise it is fired directly on the calling thread
 */
private void fireChannelWritabilityChanged(boolean invokeLater) {
    final ChannelPipeline pipeline = channel.pipeline();
    if (!invokeLater) {
        pipeline.fireChannelWritabilityChanged();
        return;
    }

    // The notification task is created on first use and kept in the field for later calls.
    Runnable notifier = fireChannelWritabilityChangedTask;
    if (notifier == null) {
        notifier = new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelWritabilityChanged();
            }
        };
        fireChannelWritabilityChangedTask = notifier;
    }
    channel.eventLoop().execute(notifier);
}
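- storm client的storm.messaging.netty.buffer.high.watermark(默认16M)以及storm.messaging.netty.buffer.low.watermark(默认8M)其实配置的是netty的ChannelOption.WRITE_BUFFER_WATER_MARK。
- netty的ChannelOutboundBuffer有bytesBeforeWritable及bytesBeforeUnwritable方法(目前来看这两个方法貌似调用的比较少)。
- 另外还有decrementPendingOutboundBytes及incrementPendingOutboundBytes方法(目前应该是这两个方法起作用),当小于lowWatermark或者大于highWatermark的时候,分别触发setWritable及setUnwritable,更改ChannelOutboundBuffer的unwritable字段,进而影响isWritable方法;在isWritable为true的时候会立马执行写请求,当返回false的时候,写请求会被放入队列,等待isWritable为true时才能执行这些堆积的写请求。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。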
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。