Remove our packet queuing as it may be contributing to high CPU usage and/or memory leaks.
This commit is contained in:
parent
47839cb11c
commit
526137be7b
@ -15,35 +15,17 @@ public class ChannelWrapper
|
|||||||
private final Channel ch;
|
private final Channel ch;
|
||||||
@Getter
|
@Getter
|
||||||
private volatile boolean closed;
|
private volatile boolean closed;
|
||||||
private final MessageList<Object> queue = MessageList.newInstance();
|
|
||||||
private volatile boolean flushNow = true;
|
|
||||||
|
|
||||||
public ChannelWrapper(ChannelHandlerContext ctx)
|
public ChannelWrapper(ChannelHandlerContext ctx)
|
||||||
{
|
{
|
||||||
this.ch = ctx.channel();
|
this.ch = ctx.channel();
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void flushNow(boolean flush)
|
|
||||||
{
|
|
||||||
if ( !flushNow && flush )
|
|
||||||
{
|
|
||||||
ch.write( queue.copy() );
|
|
||||||
queue.clear();
|
|
||||||
}
|
|
||||||
this.flushNow = flush;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void write(Object packet)
|
public synchronized void write(Object packet)
|
||||||
{
|
{
|
||||||
if ( !closed )
|
if ( !closed )
|
||||||
{
|
|
||||||
if ( flushNow )
|
|
||||||
{
|
{
|
||||||
ch.write( packet );
|
ch.write( packet );
|
||||||
} else
|
|
||||||
{
|
|
||||||
queue.add( packet );
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -52,24 +34,14 @@ public class ChannelWrapper
|
|||||||
if ( !closed )
|
if ( !closed )
|
||||||
{
|
{
|
||||||
closed = true;
|
closed = true;
|
||||||
ch.write( queue, ch.newPromise() ).addListener( new ChannelFutureListener()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void operationComplete(ChannelFuture future) throws Exception
|
|
||||||
{
|
|
||||||
ch.close();
|
ch.close();
|
||||||
}
|
}
|
||||||
} );
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addBefore(String baseName, String name, ChannelHandler handler)
|
public void addBefore(String baseName, String name, ChannelHandler handler)
|
||||||
{
|
{
|
||||||
Preconditions.checkState( ch.eventLoop().inEventLoop(), "cannot add handler outside of event loop" );
|
Preconditions.checkState( ch.eventLoop().inEventLoop(), "cannot add handler outside of event loop" );
|
||||||
boolean oldFlush = flushNow;
|
|
||||||
flushNow( true );
|
|
||||||
ch.pipeline().addBefore( baseName, name, handler );
|
ch.pipeline().addBefore( baseName, name, handler );
|
||||||
flushNow( oldFlush );
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Channel getHandle()
|
public Channel getHandle()
|
||||||
|
@ -61,7 +61,6 @@ public class HandlerBoss extends ChannelInboundHandlerAdapter
|
|||||||
@Override
|
@Override
|
||||||
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception
|
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception
|
||||||
{
|
{
|
||||||
channel.flushNow( false );
|
|
||||||
for ( Object msg : msgs )
|
for ( Object msg : msgs )
|
||||||
{
|
{
|
||||||
if ( handler != null && ctx.channel().isActive() )
|
if ( handler != null && ctx.channel().isActive() )
|
||||||
@ -86,7 +85,6 @@ public class HandlerBoss extends ChannelInboundHandlerAdapter
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
channel.flushNow( true );
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user