Thursday, February 7, 2008

OpenSpaces SVF - Remoting on Steroids

When OpenSpaces was first released, one of its core features was Space Based Remoting.
Using the space for discovery, transport, load balancing and failover, this remoting mechanism provided a drop-in replacement for other remoting implementations. Exposed services became highly available and redundant, and remote clients got fault tolerance and load balancing out of the box without changing a single line of code.
With 6.5, we have decided to change the name of this feature to the Service Virtualization Framework (SVF) as we feel it has become much more than just a remoting implementation, and better describes the overall value it can bring to applications, by virtualizing any service object across the GigaSpaces grid.
In the next few paragraphs I'll try to explain how this framework works and what improvements we have added to it in GigaSpaces 6.5.

So how in fact does it work?
Underneath, the remoting mechanism relies on the space to get all the above.
What happens is that once the client makes a remote call, the local client side proxy (created dynamically at application startup) packages the invocation into an invocation object, and writes it to the space.
At the space side, you can choose whether you want to handle the remote call synchronously or asynchronously:
  • Handling the call synchronously means that the space will use the inbound communication thread (which was used to receive the request from the network) for the processing of the request. This is similar to most other remoting implementations. This mechanism is built on top of space filters, such that a dedicated filter delegates the invocation to the service object and returns the result back to the client. The benefit is that it's usually faster than using a separate thread for processing the request, since there are fewer steps involved.
  • Handling the call asynchronously means that you have a separate thread pool (in the form of an OpenSpaces event container) that consumes the invocation object out of the space, calls the service and then writes the result back to the space. The client proxy then picks up the result from the space using a take operation and returns it to the caller.
The benefit of that is that the model is more scalable and safer than the synchronous one, as the load at any moment depends mostly on the number of processing threads rather than the actual number of requesting clients. It also allows for true asynchronous operations by using JDK Futures and retrieving the result of the invocation asynchronously.
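To make the asynchronous flow more concrete, here is a minimal sketch in plain Java. It does not use the OpenSpaces API at all: a BlockingQueue stands in for the space, a fixed thread pool stands in for the event container, and the names AsyncRemotingSketch, Invocation and Greeter are made up for this illustration.

```java
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustration only: none of these types belong to OpenSpaces.
class AsyncRemotingSketch {

    /** Hypothetical business interface. */
    interface Greeter {
        String greet(String name);
    }

    /** Hypothetical invocation object: what the client proxy "writes to the space". */
    static final class Invocation {
        final Method method;
        final Object[] args;
        final CompletableFuture<Object> result = new CompletableFuture<>();

        Invocation(Method method, Object[] args) {
            this.method = method;
            this.args = args;
        }
    }

    /** Stand-in for the space: a queue shared by client proxies and workers. */
    static final BlockingQueue<Invocation> SPACE = new LinkedBlockingQueue<>();

    /** Client side: a dynamic proxy that packages each call and waits for its result. */
    static <T> T createProxy(Class<T> type, long timeoutMillis) {
        Object proxy = Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type},
            (p, method, args) -> {
                Invocation invocation = new Invocation(method, args);
                SPACE.put(invocation); // "write" the invocation
                // "take" the result, giving up after the timeout
                return invocation.result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            });
        return type.cast(proxy);
    }

    /** Service side: a fixed pool of workers that consume invocations and call the service. */
    static ExecutorService startWorkers(Object service, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        Invocation invocation = SPACE.take();
                        try {
                            invocation.result.complete(
                                invocation.method.invoke(service, invocation.args));
                        } catch (Exception e) {
                            invocation.result.completeExceptionally(e);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // the pool is shutting down
                }
            });
        }
        return pool;
    }

    public static void main(String[] args) {
        Greeter service = name -> "Hello, " + name;
        ExecutorService workers = startWorkers(service, 2);
        Greeter proxy = createProxy(Greeter.class, 5000);
        System.out.println(proxy.greet("grid")); // prints "Hello, grid"
        workers.shutdownNow();
    }
}
```

Note how the number of worker threads is fixed regardless of how many clients write invocations to the queue, which is the property that makes the asynchronous model safer under load.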

Why use the SVF instead of ordinary remoting mechanisms?
For both implementations (sync and async), the fact that everything is done through the space means that the framework provides the following out of the box:
  1. Automatic and transparent failover: using the space's failover capabilities, a remote call (i.e. the invocation object) is transparently routed to another node when the default node for the invocation becomes unavailable.
  2. Load balancing: using the space's load balancing capability, the remote call (i.e. the invocation object) can be routed to any one of the cluster members or even to all of them. Again, this is done in complete transparency to the calling code.
  3. Non intrusiveness: As with any other good remoting implementations, the client code is completely isolated from the underlying remoting mechanism. This is actually a very powerful yet non intrusive manner of implementing SBA. Both the client and service code can be completely independent of any GigaSpaces interface, making them truly portable across any runtime platform.
Code samples
The primary way to enable SVF in your application is through your Spring beans file.
Let's take the following business interface as an example (assume that Data and DataResult classes exist):

public interface IDataProcessor {

    /**
     * Process a given data object and return a DataResult.
     */
    DataResult processData(String title, Data data);
}
It takes an instance of type Data as input and returns an instance of type DataResult.
For simplicity, let's also assume the following implementation for the service:
public class DataProcessor implements IDataProcessor {

    public DataResult processData(String title, Data data) {
        System.out.println("Processed: " + data);
        return new DataResult("Done processing: " + title);
    }
}
In order to configure this on the space side we need to determine whether we want the service to be exposed as a synchronous service, asynchronous service or both.
The configuration is part of a processing unit deployed to the GigaSpaces grid.
Here's the Spring configuration inside the processing unit's pu.xml:
<os-remoting:service-exporter id="remotingServiceExporter">
    <os-remoting:service ref="dataProcessor"/>
</os-remoting:service-exporter>
The above snippet exports the service such that it could be used as a remoting endpoint.
Now we need to configure the actual remoting mechanism.
Here's an example for a sync remoting configuration. Note that we simply take the exported object and use it as a space filter:
<os-core:space id="space" url="/./space">
    <os-core:filter-provider ref="remotingServiceExporter" />
</os-core:space>

<os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/>
If we want to use async remoting we need to use either a polling or a notify container. Here's an example for a polling container configuration which uses two threads to process invocation requests:
<os-events:polling-container id="remotingContainer" giga-space="gigaSpace" concurrent-consumers="2">
    <os-events:listener ref="remotingServiceExporter" />
</os-events:polling-container>
On the client side, we should configure a proxy to be used by the client code.
In 6.0, the way to do this was to configure everything in the Spring beans xml file, as follows.
For async proxy:
<os-remoting:async-proxy id="dataProcessor" giga-space="gigaSpace"
For sync proxy:
<os-remoting:sync-proxy id="dataProcessor" giga-space="gigaSpace"
Now all we need to do is wire the proxy with the actual client code, as follows:
<bean id="myRemotingClient" class="org.openspaces.example.MyRemotingClient">
    <property name="dataProcessor" ref="dataProcessor"/>
</bean>
The client code would simply invoke the method on the interface:
//injected via Spring
private IDataProcessor dataProcessor;
...
public void doSomethingWithRemoteProxy() {
    String title = ...
    Data data = ...
    DataResult result = dataProcessor.processData(title, data);
}
The call is completely unaware of how the space is deployed, how many instances it has or what the actual clustering topology is.
Behind the scenes, the space proxy will route the invocation object that the proxy generates with every request to the relevant node. For example, if you're dealing with a partitioned space, the request will by default be randomly routed to one of the partitions (this is based on the hashCode of the invocation object and can be overridden - see below). With a replicated topology, the request will go to the node to which the client is currently connected (which is determined by the load balancing policy for the cluster and can be round robin, weighted round robin, etc.).
For both topologies, in case of failure of the node to which the request was sent, the request will be sent automatically to the backup node if such exists.
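The failover behavior described above can be pictured with a small plain Java sketch. This is only a conceptual model and not how the space proxy is actually implemented: FailoverSketch and invokeWithFailover are hypothetical names, and each node is modeled as a simple function that either returns a reply or throws when it is unavailable.

```java
import java.util.List;
import java.util.function.Function;

// Illustration only: a made-up model of primary/backup failover, not OpenSpaces code.
class FailoverSketch {

    /** Tries each node in order (primary first, then backups) and returns the first reply. */
    static <Q, R> R invokeWithFailover(List<Function<Q, R>> nodes, Q request) {
        RuntimeException lastFailure = null;
        for (Function<Q, R> node : nodes) {
            try {
                return node.apply(request);
            } catch (RuntimeException e) {
                lastFailure = e; // this node is unavailable, move on to the next one
            }
        }
        throw new IllegalStateException("No node could handle the request", lastFailure);
    }

    public static void main(String[] args) {
        Function<String, String> primary = request -> {
            throw new IllegalStateException("primary is down");
        };
        Function<String, String> backup = request -> "backup handled " + request;
        // prints "backup handled invocation-1"
        System.out.println(invokeWithFailover(List.of(primary, backup), "invocation-1"));
    }
}
```

The calling code only sees the reply, which is the point: the retry against the backup happens below the level of the business interface.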
In 6.5, the client side configuration will be even simpler - you can simply annotate a field of the remote interface type with a @SyncProxy or @AsyncProxy annotation and OpenSpaces will create the proxy for you and inject it into your client object. Here's an example:
@AsyncProxy(gigaSpace="gigaSpace", timeout = 15000)
private IDataProcessor dataProcessor;
In the Spring beans file, the following line has to be included to make OpenSpaces infrastructure process all beans with this annotation:
<os-remoting:annotation-support />
In the above example, the OpenSpaces infrastructure will inject the client code with an async remoting proxy, which uses a GigaSpace instance by the name of "gigaSpace" (as defined in the Spring beans configuration file) and a call timeout of 15 seconds.

Advanced Features
Routing the call in a partitioned space topology
Many GigaSpaces users use the partitioned topology, which is required when you have more data on the grid than any single machine can contain. It is also very useful for cases where you want to distribute the processing load between a number of machines, each handling a different subset of the data.
When an object is written into a partitioned space, the partition is determined by the hash code of the routing field, which is designated by the user. With remoting however, a method can have more than one parameter, or no parameters at all.
By default, the routing is determined by the hash code of the entire remote invocation object.
This behavior can be overridden using an implementation of the RemoteRoutingHandler interface:
package org.openspaces.remoting;

public interface RemoteRoutingHandler<T> {
    T computeRouting(SpaceRemotingInvocation remotingEntry);
}
As you can see, this interface contains one method, computeRouting, which is given the remote invocation entry and returns a value from which the routing will be computed (the space proxy will call its hashCode() method for that). Here is a sample implementation:
public class DataRemoteRoutingHandler implements RemoteRoutingHandler<Long> {

    public Long computeRouting(SpaceRemotingInvocation remotingEntry) {
        if (remotingEntry.getMethodName().equals("processData")) {
            Data data = (Data) remotingEntry.getArguments()[1];
            return data.getType();
        }
        return null;
    }
}

In the pu.xml file, we need the proxy to reference this object:
<os-remoting:async-proxy id="dataProcessor" giga-space="gigaSpace"
<bean class=""/>

With 6.5 you can do it in a much simpler fashion: you can simply add the @Routing annotation to a parameter in the signature of the service interface, and the routing will be based on it!

public interface IDataProcessor {
    DataResult processData(@Routing String title, Data data);
}
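To illustrate what "routing by hash code" means in practice, here is a minimal sketch in plain Java. It assumes a simple hash-modulo scheme, which is my simplification and not necessarily the exact formula the space proxy uses; RoutingSketch and partitionFor are hypothetical names.

```java
// Illustration only: a made-up model of hash-based partition routing.
class RoutingSketch {

    /** Maps a routing value to a partition index in the range [0, partitionCount). */
    static int partitionFor(Object routingValue, int partitionCount) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(routingValue.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitions = 4;
        // Calls that share a routing value (here, the title) always land on the same partition.
        System.out.println(partitionFor("quarterly-report", partitions)
                == partitionFor("quarterly-report", partitions)); // prints "true"
        // A Long routing value, such as a data type id, works the same way.
        System.out.println(partitionFor(7L, partitions)); // prints "3"
    }
}
```

The practical consequence is that choosing the routing value decides which invocations are co-located: all calls with the same title (or the same data type) are processed by the same partition.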
Sending a request to more than one node - ala Map/Reduce
In many cases, you would want to use all the nodes in the network to do some processing, and then aggregate the result on the client side, ala Map/Reduce.
In that case you need to do two things: define the remote proxy to broadcast the call to all partitions, and define a result aggregation policy that runs on the client side once results from all nodes have returned.
The broadcast is supported for sync proxies, and is enabled in the following way (using annotations based configuration):
@SyncProxy(broadcast = true, remoteResultReducerType = MyResultReducer.class)
private IDataProcessor dataProcessor;
The above annotation configuration references the MyResultReducer class.
This is an implementation of the RemoteResultReducer interface, which is responsible for collecting the results from all the nodes and aggregating them into one object that is returned to the calling code. Here's this interface's definition:
package org.openspaces.remoting;

public interface RemoteResultReducer<T> {
    T reduce(SpaceRemotingResult[] results, SpaceRemotingInvocation remotingInvocation) throws Exception;
}
It receives an array of SpaceRemotingResult instances, each of which contains information about the invocation on a single node, such as the invocation result, whether or not an exception occurred, and where the invocation took place. It returns the final aggregated result, which is passed on to the calling code.
This enables you to grid enable your service without affecting the calling code.
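As a rough illustration of the reduce step, here is a self-contained sketch in plain Java. It deliberately avoids the real OpenSpaces types: NodeResult is a simplified, hypothetical stand-in for a per-node result, and the aggregation policy (summing integers and failing if any node failed) is just one possible choice.

```java
import java.util.List;

// Illustration only: simplified stand-ins, not the OpenSpaces remoting types.
class ReduceSketch {

    /** Hypothetical per-node outcome: either a value or the exception the node raised. */
    static final class NodeResult {
        final Integer value;
        final Exception failure;

        NodeResult(Integer value, Exception failure) {
            this.value = value;
            this.failure = failure;
        }
    }

    /** Sums the values returned by all nodes; fails fast if any node reported an exception. */
    static int reduce(List<NodeResult> results) {
        int total = 0;
        for (NodeResult result : results) {
            if (result.failure != null) {
                throw new IllegalStateException("A node failed", result.failure);
            }
            total += result.value;
        }
        return total;
    }

    public static void main(String[] args) {
        List<NodeResult> results = List.of(
                new NodeResult(10, null),
                new NodeResult(20, null),
                new NodeResult(30, null));
        System.out.println(reduce(results)); // prints "60"
    }
}
```

A different policy, for example ignoring failed nodes and aggregating whatever did return, would only change the body of reduce; the calling code would stay the same.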

Using futures and one way calls
Sometimes you don't want the calling code to block and wait for the invocation to take place. This can be true if you know the calculation will take a lot of time, or if the calling thread does not require the invocation result to continue. When using async proxies, you have two options depending on the signature of the invoked method:
  • If it doesn't have a return value, you can simply declare it as "one way", which means that the client side proxy is not going to wait for its completion:
    @AsyncProxy(voidOneWay = true)
    private IDataProcessor dataProcessor;
    In the above snippet, the proxy will not wait for any method that has no return value.
  • If it does have a return value, you can use a JDK Future to get the result at a later time or from another thread. To do that, you should declare an interface that returns a Future object. The OpenSpaces infrastructure will detect that automatically and not block the proxy on the call. So our previous IDataProcessor interface will now look like this:
    public interface IDataProcessor {
        Future<DataResult> processData(String title, Data data);
    }
    Note that you can still deploy the previous interface (without the Future) on the space side, as this is purely a client side issue. So you can have some clients waiting synchronously for the invocation result, and other clients using a Future and retrieving the result asynchronously.
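For readers less familiar with JDK Futures, here is a minimal sketch of the calling pattern in plain Java. It does not involve an OpenSpaces proxy: a local ExecutorService plays the role of the remote service, and FutureSketch, AsyncProcessor, createProcessor and awaitResult are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustration only: a local thread pool stands in for the remote service.
class FutureSketch {

    /** Hypothetical client-side view of a service: the call returns immediately with a Future. */
    interface AsyncProcessor {
        Future<String> process(String title);
    }

    /** Builds a processor whose work runs on the given pool rather than on the caller's thread. */
    static AsyncProcessor createProcessor(ExecutorService pool) {
        return title -> pool.submit(() -> "Done processing: " + title);
    }

    /** Blocks until the result is available, wrapping checked exceptions for brevity. */
    static <T> T awaitResult(Future<T> pending) {
        try {
            return pending.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> pending = createProcessor(pool).process("report");
            // The caller is free to do other work here before asking for the result.
            System.out.println(awaitResult(pending)); // prints "Done processing: report"
        } finally {
            pool.shutdown();
        }
    }
}
```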
Remoting Aspects and MetaData
New to 6.5 is the ability to apply cross cutting concerns, similar to AOP aspects or servlet filters. You can apply your own logic on the client (before the call is made) and on the server (after the call has been intercepted and before it's delegated to the service).
This is useful to apply system wide functionality such as logging and performance measurements. Another new feature in 6.5 is the ability to piggyback the remoting invocation and send custom metadata along with it. This could be very useful when implementing security for example.
In fact, combined with the remoting aspects, it's very simple to implement a custom, non-intrusive security mechanism that would be completely transparent to the calling code.
You can read about it more here.

In this post I showed the benefits and rich set of features that are part of the OpenSpaces Service Virtualization Framework. These are all documented in full in the GigaSpaces wiki.
I encourage you to try out our new 6.5 early access version and test drive the new SVF features.
You can download the EAP version here. An initial version of the documentation for OpenSpaces SVF can be found here.


kov said...

Hi this is very interesting stuff and thanks for devoting so much space to it. I have to ask though, if our data-processor's profile is a method with a return value, I don't see where the benefit is in wrapping that up in your async-proxy. It appears the result is a client that makes a call and blocks for a result, and a service which has a function called and returning in the same way (synchronously).

In this example, where are you actually getting asynchrony? Or is there a better example?

Uri Cohen said...

Hi Ken

Sorry for the late reply, for some reason I didn't get an email notification about your comment :(

relating to your question:
You are right that in that case, from the client's perspective it looks like any other synchronous call. So if you do have a client that expects to get a result synchronously, sync remoting is probably a better option (although async remoting will work as well).
Nevertheless, the way things are executed on the space side is different in a few ways:
1. Threading: With sync remoting, the inbound communication thread is the one that is used to delegate the call to the service instance. With async remoting, the call is just written into the space, and the event container is the one that will handle it, in a separate thread pool. So it's a bit safer if you have a lot of concurrent clients (since it's not a thread-per-client approach), but is a bit slower in terms of latency for the exact same reason.
2. Transactional behavior: with async remoting you have two transactions, one writing the call to the space, and the other removing it and writing the result back to the space. Then you have the client side proxy reading the result using a take operation.
In sync remoting everything is part of the same operation (similar to vanilla RMI). This is another reason that sync remoting is faster than async remoting.

The following post by Nati, our CTO, provides a good theoretical background on this.

Hope this clarifies things.