Sunday, July 27, 2014

Tomcat, Atmosphere and Spring Security

Here I'd like to describe another interesting case I've been struggling with for the past few days. The use case is as follows: enable asynchronous event support in a multi-tenant Tomcat/Spring SaaS application, so that events can be pushed to groups of listening clients. To be specific, an event should be channeled to one of the following groups: a specific user, all users of a specific tenant, or all users.

Atmosphere + Tomcat

The fancy new technology for async processing in the Java world is Atmosphere, and I use it in this example. Unfortunately, I started with a badly preconfigured Atmosphere, which locked up Tomcat after a certain number of requests, so that it could not serve any more. It turned out that a working Tomcat + Atmosphere configuration is far from obvious, so let's quickly go through these problems before moving on.

I started with the following Maven dependency:

<dependency>
    <groupId>org.atmosphere</groupId>
    <artifactId>atmosphere-runtime</artifactId>
    <version>2.1.7</version>
</dependency>

After a lot of struggling I came to the conclusion that there's no way to run Tomcat properly with Atmosphere using this library (at least in version 2.1.7). I started with the standard Atmosphere configuration, which uses the native Tomcat async implementation (Comet support). In this scenario a bug in Atmosphere results in Tomcat's BIO (blocking IO) support being used instead of NIO (non-blocking IO). As a consequence, a thread is created for each async request, then suspended and moved to a waiting pool. When you reach the Tomcat thread pool capacity (200 by default), you end up with a completely frozen application.
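The freeze is easy to reproduce outside Tomcat. The following is only a minimal sketch, not Tomcat or Atmosphere code: a small fixed pool stands in for the connector thread pool, and each "suspended" request simply parks its thread on a latch, which is what blocking IO effectively does. The class and method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo: a blocking "suspend" pins one pool thread per request.
class BlockingSuspendDemo {

    /**
     * Parks poolSize requests on a pool of poolSize threads, then submits one
     * more request and reports whether it was served within waitMillis.
     */
    static boolean extraRequestServed(int poolSize, long waitMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch allParked = new CountDownLatch(poolSize);
        try {
            for (int i = 0; i < poolSize; i++) {
                pool.submit(() -> {
                    allParked.countDown();
                    try {
                        release.await(); // thread is held for the whole "suspension"
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            allParked.await(); // every pool thread is now blocked
            Future<String> extra = pool.submit(() -> "served");
            extra.get(waitMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the extra request sits in the queue: no free thread
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("extra request served: " + extraRequestServed(3, 200));
    }
}
```

With non-blocking IO the suspended request does not hold a thread, so the pool size stops being a limit on the number of open connections.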

Afterwards I switched from native Tomcat async support to the Servlet 3 specification, using the following flags:

<init-param>
    <param-name>org.atmosphere.useNative</param-name>
    <param-value>false</param-value>
</init-param>
<init-param>
    <param-name>org.atmosphere.useWebSocketAndServlet3</param-name>
    <param-value>true</param-value>
</init-param>

With this config it started to work through Tomcat NIO, but odd things started to happen as well: for example, random freezes during standard request processing and a lot of java.lang.IllegalStateException: Cannot forward after response has been committed exceptions. Something similar to this guy's situation.

After a lot of debugging of what really happens in the Atmosphere and Tomcat threads I gave up, and found a solution in the so-called "native" Atmosphere implementation, which is apparently the same library with only one class changed to fix native Tomcat support for Atmosphere (scenario 1). It is called:

<dependency>
    <groupId>org.atmosphere</groupId>
    <artifactId>atmosphere-runtime-native</artifactId>
    <version>2.1.7</version>
</dependency>

It is described here. It finally works well using Tomcat Comet support and/or native Tomcat WebSocket support. Additionally, it requires a /META-INF/context.xml with the following content:

<Context>
    <Loader delegate="true"/>
</Context>

Atmosphere + Spring

Now something which is simple and can be found in many examples on the net: how to configure Atmosphere so that it can route requests to the Spring DispatcherServlet. To skip unnecessary words, I'll make it quick:


A few things deserve a little more explanation:
  1. org.atmosphere.useNative and org.atmosphere.useWebSocketAndServlet3 make it clear, finally, that we want to use Tomcat's native async support.
  2. org.atmosphere.cpr.broadcaster.maxProcessingThreads limits the number of Atmosphere threads to 10. Atmosphere spawns threads that sweep suspended requests (e.g. by flushing their response buffers).
  3. org.atmosphere.cpr.broadcasterLifeCyclePolicy=EMPTY_DESTROY sets the lifecycle policy for Atmosphere Broadcaster objects. Usually a Broadcaster has some AtmosphereResource-s assigned, representing open async connections. When all connections for a particular Broadcaster are closed, the Broadcaster object may still be held in memory and reused. For a SaaS application that may handle hundreds of tenants and thousands of users concurrently, I consider this a bad pattern. EMPTY_DESTROY tells Atmosphere to release every Broadcaster that has no resources assigned and remove it from memory.
  4. org.atmosphere.cpr.AtmosphereInterceptor is the important one here. After Atmosphere invokes a broadcast operation, the response buffers are flushed periodically with all the data written so far, so a single flush may contain more than one message. In that case your client would receive two or more messages in one event listener notification, which is usually unwanted. This can be overcome by using TrackMessageSizeInterceptor on the server side and the trackMessageLength parameter in the Atmosphere client.
  5. AtmosphereSpringControllerResolver enables direct injection of AtmosphereResource into a Spring controller.
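To see why message-size tracking solves the "two messages in one flush" problem from point 4, here is a minimal sketch of length-prefixed framing. It is not Atmosphere's source code: the class name is hypothetical, and the wire format (message length, then a "|" delimiter, then the message) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of length-prefixed framing; not Atmosphere's implementation.
class LengthFraming {

    static final char DELIMITER = '|'; // assumed delimiter

    /** Server side: prefixes the message with its length and the delimiter. */
    static String frame(String message) {
        return message.length() + String.valueOf(DELIMITER) + message;
    }

    /** Client side: splits one flushed buffer, possibly holding several frames. */
    static List<String> unframe(String buffer) {
        List<String> messages = new ArrayList<>();
        int pos = 0;
        while (pos < buffer.length()) {
            int sep = buffer.indexOf(DELIMITER, pos);
            if (sep < 0) {
                throw new IllegalArgumentException("missing delimiter at offset " + pos);
            }
            int length = Integer.parseInt(buffer.substring(pos, sep));
            int start = sep + 1;
            if (start + length > buffer.length()) {
                throw new IllegalArgumentException("truncated frame at offset " + pos);
            }
            // The length prefix, not the delimiter, decides where the message ends,
            // so a '|' inside the payload is harmless.
            messages.add(buffer.substring(start, start + length));
            pos = start + length;
        }
        return messages;
    }

    public static void main(String[] args) {
        String flushed = frame("{\"id\":1}") + frame("a|b");
        System.out.println(flushed);
        System.out.println(unframe(flushed));
    }
}
```

Because each frame carries its own length, the client can hand messages to the listener one at a time regardless of how many were flushed together.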

Atmosphere + Spring Security

Now what we'd like to have is the Spring Security context injected into Atmosphere requests, in order to extract the user from the SecurityContextHolder and apply broadcasting operations to suspended requests. The answer to the question of how to do it is simple: you can't.

There are two problems I came across here. First, the Spring Security filters aren't applied to MeteorServlet, because it's not a regular servlet but a CometProcessor, supporting async requests. Only a CometFilter can be applied to this type of servlet, not a regular Filter, which is what Spring Security's DelegatingFilterProxy implements. You can overcome this problem, though, either by wrapping the Spring Security filters in your own CometFilter-s or by replacing the default FilterChain with your own implementation. Even so, it still doesn't work.

This is because the default SecurityContextHolder storage strategy is ThreadLocalSecurityContextHolderStrategy, which holds the SecurityContext in a ThreadLocal (this is the only production implementation, and one can hardly imagine a different strategy that would work for this problem). It works well for standard requests, each processed in its own thread, but for suspended Atmosphere requests there's a problem. When the resources are swept and the buffers are flushed, all of this happens in Atmosphere's internal thread pool, and one thread serves many AtmosphereResource-s in a single execution. The SecurityContext therefore can't be bound to the thread: you would end up with an exception or, much worse, with a different user authorized than the one who should be.
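The "different user" risk can be shown with a minimal sketch in plain Java. It does not use Spring Security or Atmosphere: a plain ThreadLocal<String> stands in for the thread-bound SecurityContext, a single-threaded executor stands in for one Atmosphere worker thread, and all names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo of a thread-bound context leaking between resources.
class ThreadLocalLeakDemo {

    // Stand-in for a ThreadLocal-backed security context holder.
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    /**
     * Runs two tasks on the same worker thread. The first binds a user and
     * never clears it; the second just reads whatever is bound to the thread.
     */
    static String userSeenBySecondTask(String firstUser) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Runnable bind = () -> CURRENT_USER.set(firstUser);
            Callable<String> read = CURRENT_USER::get;
            worker.submit(bind).get();        // "resource 1" is processed
            return worker.submit(read).get(); // "resource 2" reuses the thread
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // The second task never authenticated anyone, yet it sees a user.
        System.out.println("second task sees: " + userSeenBySecondTask("alice"));
    }
}
```

Passing the user along with the resource itself, rather than binding it to the thread, avoids this kind of leak.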

So what I do, and what I'll show in the example further on, is extract the user directly from the HTTP session and use it with the AtmosphereResource to create the appropriate broadcasters.

There's another remark about the overall architecture. Since you can run DispatcherServlet through Atmosphere, you might be tempted to run your whole application through the MeteorServlet wrapper. But considering the facts above, that you can't apply normal filters to this servlet and, moreover, can't apply security filters to it, the conclusion is simple: just don't do it. Define your "async" Atmosphere servlet context separately from the regular "sync" DispatcherServlet, and everything will be fine.

Broadcast events to user groups

Before the final implementation, we need to understand how Atmosphere and Atmosphere Broadcaster-s work internally.

In regular request processing, when a request comes in, Tomcat takes a free thread from the thread pool (or queues the job in the thread pool queue if all threads in the pool are busy) and delegates the request processing to this thread. The Spring Security filters extract the user from the HTTP session and put it into the SecurityContext held in the ThreadLocal of the current request thread. Then the work is delegated to your servlet, the response buffers are filled, everything is cleaned up and the thread is released back to the pool. The response buffer is then written to the client.

In async request processing, Atmosphere waits for incoming requests with its own thread pool. When a request comes in, one of these threads receives it (or, as in the situation above, the job is queued until at least one thread in the pool is released), suspends it and returns to the pool. Suspension consists in releasing the processing thread while the TCP connection stays open. All these suspended requests are kept in internal storage and can be accessed at any moment in the application. Their TCP connections remain open, so one can write to the open Response object to send async events to the client.

But how do you tell one suspended request from another? In our example, how do you find all requests sent by all logged-in users of a specific tenant? For such use cases Atmosphere introduces the Broadcaster concept. You can associate one or more broadcasters with a single suspended request (AtmosphereResource) and use these broadcasters to send events to chosen clients. For each AtmosphereResource a single Broadcaster with a random UUID is created.

Using this idea and knowing the UUID, you can send an async event to each suspended request separately by choosing the appropriate broadcaster. Another Atmosphere concept is the MetaBroadcaster. It can be used to send an event through all broadcasters matching an expression. For example:
  1. User A connects to the async service; a broadcaster with ID="/UUID-1" is created.
  2. User B connects to the async service; a broadcaster with ID="/UUID-2" is created.
  3. Using MetaBroadcaster you can send data to either the first or the second user with broadcastTo("/UUID-1", event) or broadcastTo("/UUID-2", event).
  4. Or you can send an event to all users with broadcastTo("/*", event).
This concept can be adapted well to our use case. Let's assume we have a TENANT_ID and a USER_ID, identifying our tenant and its user. We need to assign only one broadcaster to each async request to achieve our goals:
  1. A user connects to the async service; a broadcaster with ID="/TENANT_ID/USER_ID" is created.
  2. To send an event to this particular user, use broadcastTo("/TENANT_ID/USER_ID", event).
  3. To send an event to all logged-in users of a specific tenant, use broadcastTo("/TENANT_ID/*", event).
  4. To send an event to all logged-in users, use broadcastTo("/*", event).
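The addressing scheme above can be modeled in plain Java, independently of Atmosphere. This is only a sketch: the class and method names are hypothetical, and the matching rule (a trailing "/*" matches every ID below that prefix) is a simplification assumed for illustration, not MetaBroadcaster's actual matching code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of tenant/user broadcaster IDs and wildcard targeting.
class BroadcasterIds {

    /** Builds the ID assigned to a connecting user, e.g. "/acme/alice". */
    static String idFor(String tenantId, String userId) {
        return "/" + tenantId + "/" + userId;
    }

    /** A trailing "/*" matches every ID below the prefix; otherwise exact match. */
    static boolean matches(String pattern, String broadcasterId) {
        if (pattern.endsWith("/*")) {
            // Keep the trailing "/" so that "/acme/*" does not match "/acme2/bob".
            String prefix = pattern.substring(0, pattern.length() - 1);
            return broadcasterId.startsWith(prefix);
        }
        return pattern.equals(broadcasterId);
    }

    /** Returns the connected broadcaster IDs that an event sent to pattern would reach. */
    static List<String> targets(String pattern, List<String> connected) {
        List<String> result = new ArrayList<>();
        for (String id : connected) {
            if (matches(pattern, id)) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> connected = Arrays.asList(
                idFor("acme", "alice"), idFor("acme", "bob"), idFor("globex", "carol"));
        System.out.println(targets("/acme/alice", connected)); // one user
        System.out.println(targets("/acme/*", connected));     // one tenant
        System.out.println(targets("/*", connected));          // everyone
    }
}
```

The design relies on the tenant being the first path segment, so a tenant-wide broadcast is just a prefix match and no separate per-tenant broadcaster is needed.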
And here comes the implementation of everything described above:


Finally, in the AsyncDispatcher controller we just need to suspend the request using the AsyncService.suspend() method to make it all work together.