Recently upgraded from micronaut 3.3.4 to 3.8.7, also updated all rxjava2 to rxjava3 and now can no longer use the
Micronaut Declarative Client with Flowable.blocking* calls against a reactive controller.
The same functionality worked for months in production; now it is not possible to stream from the endpoint. Previously
the http.client.read-timeout and read-idle-timeout variables were set to their default values
This is a Demo project to show change in behavior of the reactive streams when upgrading to micronaut-3.8.8 and rxjava3.
This project shows that i can no longer use the Flowable.blockingNext() or Flowable.blockingIterable() to stream
results from a reactive endpoint that uses a Flowable and @ExcecuteOn annotation to offload the work
The client would return a stream of IngestReport objects and the app using the client could step through the stream
A ReadTimeoutException is thrown while the client waits for the Flowable to begin because the endpoint does some
pre-processing that takes up to 60 seconds. Thus the ReadTimeoutException is thrown after 10 seconds.
In micronaut-3.3.4 (plugin version 3.3.2) and rxjava2 it was possible to do the following:
Declare a Flowable method in a parent inerface, and then extend this interface in a declarative client
interface LoanOperations {
@Get(uri = '/ingest/tracker/{loanTapeId}/schema/{schemaId}', processes = MediaType.APPLICATION_JSON_STREAM)
Flowable<IngestReport> ingestTracker(@NonNull Long loanTapeId, @NonNull Long schemaId)
}
@Client(id='loan', path='${loan.context-path:/}${loan.api.version}/loan')
interface LoanClient extends LoanOperations{}Implement the method and execute the call on the TaskExecutors.IO thread because pre-processing causes this Flowable
to not start emitting immediately
@Controller('/${loan.api.version}/loan')
class LoanController{
@ExecuteOn(TaskExecutors.IO)
Flowable<IngestReport> ingestTracker(@NonNull Long loanTapeId, @NonNull Long schemaId) {
//pre-processing that takes up to a minute
//start emitting the IngestReport objects
}
}i used to be able to call the Flowable.blockingIterable() from an app and get an iterator and the code would happily wait
(without setting the http.client.read-timeout or changing the read-idle-timeout) and the following would just work:
//either of these would work
//def iterator = loanClient.ingestTracker(persistedTape.id, persistedSchema.id).blockingNext().iterator()
def iterator = loanClient.ingestTracker(persistedTape.id, persistedSchema.id).blockingIterable().iterator()
def report = null
while( iterator.hasNext() ){
report = iterator.next()
if( report.state.is IngestReport.State.COMPLETE ){
log.info "COMPLETE!"
}
}