Monday, December 1, 2008

Continuations in CXF

It's a classic problem, really: how can a service let a client submit a request and obtain the response asynchronously? A number of approaches exist:

* The client invokes a web service and provides an address to reply to. This requires the client to host a web service of its own so that the callback can eventually arrive. On the server side, so that the (HTTP) transport thread can be released, an HTTP 202 is typically sent back as an initial acknowledgement, depending on whether it's a regular two-way or a so-called one-way operation.

* The service responds with a resource address that the client needs to poll until the results are eventually available, which requires the client to structure its code accordingly.
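To make the polling approach concrete, here is a minimal, transport-free sketch of the bookkeeping a service would need. It is not CXF code: the class PollingQuoteService, the ticket format, and the "quote-for-" result string are all made up for illustration, and a real service would expose submit() and poll() as two web resources.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

// Hypothetical illustration of the polling approach; not part of CXF.
public class PollingQuoteService {

    private final Executor executor;
    // Tasks that have been submitted but not yet collected by a client.
    private final Map<String, FutureTask<String>> pending = new ConcurrentHashMap<>();

    public PollingQuoteService(Executor executor) {
        this.executor = executor;
    }

    /** Starts the work and returns at once with a ticket the client can poll. */
    public String submit(String policyId) {
        String ticket = UUID.randomUUID().toString();
        // Placeholder for a slow lookup; the result string is invented.
        FutureTask<String> task = new FutureTask<>(() -> "quote-for-" + policyId);
        pending.put(ticket, task);
        executor.execute(task);
        return ticket;
    }

    /** Returns the result if it is ready, or empty if the client should poll again. */
    public Optional<String> poll(String ticket) {
        FutureTask<String> task = pending.get(ticket);
        if (task == null || !task.isDone()) {
            return Optional.empty();
        }
        pending.remove(ticket);
        try {
            return Optional.of(task.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("quote lookup failed", e);
        }
    }
}
```

Passing Runnable::run as the executor runs the task on the calling thread, which is handy for trying the class out; with a real thread pool the client would simply see an empty result until the task finishes. The cost of this approach is visible even in the sketch: the client has to hold on to the ticket and keep asking.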

There's another approach emerging: suspended invocations, or continuations. Actually, it's not a new approach, but with Jetty supporting them and Servlet 3 embracing them, suspended invocations are bound to enter the mainstream.

Please read the Jetty Continuations page to see when using continuations makes sense.

Without further ado, here's a sample code fragment showing how to use continuations in CXF in a transport-neutral way. The CXF Continuations API is currently supported for SOAP-HTTP services based on Jetty 6 and for SOAP-JMS services. The CXF JAXRS runtime supports them too:



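import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Context;

import org.apache.cxf.continuations.Continuation;
import org.apache.cxf.continuations.ContinuationProvider;
import org.apache.cxf.jaxrs.ext.MessageContext;

@Path("/")
public class WebResource {

    private @Context MessageContext context;
    private Executor executor = ...;

    // 'timeout' and decreaseTimeout() are assumed to be defined elsewhere in this class.

    @GET @Path("/quote/{id}")
    public Quote getQuote(@PathParam("id") String policyId) throws Exception {

        String key =
            "org.apache.cxf.continuations.ContinuationProvider";
        ContinuationProvider provider = (ContinuationProvider) context.get(key);

        Continuation c = provider.getContinuation();
        synchronized (c) {
            if (c.isNew()) {
                // First pass: start the asynchronous work, then release the transport thread.
                FutureTask<Quote> f = new FutureTask<Quote>(
                    new CallablePolicyHandler(policyId, c));
                c.setObject(f);
                executor.execute(f);
                c.suspend(timeout);
            } else {
                // Re-entry after resume() or after the timeout has expired.
                @SuppressWarnings("unchecked")
                FutureTask<Quote> f = (FutureTask<Quote>) c.getObject();
                if (f.isDone()) {
                    return f.get();
                }
                c.suspend(decreaseTimeout());
            }
        }
        // Normally not reached, since suspend() unwinds the current invocation;
        // present to satisfy the compiler.
        return null;
    }
}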


When continuation.suspend(timeout) is called, the current transport thread is released immediately and the pending request is put back in the request queue. Once the asynchronous activity completes, it calls continuation.resume() (in this example that happens somewhere inside CallablePolicyHandler), which causes the suspended request to be dispatched to this method again. The code then checks whether the FutureTask is done. If it is not, the initial timeout was not long enough for the asynchronous activity to complete; in this specific case we decide to suspend the request yet again, but with a smaller timeout.
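One detail worth thinking about is where exactly resume() gets called. If it is called from within the Callable itself, the request may come back before the FutureTask has been marked as done. A possible alternative, sketched below, is to resume from FutureTask's protected done() hook, which the JDK invokes only once isDone() returns true. This is an illustration rather than what CallablePolicyHandler actually does: ResumingTask is a hypothetical name, and the one-method Continuation interface here is only a stand-in for org.apache.cxf.continuations.Continuation so that the sketch compiles without CXF on the classpath.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class ResumeOnDoneSketch {

    /** Stand-in for the CXF Continuation, reduced to the one call used here. */
    interface Continuation {
        void resume();
    }

    /** Hypothetical FutureTask that resumes the suspended request once it completes. */
    static final class ResumingTask<V> extends FutureTask<V> {
        private final Continuation continuation;

        ResumingTask(Callable<V> work, Continuation continuation) {
            super(work);
            this.continuation = continuation;
        }

        @Override
        protected void done() {
            // Called by FutureTask after the result (or failure) has been recorded.
            // Locking on the continuation mirrors the resource method, so resume()
            // cannot run while that method is still between execute() and suspend().
            synchronized (continuation) {
                continuation.resume();
            }
        }
    }

    /** Runs one task and reports whether it was already done when resume() fired. */
    static boolean doneWhenResumed() {
        AtomicReference<FutureTask<String>> ref = new AtomicReference<>();
        AtomicBoolean observed = new AtomicBoolean(false);
        // Recording continuation: notes the task state at the moment of resume().
        Continuation c = () -> observed.set(ref.get().isDone());
        ResumingTask<String> task = new ResumingTask<>(() -> "quote", c);
        ref.set(task);
        task.run();
        return observed.get();
    }
}
```

With this arrangement the resumed request should always find the task done, so the second suspend() is only reached when the timeout expires first.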

Note that in the case of JAXWS the code is identical for this specific sample, except that the ContinuationProvider instance has to be obtained from the JAXWS WebServiceContext.getMessageContext().

Both CXF JAXRS MessageContext and JAXWS MessageContext will have to be used when combining JAXWS and JAXRS. I think we may need to come up with a common MessageContext interface for such cases.

Finally, this CXF Continuations API won't change when Jetty 7 (and indeed Tomcat), implementing the Servlet 3 specification, ships.

For another example, see this CXF-based demo in the Jetty 7 trunk. It shows the use of JAXWS asynchronous handlers, among other things. Note that this demo uses the new ServletRequest.suspend() call, as opposed to the ContinuationSupport.getContinuation() and Continuation.suspend() pair of calls in Jetty 6. Nothing to worry about, though, if you're a CXF Continuations API user: it will all be handled internally without you noticing.

6 comments:

raj said...

Hi

I am trying to use continuations for a SOAP service in a jetty server 7.x using cxf 2.3.1. But I can't get the continuation provider.

String key = "org.apache.cxf.continuations.ContinuationProvider";
String key = "org.apache.cxf.transport.http_jetty.continuations.JettyContinuationProvider";

JettyContinuationProvider contProvider = (JettyContinuationProvider) context.getMessageContext().get(key);

contProvider is always null.

Thanks for your feedback.

Unknown said...

I'm also getting null for continuationProvider.
Has anybody solved the issue?

Sergey, could you please explain what is wrong? Why do we get a continuationProvider == null

private @Resource
MessageContext context;
...
final ContinuationProvider continuationProvider = (ContinuationProvider)context.get(ContinuationProvider.class.getName());

Usman Ismail said...

In jetty you could use
Continuation continuation = ContinuationSupport.getContinuation(req);

where req is the HttpServletRequest object

Sergey Beryozkin said...

The CXF Continuations API is meant to be servlet container and even transport agnostic; it should work with legacy Jetty, with Servlet 3, and with JMS.

Jan Bartel said...

With the Servlet 3.0 spec for asynchronous processing, it is required that the servlet is either annotated with asyncSupported, or declared with <async-supported>true</async-supported> in web.xml, or dynamically registered with setAsyncSupported(true) before async operations are supported (i.e. Request.isAsyncSupported() must return false unless one of the above is true).

I think the documentation of the setup for CXFServlet needs to be updated to reflect that. Certainly jetty-9 changed from allowing any servlet or filter to do async operations to be spec compliant and thus requiring one of the options above to be true.

regards
Jan

Sergey Beryozkin said...

See the updated page:

https://cwiki.apache.org/confluence/display/CXF20DOC/Servlet+Transport

thanks