Friday, August 2, 2013

WSO2 Complex Event Processor - Siddhi Query Partitioning

Siddhi is the event processing engine used by WSO2 Complex Event Processor. From its inception, Siddhi has focused on delivering high performance and throughput. Until now, however, it lacked support for query partitioning, which would yield better performance.

Siddhi 2.0.0 rectifies that and adds a lot of other features as well. With query partitioning, you can break up your query into multiple partitions based on the value of an attribute or a pre-defined range. This improves performance because each partition works on its own local dataset, so the query has a smaller working set.

An Overview

For query partitioning, all you need to do is define a partition using the Siddhi API or the Siddhi Query Language. If you are using the Siddhi Query Language, the syntax is as follows.

define partition <partition-name> by [<stream-id>.]<attribute-name> , ... , range <condition> as <label>, ....

For example,

define partition stockSymbol by stockStream.symbol;

or

define partition stockVolume by range volume < 10 as 'SMALL', range volume >= 10 && volume < 100 as 'MEDIUM', range volume >= 100 as 'LARGE';

When you define your query, simply add 'partition by <partition-name>' to partition that query.

e.g.

from stockStream [volume >= 100]#window.time(10 min)
select symbol, avg(price) as avg_price, sum(volume) as total_volume
insert into recentLargeTransactionStats
partition by stockSymbol;

This effectively creates a separate partition for each symbol in stockStream that matches this query. Suppose events of the form {symbol, price, volume} arrive in the following order:
{WSO2, 58.50, 150}, {AMBA, 42.01, 450}, {FB, 98.73, 1000}, {WSO2, 58.32, 2000}, {LNK, 33.45, 250}, {AMD, 40.51, 50}

Then partitions will be created for WSO2, AMBA, FB and LNK. AMD gets no partition, since its volume of 50 does not satisfy the volume >= 100 filter. Each partition runs its own copy of the query. For the output stream, the results from each partition are merged back together. The flow for the above query is summarized in the diagram below.
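
To make that flow concrete, here is a rough sketch in plain Java of what partitioning by symbol amounts to. It does not use the Siddhi API; the class and method names (SymbolPartitionSketch, PartitionState, send) are made up for illustration, and the 10 minute time window is ignored on the assumption that all six events arrive within it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Illustration only: plain Java, not the Siddhi API. All names are hypothetical.
class SymbolPartitionSketch {

    // State held by one partition: its own copy of the avg/sum aggregation.
    static final class PartitionState {
        private double priceSum;
        private long eventCount;
        private long totalVolume;

        String process(String symbol, double price, long volume) {
            priceSum += price;
            eventCount++;
            totalVolume += volume;
            return String.format(Locale.ROOT, "%s avg_price=%.2f total_volume=%d",
                    symbol, priceSum / eventCount, totalVolume);
        }
    }

    // Insertion-ordered, so partitions are listed in the order they were created.
    private final Map<String, PartitionState> partitions = new LinkedHashMap<>();
    // Merged output stream (recentLargeTransactionStats in the query above).
    private final List<String> output = new ArrayList<>();

    void send(String symbol, double price, long volume) {
        if (volume < 100) {
            return; // the [volume >= 100] filter drops this event
        }
        // Create the partition the first time this symbol is seen.
        PartitionState state = partitions.computeIfAbsent(symbol, key -> new PartitionState());
        // Time window expiry is not modelled; every event stays in the aggregate.
        output.add(state.process(symbol, price, volume));
    }

    List<String> partitionKeys() {
        return new ArrayList<>(partitions.keySet());
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        SymbolPartitionSketch sketch = new SymbolPartitionSketch();
        sketch.send("WSO2", 58.50, 150);
        sketch.send("AMBA", 42.01, 450);
        sketch.send("FB", 98.73, 1000);
        sketch.send("WSO2", 58.32, 2000);
        sketch.send("LNK", 33.45, 250);
        sketch.send("AMD", 40.51, 50);
        System.out.println("Partitions: " + sketch.partitionKeys());
        sketch.output().forEach(System.out::println);
    }
}
```

Running main feeds in the six events from the example. The second WSO2 event lands in the partition that the first one created, so its average and total cover both WSO2 events and nothing else.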

Under the hood

Siddhi handles a partition by first identifying whether it is a RangePartition or a VariablePartition. For variable partitioning, as in the above example, a separate partition is created for each distinct value of the variable. Take care not to use constantly changing variables that belong to a large set as partition variables. Variable partitioning is ideal for use cases where the variable takes a limited number of discrete values that repeat frequently, such as stock symbols in a stock quote stream.

Partitions are created dynamically. A variable partition is created when a new value of the variable is encountered for the first time. A range partition is created the first time a value falling into that range is encountered. Each partition has its own query processor.
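
The on-demand creation of range partitions can be pictured with another small sketch in plain Java. Again, this is not Siddhi's RangePartition implementation; the names (RangePartitionSketch, defineRange, route) are hypothetical, and a simple event counter stands in for the per-partition query processor. It uses inclusive lower bounds (volume >= 10 and volume >= 100) so that a volume of exactly 10 or 100 still lands in a range.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

// Illustration only: plain Java, not Siddhi's RangePartition implementation.
class RangePartitionSketch {

    // Label -> condition, checked in definition order.
    private final Map<String, LongPredicate> ranges = new LinkedHashMap<>();
    // Label -> events handled; an entry exists only once a value has hit that range.
    private final Map<String, Integer> partitions = new LinkedHashMap<>();

    void defineRange(String label, LongPredicate condition) {
        ranges.put(label, condition);
    }

    // Returns the label of the partition that took the event, or null if no range matched.
    String route(long volume) {
        for (Map.Entry<String, LongPredicate> range : ranges.entrySet()) {
            if (range.getValue().test(volume)) {
                // First hit creates the partition entry; later hits reuse it.
                partitions.merge(range.getKey(), 1, Integer::sum);
                return range.getKey();
            }
        }
        return null;
    }

    List<String> createdPartitions() {
        return new ArrayList<>(partitions.keySet());
    }

    int eventsIn(String label) {
        return partitions.getOrDefault(label, 0);
    }

    // Mirrors the stockVolume ranges, with inclusive lower bounds (an assumption of this sketch).
    static RangePartitionSketch stockVolume() {
        RangePartitionSketch partition = new RangePartitionSketch();
        partition.defineRange("SMALL", v -> v < 10);
        partition.defineRange("MEDIUM", v -> v >= 10 && v < 100);
        partition.defineRange("LARGE", v -> v >= 100);
        return partition;
    }

    public static void main(String[] args) {
        RangePartitionSketch partition = stockVolume();
        for (long volume : new long[] {150, 450, 1000, 2000, 250, 50}) {
            System.out.println(volume + " -> " + partition.route(volume));
        }
        System.out.println("Created: " + partition.createdPartitions());
    }
}
```

With the six volumes from the earlier example, only the LARGE and MEDIUM partitions come into existence. SMALL is defined but never created, because no volume below 10 shows up.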

Given below is a very basic code example that you can run yourself.

How to set up a simple sample for playing with partitioning

To run the sample code given, you only need siddhi-core as a dependency. You can download the Siddhi source code from the following location and build it using Maven (version 3 or above).

https://svn.wso2.org/repos/wso2/carbon/platform/trunk/dependencies/commons/siddhi/2.0.0-wso2v4/

For a Maven module, the pom.xml should be like this.

[pom.xml listing not loaded]

This is the Java code.

[Java listing not loaded]

Tweak stuff, play around, and if you need to extend the functionality further, the source code is available at the location given above.
