Thursday, May 10, 2012

XAP 9.0 – Geared for Real-Time BigData Stream Processing


XAP 9.0 is out the door, and I thought this would be a good opportunity to share some of the things we’ve been up to lately.
Traditionally, one of XAP’s primary use cases was large scale event processing, more recently referred to as big data stream processing or real-time big data analytics. Some of our users reliably and transactionally process up to 200K events per second, in clusters as large as a few hundred nodes.
In a sense it’s taking Map/Reduce concepts and applying them to an online stream of events, analyzing events as they arrive rather than waiting for all the data to be available offline and only then triggering the Map/Reduce jobs.
There are many use cases that are applicable here, such as web analytics, financial trading, online index calculation, fraud detection, homeland security, guidance systems and essentially any use case that requires immediate feedback on a massive stream of events, typically tens or even hundreds of thousands of events per second.
In the last few months we’ve heard of numerous frameworks that claim to be a real time analytics silver bullet (some rightfully so, some less so...), so I wanted to recap what we’ve learned here at GigaSpaces in the past few years from dealing with large scale event processing systems, and what we’ve done in XAP 9.0 to support these scenarios even better.

What It Takes to Implement Massive Scale Event Processing

There are a number of key attributes that are important for big data event processing systems, all of which XAP supports at its core:
  • Partitioning and distribution: Perhaps the single most important trait when it comes to scaling your system. If you can efficiently and evenly distribute the load of events across many event processing nodes (and even increase the number of nodes as your system grows), you will ensure consistent response times and avoid accumulating a backlog of unprocessed events.
    XAP allows for seamless content-based routing based on properties of your events, and supports adding more nodes on the fly if needed.
  • In-memory data: In many cases, you need to access and even update some reference data in order to process incoming events. For example, if you’re calculating a financial index you may need to know whether a certain event represents a change in price of a security which is part of the index, and if so you may want to update some index related information. Hitting the network and the disk for every such event is not practical at the scale we’re talking about, so storing everything in memory (the events themselves, the reference data and even calculation results) makes much more sense. An in-memory data grid lets you achieve that by providing memory based indexing and querying, seamlessly distributing data across any number of nodes, and taking care of high availability for you. The more powerful your in-memory indexing and querying capabilities are, the faster you can perform sophisticated calculations on the fly, without ever hitting a backend store or accessing the network. XAP’s data grid provides high availability, sophisticated indexing and querying, and a multitude of APIs and data models to choose from.
  • Data and processing co-location: Once you’ve stored data in memory across multiple nodes, it’s important to achieve locality of data and processing. If you process an incoming event on one node, but need to read and update data on other nodes, your processing latency and scalability will be very limited. Achieving locality requires a solid routing mechanism that allows you to send each event to the node most relevant to it, i.e. the one that contains the data needed to process it. With XAP’s event containers, you can easily deploy and trigger event processors that are collocated with the data, and make sure that you never cross the boundaries of a single partition when processing an event (see the sketch after this list).
  • Fault tolerance: Event processing is in many cases a multi-step process. For example, even the simplest use case of counting words on Twitter entails at least 3 steps (tokenizing, filtering and aggregating). When one of your nodes fails, and at some point it will, you want your application to continue from the point at which it stopped, not to repeat the whole processing flow for each “in-flight” event (or, even worse, completely lose track of some events). Replaying the entire event flow can cause numerous problems: if your event processing code is not idempotent, your processors will fail, and under high load this can create a backlog of events to process, which makes your system less real-time and less resilient to throughput spikes. XAP’s data grid in general, and event containers in particular, are fully transactional (you can even use Spring’s declarative transaction API for that if you’d like). Each partition is deployed with one synchronous backup by default, and transactions are only reported as committed once all updates reach the backup. When a primary fails, the backup takes over in a matter of seconds and continues processing from the last committed point. In addition, XAP has a “self healing” mechanism, which can automatically redeploy a failed instance on existing or even new servers.
  • Integration with batch oriented big data backends: Real time event processing is just part of the picture. There are some insights and patterns you can only discover through intensive batch processing and long running Map/Reduce jobs. It’s important to be able to easily push data to a big data backend such as Hadoop HDFS or a NoSQL database, which, unlike relational databases, can deal with massive amounts of write operations. It’s also important to be able to extract data from it when needed. For example, in an ATM fraud detection system you want to push all transaction records to the backend, but also extract calculated “patterns” for each user, so you can compare his or her transactions to the generated pattern and detect fraud in real time. You can use numerous adapters to save data from XAP to NoSQL data stores, and XAP’s open persistency interface allows for easy integration with most of these systems.
  • Manageability & Cloud Readiness: Big data apps can become quite complex. You have the real-time tier, the Map/Reduce / NoSQL tier, a web based front end and maybe other components as well. Managing all this consistently, especially on the cloud (which makes for a great foundation for big data apps), can easily become a nightmare. You need a way to manage all those nodes, scale when needed and recover from failures when they happen. Starting with XAP 9.0, XAP users can leverage all the benefits of Cloudify to deploy and manage their big data apps in their own data center or on the cloud, with benefits like automatic machine provisioning on any cloud, consistent cluster and application-aware monitoring, and automatic scaling for the entire application stack, not just the XAP components.
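
To make the routing and co-location points above concrete, here’s a minimal sketch using XAP’s Java event container API. The PriceTick class and all property names are illustrative, not taken from a real application: @SpaceRouting drives content-based partitioning, and a polling event container deployed alongside the data processes each event inside the partition that holds it.

import com.gigaspaces.annotation.pojo.SpaceClass;
import com.gigaspaces.annotation.pojo.SpaceId;
import com.gigaspaces.annotation.pojo.SpaceRouting;
import org.openspaces.core.GigaSpace;
import org.openspaces.events.EventDriven;
import org.openspaces.events.EventTemplate;
import org.openspaces.events.adapter.SpaceDataEvent;
import org.openspaces.events.polling.Polling;

@SpaceClass
public class PriceTick {
    private String id;
    private String symbol;
    private Double price;
    private Boolean processed;

    @SpaceId(autoGenerate = true)
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    // Routing property: all ticks for the same symbol land in the same
    // partition, together with the reference data they need.
    @SpaceRouting
    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }

    public Double getPrice() { return price; }
    public void setPrice(Double price) { this.price = price; }

    public Boolean getProcessed() { return processed; }
    public void setProcessed(Boolean processed) { this.processed = processed; }
}

// In a separate file: deployed collocated with each partition, it takes
// matching events and processes them locally, never crossing partitions.
@EventDriven @Polling
public class PriceTickProcessor {

    @EventTemplate
    public PriceTick unprocessedTicks() {
        PriceTick template = new PriceTick();
        template.setProcessed(Boolean.FALSE);
        return template;
    }

    @SpaceDataEvent
    public PriceTick process(PriceTick tick, GigaSpace gigaSpace) {
        // Read/update reference data here -- it lives in the same partition,
        // so no network or disk access is needed.
        tick.setProcessed(Boolean.TRUE);
        return tick; // the returned object is written back to the space
    }
}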

XAP 9.0 for Big Data Event Processing

Now that I’ve covered what XAP already had to offer for big data analytics, I’d like to delve a bit into the new capabilities in XAP 9.0, our newest release, which nicely complement the existing ones as far as big data stream processing is concerned:
  • FIFO groups (AKA virtual queues): This feature is quite unique to XAP. It allows you to group your events based on the value of a certain property; across groups you can process any number of events in parallel, but within the same group only one event is accessed at a time, ordered by time of arrival. Think of a homeland security system with multiple sensors: in many cases you want to process readings from the same sensor in order, so you can tell, for example, that a certain suspicious car is moving from one place to another and not vice versa, but across sensors you want to process as many events as possible in parallel (see the sketch after this list).
  • Storage types: Most real-time analytics systems rely heavily on CPU and memory, so using both efficiently is always important. With XAP 9.0 we’ve introduced a new mechanism that allows users to annotate object properties (objects can represent both events and data) with one of 3 modes: native (the property is stored on the heap as a regular object), binary (the property is stored serialized, and is only deserialized when a client actually reads it) and compressed (same as binary, with gzip compression). This allows for fine-grained control over how memory is utilized, and saves your application from unnecessary serialization and deserialization when accessing the data grid.
  • Transaction-aware durable notifications: Pub/sub notifications are important in scenarios where you want a certain event to trigger multiple flows, or to be processed in parallel on multiple servers. They are also useful for propagating processing results downstream to other applications. With XAP 9.0 we’ve enhanced our pub/sub capabilities to be durable (i.e. even if a client disconnects and reconnects, it will not miss an event) and to provide once-and-only-once semantics. In addition, notifications for data grid updates (e.g. event objects written or removed, other data updated) maintain transaction boundaries: if multiple events were written, or multiple pieces of data updated, in a single transaction, subscribed clients will be notified of all of them or none at all.
  • Reliable, transaction-aware local view: Another interesting event processing scenario is when you want your event processor to be located outside of the data grid. This gives you the benefit of scaling your event processors separately from the data grid, at the expense of accessing the data over the network. The local view feature allows you to cache, within the processor’s process, a predefined subset of the data that you expect to be relevant to your processing logic. The local view mechanism makes sure it stays consistent and up to date, and that you never miss a data update, even after disconnecting and reconnecting.
  • Web Based Data Grid Console: Understanding what’s going on with your events, what types of events are queued, and what data resides in the data grid is essential to the operation of every event processing system. XAP’s new data grid console allows you to monitor everything within the data grid from your browser using an intuitive HTML5 interface. You can view event and data counters, submit SQL queries to the data grid, and do a lot more.
  • Cloud Enablement: XAP 9.0 comes with Cloudify, our new open source PaaS stack, built in. It allows you to manage all the components of your big data app, including the backend Hadoop or NoSQL database and the web front end.
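
Here’s a minimal sketch of how the first two features above look in code (the SensorReading class and its properties are illustrative): the FIFO grouping property gives per-sensor ordering with cross-sensor parallelism, and the storage type annotation keeps a bulky payload serialized on the server side.

import java.util.Map;

import com.gigaspaces.annotation.pojo.SpaceClass;
import com.gigaspaces.annotation.pojo.SpaceFifoGroupingProperty;
import com.gigaspaces.annotation.pojo.SpaceId;
import com.gigaspaces.annotation.pojo.SpaceRouting;
import com.gigaspaces.annotation.pojo.SpaceStorageType;
import com.gigaspaces.metadata.StorageType;

@SpaceClass
public class SensorReading {
    private String id;
    private String sensorId;
    private Map<String, Object> payload;

    @SpaceId(autoGenerate = true)
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    // Readings from the same sensor are consumed one at a time, in arrival
    // order; readings from different sensors are processed in parallel.
    @SpaceFifoGroupingProperty
    @SpaceRouting
    public String getSensorId() { return sensorId; }
    public void setSensorId(String sensorId) { this.sensorId = sensorId; }

    // Kept serialized on the server side (binary mode) and only deserialized
    // when a client actually reads the property.
    @SpaceStorageType(storageType = StorageType.BINARY)
    public Map<String, Object> getPayload() { return payload; }
    public void setPayload(Map<String, Object> payload) { this.payload = payload; }
}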

See It in Action!

You can see all of this in action with our new Twitter word count example, whose code is available on GitHub.

What’s Next?

There are a few other cool features in XAP 9.0 that you can learn about here.
We’re planning a lot more interesting features around big data analytics, so keep your ears open  :) 

Thursday, April 7, 2011

XAP 8.0.1 is Out!

We’ve just released XAP 8.0.1, with a lot of goodies included. 8.0.1 is the first feature and service pack on top of XAP 8.0.0. It includes many enhancements and a few exciting new features. Here’s a short recap:

  • Improved Web UI Dashboard with Alerts View: The dashboard view now gives you a single click view of the entire cluster, including alerts on various problematic conditions. The previous view is now available under the topology tab. This is the first stage in the new Web based UI planned for XAP. You can find more details about it here.

[Screenshot: the new dashboard view]

  • Elastic Deployment for Stateless and Web Processing Units: The elastic deployment model introduced in 8.0 for stateful and data grid only processing units has now been extended to support stateless and web processing units. You can scale web applications and stateless processing units up and down based on CPU, memory or available resources.
  • Document (Schema-Free) API support for .Net: The .Net edition now includes the all new document API, introduced on the Java side in 8.0.0. It enables you to maintain a completely flexible domain model without any restrictions on the entry's schema, and to add/remove properties as your application evolves. It also simplifies interoperability with Java, since when using the document API you're no longer tied to concrete .Net and Java classes (see the Java sketch after this list).
  • Improved complex object querying and indexing for .Net: The .Net edition now enables you to query and index complex object structures, including nested collections and arrays.
  • Deep POJO/PONO - Document Interoperability: Documents and POJOs can now be mixed interchangeably across all nesting levels. You can read a document as a POJO/PONO (assuming its type name corresponds to the POJO/PONO class name) and vice versa. The space will convert between the formats across all nesting levels, so if you have a complex Java object, for example, which contains a reference to a nested Java object or a collection of nested objects, the space will convert the entire object graph to documents and sub-documents. In addition, you can define a "bag" of dynamic properties for a certain POJO/PONO, so that new properties added to the entry via the document API are exposed in the POJO/PONO instance via this bag.
  • Map/Reduce and Native Query Support for JPA: The XAP JPA implementation now supports the JPA NativeQuery facility. On top of running queries in the Space's native syntax, it also enables you to execute Space tasks across one or all cluster nodes, bringing the power of the grid to the JPA API. Tasks can be defined using the GigaSpaces task execution interfaces, or even as a dynamic language script for scripting languages supported by the JVM.
  • Method Level routing and result reducers for Space Based Remoting: Space Based Remoting has traditionally been a very popular facility for reliably exposing scalable business services to your application clients. In 8.0.1, you can specify method level behaviors for the foundational remoting constructs, such as RemoteRoutingHandlers and RemoteResultReducers, via the dedicated @ExecutorRemotingMethod and @EventDrivenRemotingMethod annotations.
  • WAN Replication Improvements: 8.0.1 adds a number of important improvements and bug fixes to the replication over WAN module, such as better peer classloading behavior (when the classes written to the space are not part of the space's classpath), better cleanup of replicated entries, and support for replication of .Net entries.
  • Improved Performance of .Net Executor API: The .Net task execution API has undergone some optimization in the way tasks are passed to the space and executed in it, resulting in performance boosts of up to 250%.
  • More JPA goodies: In addition to NativeQuery support, we have also implemented a number of other changes, including better JPQL syntax support (LIKE, IS NULL), optimistic locking support and improved relationship handling.
  • Improved XA Transaction Support: XA transactions can now work against a partitioned space cluster as a single XA resource (via the distributed Jini transaction manager) rather than working with each partition separately.
  • Mule 3.1 Support: The built-in Mule ESB support has been upgraded to support Mule version 3.1.
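
As a taste of the document API mentioned above, here’s a minimal sketch in Java (the "Product" type and its properties are illustrative; the .Net API follows the same model):

import com.gigaspaces.document.DocumentProperties;
import com.gigaspaces.document.SpaceDocument;
import com.gigaspaces.metadata.SpaceTypeDescriptorBuilder;
import org.openspaces.core.GigaSpace;

public class DocumentExample {
    public static void writeAndRead(GigaSpace gigaSpace) {
        // Register a schema-free type; only the ID and routing properties
        // are fixed up front, everything else is dynamic.
        gigaSpace.getTypeManager().registerTypeDescriptor(
                new SpaceTypeDescriptorBuilder("Product")
                        .idProperty("catalogNumber")
                        .routingProperty("category")
                        .create());

        // Properties can be added or removed freely as the application evolves.
        SpaceDocument product = new SpaceDocument("Product", new DocumentProperties()
                .setProperty("catalogNumber", "av-9876")
                .setProperty("category", "Aviation")
                .setProperty("name", "Jet Propelled Pogo Stick"));
        gigaSpace.write(product);

        // Template matching works on documents just like on POJOs.
        SpaceDocument template = new SpaceDocument("Product")
                .setProperty("category", "Aviation");
        SpaceDocument result = gigaSpace.read(template);
    }
}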

The full list of changes, improvements and bug fixes can be found in our release notes section.

You're welcome to give it a go and let us know what you think.

Tuesday, March 8, 2011

Data Grid Querying, Revisited

There has been a great deal of talk lately about the new EHCache cache querying capabilities and the advantages of real-time analytics through in-memory cache querying. I find that rather odd since extensive querying and processing capabilities have been around for years with in memory data grids like GigaSpaces XAP, Oracle Coherence, Gemstone GemFire and more recently Hazelcast and GridGain. So I don’t really understand the big fuss around EHCache finally supporting it….

But that’s actually a great opportunity to revisit some of the work we’ve done in our recent 8.0 release in the context of querying. There are two main features we’ve introduced in 8.0 that take data grid querying to the next level.

Complex Object Querying

The first one is complex object graph querying. Simply put, you can now index and query any type of property of your data grid objects, at any nesting level: primitive types, user defined types, or collections/arrays of either.
Think of an Author object, which has a collection of books, each of them containing a collection of user reviews.
Here are just two examples of what you can achieve with XAP 8.0:
• List all the Authors that wrote at least one sci-fi book:

Author[] authors = gigaSpace.readMultiple(
    new SQLQuery<Author>(Author.class, "books[*].genre = 'Sci-Fi' ORDER BY name"),
    Integer.MAX_VALUE);


• List all the Authors who have at least one book on which your Facebook friend commented:

Author[] authors = gigaSpace.readMultiple(
    new SQLQuery<Author>(Author.class, "books[*].comments[*].username = ?", "myFacebookFriend"),
    Integer.MAX_VALUE);


This goes hand in hand with our new document support, so you can apply the above to schema-free documents as well as plain Java or .Net objects.


GigaSpaces JPA


The second feature is our all new JPA support. We believe this to be a major step towards ease of use and standardization of distributed data store access.


There have been a lot of discussions in the developer community about how the “classic” querying and interaction models (such as SQL/JDBC and JPA) can be mapped to the world of distributed data stores (which in memory data grids are a part of).


When it comes down to querying, each distributed data store defines its own query language and semantics. Some are completely proprietary (e.g. EHCache’s new query DSL, or MongoDB’s query syntax), and some are modeled after standard SQL to one extent or another (e.g. GigaSpaces’ native querying support, Coherence’s recent QL, or GridGain’s query language).
All of these seem to be solid query implementations (naturally, you can guess which one I like best :) ).


But when developing against each of these products, you need to learn its interaction model, its data access semantics (such as transaction management) and, of course, its query language.


The first take at bridging the two worlds, and IMO a pretty successful one considering the inherent difficulties in bridging the two models (see this presentation I’m giving at QCon London in two days), was actually done by the almighty Google with the JPA support in AppEngine.
Although not 100% standard (actually not even close), it gives you as a developer a very easy and familiar way to implement scalable data access on top of their BigTable data store, and a very clear migration path for putting your apps on AppEngine, should you choose to do so. It also makes your code a lot more portable since JPA, even a partial version of it, is still way more standard and portable than any proprietary API.


GigaSpaces JPA – Design Principles


When we initially thought of implementing a JPA interface to our data grid, we had the following goals in mind:



  • Make our users’ lives easier by providing a standard way for them to interact with the data grid, one that allows them to easily get up to speed with new applications and to port existing JEE applications more smoothly and quickly.
    For that, we created a thin layer on top of the excellent OpenJPA framework so that configuring a GigaSpaces EntityManagerFactory is a breeze if you’re already into JPA in general and OpenJPA in particular. We also managed to support a nice set of features from the JPA specification (actually more extensive than Google’s AppEngine JPA) so that most of the stuff users actually do with JPA is covered. Naturally, this is just our first take at this huge project, and we’ll add more and more capabilities as we move forward.
  • Enable our users to leverage the power and scalability of the data grid to scale their data access, even when they’re using JPA (which was originally designed for a centralized data model). This means exposing content based routing and data grid indexing to their JPA domain model, and abstracting away cluster wide transactions and queries.
  • Protect our users from making wrong implementation decisions when modeling and accessing their distributed data. In Google AppEngine’s terminology, this means supporting Owned Relationships only, such that object graphs are not scattered across many nodes and are always limited to a single node.
  • Allow our users to use our powerful native APIs for functions that are not covered by the JPA specification, e.g. event handling, distributed task execution, and more. This means you can use the JPA API while still operating on the same data model and entities via our native POJO based API. This is a very powerful concept that covers not only JPA but all of our data access APIs. We call it Same Data, Any API: you can operate on the same data using a variety of APIs besides our native POJO API and JPA. For example, you can read a JPA entity as a document (in case you want to treat it in a more loosely structured, flexible manner), or read the same data from native clients via our C++ client side API. (A minimal JPA usage sketch follows this list.)
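
To give a feel for the “standard way” mentioned above, here’s a minimal sketch of plain JPA code running against the data grid. The Owner entity and the persistence-unit name are illustrative; the actual unit (provider, space URL, etc.) is configured in persistence.xml as described in our JPA documentation.

import java.util.List;

import javax.persistence.Entity;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Id;
import javax.persistence.Persistence;

@Entity
public class Owner {
    @Id
    private Long id;
    private String name;

    public Owner() {
    }

    public Owner(Long id, String name) {
        this.id = id;
        this.name = name;
    }
}

// In a separate file: standard JPA bootstrap, persist and query.
class OwnerExample {
    public static void main(String[] args) {
        // "gigaspaces-pu" is a hypothetical persistence-unit name.
        EntityManagerFactory emf = Persistence.createEntityManagerFactory("gigaspaces-pu");
        EntityManager em = emf.createEntityManager();

        em.getTransaction().begin();
        em.persist(new Owner(1L, "George"));
        em.getTransaction().commit();

        // Standard JPQL; the query is executed against the data grid.
        List<Owner> owners = em
                .createQuery("SELECT o FROM Owner o WHERE o.name = :name", Owner.class)
                .setParameter("name", "George")
                .getResultList();

        em.close();
        emf.close();
    }
}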

See for Yourself


We have just published a comprehensive tutorial that explains the principles of our JPA implementation using the good old Spring PetClinic sample app (we’ve added our own flavor to it :) ). It explains the data model considerations and shows how to take advantage of data grid features, such as space based remoting, to optimize the application’s data access.

Tuesday, December 7, 2010

TV Made Right :)

We've just created a new channel on YouTube called GigaSpaces TV. It will feature interviews and tech talks on various aspects of the product, and you also get to see me unshaved in the first episode ;). In it, I give a short overview of the new APIs we're coming out with in our 8.0 release.
Stay tuned by subscribing to the channel!

Enjoy,
Uri

To Scale or Not to Scale - the Recording

The recording of my session at Devoxx 2010 is now available online at parleys.com.
At this point you need a subscription to view it in full (which I highly recommend anyway, since there's a lot of valuable content there), but it will be made fully public over the course of the next few months.

Enjoy,
Uri

Friday, November 5, 2010

Yes, Sql!

See below the slide deck from my session today at QCon SF, titled "Yes, Sql!" (hopefully you liked it if you attended :) ). It focuses on how classic querying models like plain SQL and JPA map to distributed data stores. It first reviews the current distributed data store landscape and its querying models (K/V, column, document), and discusses the wide range of APIs for extracting data from these stores. It then discusses the main challenges of mapping these APIs to a distributed data model, as we've done over the past few months at GigaSpaces, and the trade-offs to be aware of.

Tuesday, October 19, 2010

Our Citrix Integration Demo

This week we’re attending the Interop conference in New York to present our integrated solution with Citrix NetScaler and XenServer. The solution enables GigaSpaces XAP applications to utilize the NetScaler load balancer and the XenServer infrastructure to dynamically and automatically scale applications (e.g. a standard JEE web app) based on real-time application load.

The following screencast demonstrates how a standard JEE web application (in this case the Spring framework PetClinic demo application) is dynamically scaled on the Citrix SoftLayer cloud.

It runs on a number of virtual hosts, which in turn run the NetScaler load balancer, the web container and the MySQL database.

The demo shows how the web application automatically scales out when the load increases. The scale out process includes the following stages:

  1. The system identifies that the average load has crossed a certain threshold.
  2. The system dynamically starts a new Xen virtual machine to host a new instance of the web application. This VM includes the GigaSpaces agent component, which enables XAP to dynamically start a new JVM to host another web application instance.
  3. A new web application instance is provisioned to the newly started JVM.
  4. The NetScaler load balancer configuration is automatically updated to reflect the new web container and route traffic to it.
  5. The average load goes back to normal, since the traffic is now evenly balanced across the old and new web containers.

The demo also shows how the system automatically recovers from a forced virtual machine failure by re-instantiating the virtual machine and the GigaSpaces components on it, and then re-provisioning the missing web application instance onto it.

Enjoy,
Uri