-
Notifications
You must be signed in to change notification settings - Fork 5
Description
First, I was having issues with the OutputToReadStream.wrap( input ) method, I was trying to perform a proper close on the stream. But you can't close the output stream until the operation is complete. Therefore, the close() must be called on the future from pipeTo(...) inside onComplete( x-> {} ).
First, is there a cleaner way to close the output stream? You can't use the try-resource block because it closes the output too soon.
final OutputToReadStream os = new OutputToReadStream( vertx );
final Future<Void> pipeTo = os.wrap( image ).onComplete( x -> {
try {
os.close();
}
catch( Exception e ) {
e.printStackTrace();
}
} );The issue with this code is that onComplete is executed back on a vertx loop thread! And because close() makes an await call, this is bad. That means extra care must be taken to call close() on a different thread, like this...
final OutputToReadStream os = new OutputToReadStream( vertx );
final Future<Void> pipeTo = os.wrap( image ).pipeTo( ctx.response() )
.onComplete( x -> {
// must FJP close because onComplete is vertx loop thread!
ForkJoinPool.commonPool().submit( () -> {
try {
os.close();
}
catch( Exception e ) {
e.printStackTrace();
}
} );
} );*** But there's an underlying bug in the push( null ) called by close(). I think it was exasperated by the use another thread. It's throwing a NPE on this line...
if( data == null )
endHandler.handle( null ); <--- NPE
else
dataHandler.handle( data );endHandler is null. The endHandler property is only set by one method endHandler(...) and it appears there is a call to it from the .pipeTo(...) method. The PipeImpl constructor is setting it to null. This contradicts your expected empty lambda default of x->{}
I put a null check before the call to .handle( null ), but there may be other places that need this too. Or, you can reject nulls, but that will break the PipeImpl.
I chose to replace any null settings with the empty lambda x->{} so as to keep with your initialization assumptions. This code works for me...
@Override
public OutputToReadStream endHandler( Handler<Void> endHandler ) {
this.endHandler = endHandler == null ? x -> { } : endHandler;
return this;
}