Thursday, October 1, 2015

Integrating WSO2 CEP with Enterprise Systems

With the release of WSO2 Complex Event Processor (CEP) 4.0.0, the product offers more power and functionality than ever before. Key new features include distributed query processing with Apache Storm, pre-defined execution plan templates, and the new Siddhi 3.0.0 engine, which has been re-architected to deliver better performance in multi-threaded and distributed environments.

Much of the usability of WSO2 CEP comes from its ability to interface with many different systems, so that other enterprise systems can publish events to CEP using a protocol or transport of their choice. In this post, we explore some common scenarios where WSO2 CEP integrates with other products to deliver analytics in real time.

Publishing events from ESB

There are multiple ways to publish events from WSO2 ESB to CEP. One such method is to use the Publish Event mediator, which was introduced in WSO2 ESB 4.9.0 [1]. Alternatively, if you are using an older version of ESB, you can use the BAM mediator [2].

To configure this scenario, first set up an ESB server and a CEP server. You can run both servers on the same machine by giving one of them a port offset. Configure the CEP server to run with an offset of 1 by setting the offset value in <CEP_HOME>/repository/conf/carbon.xml, then start both the ESB and CEP servers.
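
As a rough sketch of the offset change, the relevant fragment of carbon.xml looks something like the following. Only the Offset value needs to change; the other children of the Ports element are omitted here and should be left as they are in your copy of the file.

```xml
<!-- Fragment of <CEP_HOME>/repository/conf/carbon.xml; other elements omitted. -->
<Ports>
    <!-- Every default port is shifted by this value: the management console
         moves from 9443 to 9444, and the Thrift ports 7611 and 7711
         move to 7612 and 7712. -->
    <Offset>1</Offset>
</Ports>
```

The ESB keeps the default offset of 0 in this setup, so its management console stays on 9443.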

When using the Publish Event mediator, first create a stream and a corresponding event receiver in CEP. To create the stream, log in to the CEP management console (accessible at https://localhost:9444/carbon if an offset of 1 was set for the CEP server), go to the Streams section and add a stream. The stream details should be as follows: the stream is named StockRequestInfo with version 1.0.0, and it carries two meta attributes (httpMethod and destination) and one payload attribute (symbol), all of type string.
Input Stream definition
Then create a corresponding event receiver with a configuration similar to the one below.

<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="WSO2EventReceiver" statistics="disable"
    trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
    <from eventAdapterType="wso2event">
        <property name="events.duplicated.in.cluster">false</property>
    </from>
    <mapping customMapping="disable" type="wso2event"/>
    <to streamName="StockRequestInfo" version="1.0.0"/>
</eventReceiver>

Now log in to the ESB management console (at https://localhost:9443/carbon) and create a pass-through proxy service with the sample SimpleStockQuoteService as the backend service [3]. Then add an Event Sink to the ESB that points to the Thrift endpoints of the CEP server: go to the Configure tab of the management console and select Event Sinks. Name the sink CEPEventSink, since the proxy configuration later in this post refers to it by that name. By default, the Thrift port and the authentication port are 7611 and 7711 respectively. However, since the CEP server was started with an offset of 1, it opens those ports on 7612 and 7712. Please take note of the URL format given below.

Event sink configuration at ESB
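
Since the URL format is easy to get wrong, here is a sketch of what the event sink could look like if written out as XML. The file layout and element names are an assumption rather than something taken from this post, so prefer creating the sink through the management console; the ports assume the CEP offset of 1 used here, and the credentials are placeholders.

```xml
<!-- Hypothetical sketch of the event sink named CEPEventSink. -->
<eventSink>
    <!-- Thrift data receiver of CEP: default port 7611 plus offset 1, over TCP -->
    <receiverUrl>tcp://localhost:7612</receiverUrl>
    <!-- Thrift authenticator of CEP: default port 7711 plus offset 1, over SSL -->
    <authenticatorUrl>ssl://localhost:7712</authenticatorUrl>
    <username>admin</username>
    <!-- Placeholder only; enter the real password through the console -->
    <password>PASSWORD_PLACEHOLDER</password>
</eventSink>
```

The point to notice is the scheme of each URL: the receiver URL uses tcp:// while the authenticator URL uses ssl://.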
After the event sink is added correctly, edit the proxy service created earlier in design view, define a new in-sequence and add a Publish Event mediator to it as shown below [4].

Publish Event Mediator configuration
The completed proxy service configuration would be similar to the configuration listed here.

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="StockQuoteProxy"
       transports="https,http"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <publishEvent>
            <eventSink>CEPEventSink</eventSink>
            <streamName>StockRequestInfo</streamName>
            <streamVersion>1.0.0</streamVersion>
            <attributes>
               <meta>
                  <attribute name="httpMethod"
                             type="STRING"
                             defaultValue=""
                             expression="get-property('axis2', 'HTTP_METHOD')"/>
                  <attribute name="destination"
                             type="STRING"
                             defaultValue=""
                             expression="get-property('To')"/>
               </meta>
               <correlation/>
               <payload>
                  <attribute xmlns:m0="http://services.samples"
                             name="symbol"
                             type="STRING"
                             defaultValue=""
                             expression="$body/m0:getQuote/m0:request/m0:symbol"/>
               </payload>
               <arbitrary/>
            </attributes>
         </publishEvent>
      </inSequence>
      <outSequence>
         <send/>
      </outSequence>
      <endpoint>
         <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
      </endpoint>
   </target>
   <description/>
</proxy>

Now let's write a simple query that groups requests by stock symbol and counts the requests received within the last minute. If the count for a symbol is greater than 3, we will send this information to a JMS queue. We will focus on the query first and later see how CEP can be integrated with ActiveMQ as the JMS broker.

Writing a simple Siddhi Query

You can write the query by adding an execution plan, importing the StockRequestInfo stream, defining the query logic in Siddhi and exporting the resulting HighRequestCount stream. The final execution plan should look something like the one below. The Siddhi query itself is the final statement, which starts with from StockRequestInfo and ends with insert into HighRequestCount.

/* Enter a unique ExecutionPlan */
@Plan:name('RequestCountAnalyzer')

/* Enter a unique description for ExecutionPlan */
@Plan:description('This Execution Plan counts requests submitted via POST method for the last minute and outputs an event if the count > 3 for a particular symbol')

/* define streams/tables and write queries here ... */

@Import('StockRequestInfo:1.0.0')
define stream StockRequestInfo (meta_httpMethod string, meta_destination string, symbol string);

@Export('HighRequestCount:1.0.0')
define stream HighRequestCount (meta_timestamp long, symbol string, requestCount long);

from StockRequestInfo[meta_httpMethod == 'POST']#window.timeBatch(1 min)
select time:timestampInMilliseconds() as meta_timestamp, symbol, count(symbol) as requestCount
group by symbol having requestCount > 3
insert into HighRequestCount;

Integrating with ActiveMQ

The next step is to configure an event publisher that publishes the alert event to ActiveMQ. Complete details on how to configure CEP to publish events to ActiveMQ are available at [5]. We will also briefly go through the configuration steps for ActiveMQ 5.9.0 here.

1. Shut down the CEP server.
2. Copy the following jars from the /lib directory of ActiveMQ to the /repository/components/lib directory of CEP:
  • geronimo-j2ee-management_1.1_spec-1.0.1.jar
  • activemq-client-5.9.0.jar
3. Start the ActiveMQ server. (You can run it in foreground mode with the <ACTIVEMQ_HOME>/bin/activemq console command.)
4. Start the CEP server.
5. Configure a new event publisher as below.

<?xml version="1.0" encoding="UTF-8"?>
<eventPublisher name="ActiveMQPublisher" statistics="disable"
  trace="disable" xmlns="http://wso2.org/carbon/eventpublisher">
  <from streamName="HighRequestCount" version="1.0.0"/>
  <mapping customMapping="disable" type="text"/>
  <to eventAdapterType="jms">
    <property name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</property>
    <property name="java.naming.provider.url">tcp://localhost:61616</property>
    <property name="transport.jms.UserName">admin</property>
    <property encrypted="true" name="transport.jms.Password">kuv2MubUUveMyv6GeHrXr9il59ajJIqUI4eoYHcgGKf/BBFOWn96NTjJQI+wYbWjKW6r79S7L7ZzgYeWx7DlGbff5X3pBN2Gh9yV0BHP1E93QtFqR7uTWi141Tr7V7ZwScwNqJbiNoV+vyLbsqKJE7T3nP8Ih9Y6omygbcLcHzg=</property>
    <property name="transport.jms.DestinationType">queue</property>
    <property name="transport.jms.Destination">HighRequestAlert</property>
    <property name="transport.jms.ConcurrentPublishers">allow</property>
    <property name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</property>
  </to>
</eventPublisher>
                                 
Now the flow on the CEP side should be complete. If you browse the Event Flow in the CEP management console, you should see a flow like the one below.
CEP Event flow for this scenario

Testing the integrated event flow

Now for the fun part! You have to test the flow to see whether your integration scenario works properly. Before you send test events, make sure the following servers are up and running in your environment.
  1. WSO2 ESB server
  2. Backend sample Axis2 server with the SimpleStockQuoteService deployed
  3. WSO2 CEP server
  4. ActiveMQ JMS broker
Next, send 4 or 5 messages in quick succession to the ESB proxy service, requesting quotes for the same symbol within one minute. The query only fires when the count for a symbol exceeds 3, so at least four requests are needed. You can send them using the sample clients that ship with ESB: go to the <ESB_HOME>/samples/axis2Client directory and run the following ant command to send stock quote requests.

ant stockquote -Dtrpurl=http://localhost:8280/services/StockQuoteProxy -Dsymbol=WSO2

If the integration is done correctly, you should see enqueued messages for the HighRequestAlert queue in the ActiveMQ web console.
Final alert shown at ActiveMQ web console

Troubleshooting 

If you run into any configuration issues or are otherwise unable to get the above scenario up and running, the best tools for troubleshooting are the Event Tracing component [6] and the Logger Publisher [7]. Enable event tracing for a particular component to see whether the event was processed by that component of the CEP. Alternatively, add a logger publisher to a particular stream to see whether the event was received by that stream.
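
For example, a logger publisher attached to the HighRequestCount stream could look roughly like the following. This is a sketch modelled on the ActiveMQ publisher shown earlier; the publisher name and the uniqueId value are made up for illustration, and the logger adapter type and its properties should be checked against [7].

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Sketch only: writes every HighRequestCount event to the CEP server log. -->
<eventPublisher name="HighRequestCountLogger" statistics="disable"
  trace="disable" xmlns="http://wso2.org/carbon/eventpublisher">
  <from streamName="HighRequestCount" version="1.0.0"/>
  <mapping customMapping="disable" type="text"/>
  <to eventAdapterType="logger">
    <!-- Illustrative tag that makes these log lines easy to find -->
    <property name="uniqueId">HighRequestCountDebug</property>
  </to>
</eventPublisher>
```

If nothing is logged after sending the test requests, attaching a similar publisher to the StockRequestInfo stream helps to tell whether events are arriving from the ESB at all.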

Overall Integration Scenario

The block diagram for the overall integration scenario is given below. WSO2 CEP and the underlying Siddhi engine have always been known for their high performance [8], which is where the true power of the product lies. With CEP 4.0.0, we have also made integration that much smoother, with support for several new types of adapters, so that other enterprise systems can integrate with minimal configuration and hassle. With these improvements, the power of our CEP engine is readily available to your enterprise systems, literally at your fingertips!

[2] https://docs.wso2.com/display/ESB490/BAM+Mediator
[3] https://docs.wso2.com/display/ESB480/ESB+Samples+Setup#ESBSamplesSetup-StartingSampleBack-EndServices
[4] https://docs.wso2.com/display/ESB490/Publish+Event+Mediator
[5] https://docs.wso2.com/display/CEP400/ActiveMQ+JMS+Event+Publisher
[6] https://docs.wso2.com/display/CEP400/Event+Tracer
[7] https://docs.wso2.com/display/CEP400/Logger+Event+Publisher
[8] http://srinathsview.blogspot.com/2013/08/cep-performance-processing-100k-to.html

