Wednesday, February 22, 2017

... about the perils of using the 'first' operator in rxJava

RxJava is a wonderful tool to do computation logic and stream processing. However, it sometimes behaves in unexpected ways. One of these will be discussed below:


first and single operator

According to the javadoc, first turns an observable into one that only returns 1 element and then terminates.

There's a similar operator called single. The difference is that the latter expects the source observable to only emit 1 item:
So, use first when you don't care how many items the source observable will emit. Otherwise use single.

When abstractions leak

However, they behave different when you add doOnCompleted, doOnTerminate and doAfterTerminate to the mix. These operators allow you to do some side effects after the last item has been emitted.

The following code will print "onCompleted called":
   Observable.just(1)
        .doOnCompleted(() -> System.out.println("onCompleted called") )
        .single()
        .subscribe()


The next code won't:
   Observable.just(1)
        .doOnCompleted(() -> System.out.println("onCompleted called") )
        .first()
        .subscribe()


The next example will:
   Observable.just(1)
        .first()
        .doOnCompleted(() -> System.out.println("onCompleted called") )
        .subscribe()


So, you will get different behavior depending on where you put doOnCompleted and if you use single or first.

The reason why first behaves unexpectedly is because it unsubscribes from the source observable ones it receives the first element. As a consequence the source observable never has the opportunity to signal completion. As a result, an upstream doOnCompleted handler is never called while a downstream doOnCompleted handler is.

By contrast, single will wait until it receives the completion signal from the source observable allowing upstream doOnCompleted handlers to be called.

Greetings
Jan

Sunday, February 19, 2017

... about keeping your stacktraces small

When developing an application, we often rely on logfiles to have a sense of what's going on. When an error occurs, we make sure the error is logged together with the stacktrace of the thread. The stacktrace gives us valuable information to pinpoint the problem.

At least ... that's how it used to be, before we started passing functions around and building layer upon layer. 

Below is an example of a stacktrace thrown during the handling of a request in a web container using spring mvc and rxJava:
com.mycompany.error.ValidationException: Validation failed
        at com.mycompany.upload.web.UploadValidator.checkArgument(UploadValidator.java:103)
        at com.mycompany.upload.web.UploadValidator.checkExtension(UploadValidator.java:97)
        at com.mycompany.upload.web.UploadValidator.checkCsvExtension(UploadValidator.java:89)
        at com.mycompany.upload.web.UploadValidator.validateCsvFile(UploadValidator.java:46)
        at com.mycompany.upload.web.UploadValidator.validate(UploadValidator.java:38)
        at rx.internal.util.ActionObserver.onNext(ActionObserver.java:39)
        at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:96)
        at rx.internal.util.ScalarSynchronousObservable$WeakSingleProducer.request(ScalarSynchronousObservable.java:276)
        at rx.Subscriber.setProducer(Subscriber.java:211)
        at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
        at rx.Subscriber.setProducer(Subscriber.java:205)
        at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:138)
        at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:129)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
        at rx.Observable.subscribe(Observable.java:10296)
        at rx.Observable.subscribe(Observable.java:10263)
        at rx.Observable.subscribe(Observable.java:10038)
        at com.mycompany.upload.web.UploadController.upload(UploadController.java:213)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:220)
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:134)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:116)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963)
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
        at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:872)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:754)
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:847)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:295)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:214)
        at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:246)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:214)
        at org.springframework.web.multipart.support.MultipartFilter.doFilterInternal(MultipartFilter.java:122)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:246)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:214)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:317)
        at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)
        at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:170)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.authentication.www.BasicAuthenticationFilter.doFilterInternal(BasicAuthenticationFilter.java:215)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.authentication.preauth.AbstractPreAuthenticatedProcessingFilter.doFilter(AbstractPreAuthenticatedProcessingFilter.java:121)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.web.filter.CorsFilter.doFilterInternal(CorsFilter.java:96)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:214)
        at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:177)
        at org.springframework.security.web.debug.DebugFilter.invokeWithWrappedRequest(DebugFilter.java:90)
        at org.springframework.security.web.debug.DebugFilter.doFilter(DebugFilter.java:77)
        at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
        at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:246)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:214)
        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:246)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:214)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:231)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:149)
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:150)
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:97)
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:102)
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:344)
        at org.apache.coyote.ajp.AjpProcessor.process(AjpProcessor.java:490)
        at org.apache.coyote.ajp.AjpProtocol$AjpConnectionHandler.process(AjpProtocol.java:422)
        at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:926)
        at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.mycompany.upload.web.UploadContext.class
        at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:109)
        at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:190)
        at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:98)
        ... 136 common frames omitted

It's horrific! 

Most of it doesn't tell you anything about what's going wrong. Most of it are frames from spring security, spring mvc, rxJava , container it's running on and method invocations through reflection as shown below: 
com.mycompany.error.ValidationException: Validation failed
        at com.mycompany.upload.web.UploadValidator.checkArgument(UploadValidator.java:103)
        at com.mycompany.upload.web.UploadValidator.checkExtension(UploadValidator.java:97)
        at com.mycompany.upload.web.UploadValidator.checkCsvExtension(UploadValidator.java:89)
        at com.mycompany.upload.web.UploadValidator.validateCsvFile(UploadValidator.java:46)
        at com.mycompany.upload.web.UploadValidator.validate(UploadValidator.java:38)
        at rx.internal.util.ActionObserver.onNext(ActionObserver.java:39)
        at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:96)
        at rx.internal.util.ScalarSynchronousObservable$WeakSingleProducer.request(ScalarSynchronousObservable.java:276)
        at rx.Subscriber.setProducer(Subscriber.java:211)
        at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
        at rx.Subscriber.setProducer(Subscriber.java:205)
        at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:138)
        at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:129)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
        at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.Observable.unsafeSubscribe(Observable.java:10200)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
        at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
        at rx.Observable.subscribe(Observable.java:10296)
        at rx.Observable.subscribe(Observable.java:10263)
        at rx.Observable.subscribe(Observable.java:10038)

        at com.mycompany.upload.web.UploadController.upload(UploadController.java:213)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)

        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:220)
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:134)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:116)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963)
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
        at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:872)

        at javax.servlet.http.HttpServlet.service(HttpServlet.java:754)
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:847)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:295)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:214)

        at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)

        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:246)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:214)

        at org.springframework.web.multipart.support.MultipartFilter.doFilterInternal(MultipartFilter.java:122)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)

        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:246)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:214)

        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:317)
        at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)
        at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:170)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.authentication.www.BasicAuthenticationFilter.doFilterInternal(BasicAuthenticationFilter.java:215)

        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)

        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.authentication.preauth.AbstractPreAuthenticatedProcessingFilter.doFilter(AbstractPreAuthenticatedProcessingFilter.java:121)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)

        at org.springframework.web.filter.CorsFilter.doFilterInternal(CorsFilter.java:96)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)

        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)

        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)

        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)

        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)

        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:214)
        at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:177)
        at org.springframework.security.web.debug.DebugFilter.invokeWithWrappedRequest(DebugFilter.java:90)
        at org.springframework.security.web.debug.DebugFilter.doFilter(DebugFilter.java:77)

        at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
        at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)

        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:246)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:214)

        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)

        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:246)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:214)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:231)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:149)
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:150)
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:97)
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:102)
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:344)
        at org.apache.coyote.ajp.AjpProcessor.process(AjpProcessor.java:490)
        at org.apache.coyote.ajp.AjpProtocol$AjpConnectionHandler.process(AjpProtocol.java:422)
        at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:926)

        at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.mycompany.upload.web.UploadContext.class
        at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:109)
        at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:190)
        at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:98)

        ... 136 common frames omitted
All parts marked in colour don't give me any clues about why something fails. They only obscur the real problem. 

Luckily there's a poorly documented feature in logback that allows you to filter out those frames matching a certain prefix. Here's an example of a logback configuration filtering out the frames I'm not interested in: 
<configuration scan="true">
   <contextName>myApp</contextName>

   <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <Pattern>%d{dd/MM/yyyy HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n%rootException{full
                ,rx
                ,sun.reflect
                ,java.lang.reflect
                ,org.apache.catalina
                ,org.apache.coyote
                ,org.apache.tomcat
                ,org.springframework.web.method
                ,org.springframework.web.filter
                ,org.springframework.security.web
                }</Pattern>
        </encoder>
    </appender>


    <root level="WARN">
        <appender-ref ref="CONSOLE"/>
    </root>

</configuration>
The horrific stacktrace from above is now reduced to the following:
com.mycompany.error.ValidationException: Validation failed         at com.mycompany.upload.web.UploadValidator.checkArgument(UploadValidator.java:103)         at com.mycompany.upload.web.UploadValidator.checkExtension(UploadValidator.java:97)         at com.mycompany.upload.web.UploadValidator.checkCsvExtension(UploadValidator.java:89)         at com.mycompany.upload.web.UploadValidator.validateCsvFile(UploadValidator.java:46)         at com.mycompany.upload.web.UploadValidator.validate(UploadValidator.java:38)         at com.mycompany.upload.web.UploadController.upload(UploadController.java:213)         at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963)         at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)         at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)         at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:872)         at javax.servlet.http.HttpServlet.service(HttpServlet.java:754)         at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)         at javax.servlet.http.HttpServlet.service(HttpServlet.java:847)         at org.springframework.web.multipart.support.MultipartFilter.doFilterInternal(MultipartFilter.java:122)         at java.lang.Thread.run(Thread.java:745) Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.mycompany.upload.web.UploadContext.class         ... 136 common frames omitted
Much easier to read, much faster to diagnose. Greetings Jan

Monday, February 13, 2017

... about querying with backpressure

When querying the database you don't always know in advance how many data the query will return. This is a problem when you receive more data than can fit in heap memory.


Paging

One way to work around this is to use paging: first select the first million rows, then select the next million, etc... until all rows are queried. Your first query will look something like the following:

SELECT * FROM myTable OFFSET 0 ROWS FETCH NEXT 1000000 ROWS ONLY;

while the next one will look something like the following:

SELECT * FROM myTable OFFSET 1000000 ROWS FETCH NEXT 2000000 ROWS ONLY;

This is slow for 2 reasons:
  1. you need to make sure all subsequent calls use a consistent ordering. As a result you have to add an order by clause to your query where you order by your primary key. you could also use something like rowId in Oracle or something similar in other databases.
  2. paging basically runs the entire query up till the page you requested. The higher the page you request, the longer your query will take.


Observables

Another way to avoid blowing up your heap size is to return rxJava's Observable instead of a list with all your results. This way you're streaming the results from your query to the subscriber of your Observable but you never keep all results in memory. This has two advantages:
  1. you don't need a lot of memory to keep all the results in memory
  2. you don't have to wait until all results from the database are fetched. You can already start processing items the moment they are read from the database and passed to the subscriber.
I'm using spring-jdbc's JdbcTemplate or NamedParameterJdbcTemplate for querying the database. It has all sorts of methods for querying the database. The following code shows you how to return an Observable:


Loading code....

It takes a sql query and a RowMapper for converting the result to some type <T> and returns an Observable. The observable will trigger the call to the database for every subscription using a ResultSetExtractor. The ResultSetExtractor will iterate over the ResultSet and will pass the data for each row to the subscriber until there are no more rows.
Loading code....

I can call the observable, subscribe to it and handle each row:


Loading code....


This works great... until you start doing things on another thread and your subscriber can't keep up with the items that are being produced:
Loading code....

The result is a MissingBackpressureException that is thrown when the consumer can't keep up. 
The problem is we didn't take backpressure into account. This is needed the moment you're not doing everything in 1 thread (callstack blocking) and have a consumer that is slower than your producer (the results from  the db query).


Backpressure

The solution is to refactor the above code into something that does know how to handle backpressure. Luckily rxJava helps us a lot by abstracting away most of the difficulties using SyncOnSubscribe. The code is a bit more complicated because we let rxJava control the call. In return it needs to know how to keep state during the subscription so we need to tell it ...
  1. how to create a new state object
  2. what to do when a new item is requested
  3. how to clean up state
To start, we need to wrap the Connection, the DataSource, the PreparedStatement and the ResultSet in a state object called JdbcQueryState:

Loading code....

Now, when a new item is requested, we get the next row from the ResultSet and pass it to the observer or signal the observer that it's complete when there are no rows left:

Loading code....

Finally we need to cleanup all state when it's done:

Loading code....

All this is called from methods that take the sql and return a backpressure aware Observable:
Loading code....

To hook into Spring's statement handling I needed to extends from NamedParameterJdbcTemplate. You can see the full code here

Greetings
Jan