The usability of WSO2 CEP comes from its ability to interface with many different systems, so that other enterprise systems can publish events to CEP using a protocol or transport of their choice. In this post, we explore some common scenarios where WSO2 CEP integrates with other products to deliver analytics in real time.
Publishing events from ESB
There are multiple ways to publish events from WSO2 ESB to CEP. One such method is to use the Publish Event mediator, which was introduced in WSO2 ESB 4.9.0 [1]. Alternatively, you can use the BAM mediator [2] if you are using an older version of ESB.
To configure this scenario, let us first start an ESB server as well as a CEP server. You can run both servers on the same machine by giving an offset to one of them. Configure the CEP server to run with an offset of 1 by setting the offset value in <CEP_HOME>/repository/conf/carbon.xml. Then start both the ESB and CEP servers.
When using the Publish Event mediator, first create a stream and a corresponding event receiver in CEP. You can create a stream by logging in to the management console of CEP (which should be accessible at https://localhost:9444/carbon if the offset of 1 was set for the CEP server), going to the Streams section, and adding a stream. The stream details should be as follows.
Input Stream definition |
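As a rough sketch of what the stream details amount to, the snippet below builds a definition for StockRequestInfo from the attribute names and types that appear in the Publish Event mediator configuration and the Siddhi stream definition in this post. The JSON layout (the `metaData` and `payloadData` keys) is an assumption about the stream definition format and is not copied from the CEP console, and `siddhi_attribute_names` is a hypothetical helper added only for illustration.

```python
import json

# Attribute names and types come from the mediator configuration in this post.
# The metaData/payloadData layout is an assumed stream definition format.
stream_definition = {
    "name": "StockRequestInfo",
    "version": "1.0.0",
    "metaData": [
        {"name": "httpMethod", "type": "STRING"},
        {"name": "destination", "type": "STRING"},
    ],
    "payloadData": [
        {"name": "symbol", "type": "STRING"},
    ],
}


def siddhi_attribute_names(definition):
    """Return attribute names as the execution plan sees them.

    Meta attributes carry a meta_ prefix in the Siddhi stream definition
    (meta_httpMethod, meta_destination); payload attributes keep their name.
    """
    meta = ["meta_" + attr["name"] for attr in definition.get("metaData", [])]
    payload = [attr["name"] for attr in definition.get("payloadData", [])]
    return meta + payload


print(json.dumps(stream_definition, indent=2))
print(siddhi_attribute_names(stream_definition))
```

The printed attribute list should line up with the `define stream StockRequestInfo (...)` statement used in the execution plan later in this post.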
<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="WSO2EventReceiver" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
    <from eventAdapterType="wso2event">
        <property name="events.duplicated.in.cluster">false</property>
    </from>
    <mapping customMapping="disable" type="wso2event"/>
    <to streamName="StockRequestInfo" version="1.0.0"/>
</eventReceiver>
Now log in to the ESB management console (at https://localhost:9443/carbon) and create a pass-through proxy service with the sample SimpleStockQuoteService as the backend service [3]. Then add an Event Sink to the ESB that corresponds to the Thrift endpoints of the CEP server by going to the Configure tab of the management console and selecting Event Sinks. By default, the Thrift port and the authentication port are 7611 and 7711 respectively. However, since the CEP server was started with an offset of 1, those ports will be opened on 7612 and 7712 on the CEP server. Please take note of the URL format given below.
Event sink configuration at ESB |
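The port arithmetic above is easy to get wrong when several servers share a machine, so here is a small sketch that applies an offset to the default ports named in this post. The `tcp://` and `ssl://` URL schemes for the receiver and authenticator are assumptions about the event sink URL format, and the function names are hypothetical.

```python
# Default ports mentioned in this post; a port offset shifts each of them.
DEFAULT_PORTS = {"console_https": 9443, "thrift": 7611, "thrift_auth": 7711}


def apply_offset(offset):
    """Return the ports a server listens on when started with the given offset."""
    return {name: port + offset for name, port in DEFAULT_PORTS.items()}


def event_sink_urls(host, offset):
    """Build receiver/authenticator URLs for an event sink.

    The tcp:// and ssl:// schemes are assumed, not taken from the post.
    """
    ports = apply_offset(offset)
    return {
        "receiver_url": f"tcp://{host}:{ports['thrift']}",
        "authenticator_url": f"ssl://{host}:{ports['thrift_auth']}",
    }


print(apply_offset(1))
print(event_sink_urls("localhost", 1))
```

With an offset of 1 this yields 9444 for the management console and 7612/7712 for the Thrift endpoints, matching the values used in this scenario.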
Publish Event Mediator configuration |
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="StockQuoteProxy" transports="https,http" statistics="disable" trace="disable" startOnLoad="true">
    <target>
        <inSequence>
            <publishEvent>
                <eventSink>CEPEventSink</eventSink>
                <streamName>StockRequestInfo</streamName>
                <streamVersion>1.0.0</streamVersion>
                <attributes>
                    <meta>
                        <attribute name="httpMethod" type="STRING" defaultValue="" expression="get-property('axis2', 'HTTP_METHOD')"/>
                        <attribute name="destination" type="STRING" defaultValue="" expression="get-property('To')"/>
                    </meta>
                    <correlation/>
                    <payload>
                        <attribute xmlns:m0="http://services.samples" name="symbol" type="STRING" defaultValue="" expression="$body/m0:getQuote/m0:request/m0:symbol"/>
                    </payload>
                    <arbitrary/>
                </attributes>
            </publishEvent>
        </inSequence>
        <outSequence>
            <send/>
        </outSequence>
        <endpoint>
            <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
        </endpoint>
    </target>
    <description/>
</proxy>
Now let's write a simple query to group requests by stock symbol and count the requests received within the last minute. If the count for a symbol is greater than 3, we will send this information to a JMS queue. We will focus on the query first and later see how CEP can be integrated with ActiveMQ as the JMS broker.
Writing a simple Siddhi Query
You can write the query by adding an execution plan, importing the StockRequestInfo stream, defining the query logic in Siddhi, and exporting the resultant HighRequestCount stream. The final execution plan should look something like the one below. The Siddhi query is the final from ... insert into statement.
/* Enter a unique ExecutionPlan */
@Plan:name('RequestCountAnalyzer')
/* Enter a unique description for ExecutionPlan */
@Plan:description('This Execution Plan counts requests submitted via POST method for the last minute and outputs an event if the count > 3 for a particular symbol')
/* define streams/tables and write queries here ... */
@Import('StockRequestInfo:1.0.0')
define stream StockRequestInfo (meta_httpMethod string, meta_destination string, symbol string);
@Export('HighRequestCount:1.0.0')
define stream HighRequestCount (meta_timestamp long, symbol string, requestCount long);
from StockRequestInfo[meta_httpMethod == 'POST']#window.timeBatch(1 min)
select time:timestampInMilliseconds() as meta_timestamp, symbol, count(symbol) as requestCount
group by symbol having requestCount > 3
insert into HighRequestCount;
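To make the query's behaviour concrete, the following Python sketch mimics it on a list of events: keep only POST requests, collect them into one-minute batches, count per symbol, and report symbols whose count exceeds 3. It is a simplified model written for illustration, not how Siddhi implements `timeBatch`; in particular, the batch boundaries here are simply measured from the first POST event, and the function name and event tuple layout are hypothetical.

```python
from collections import Counter


def high_request_counts(events, window_ms=60_000, threshold=3):
    """Simplified model of the RequestCountAnalyzer query.

    events: iterable of (timestamp_ms, http_method, symbol), ordered by time.
    Returns a list of (batch_end_ms, symbol, request_count) alerts.
    """
    alerts = []
    counts = Counter()
    batch_start = None

    def flush(batch_end):
        # Equivalent of "group by symbol having requestCount > 3".
        for symbol, count in counts.items():
            if count > threshold:
                alerts.append((batch_end, symbol, count))
        counts.clear()

    for ts, method, symbol in events:
        if method != "POST":
            continue  # the filter runs before the window
        if batch_start is None:
            batch_start = ts
        # Close every batch that ended before this event arrived.
        while ts >= batch_start + window_ms:
            flush(batch_start + window_ms)
            batch_start += window_ms
        counts[symbol] += 1

    if batch_start is not None:
        flush(batch_start + window_ms)  # close the final batch
    return alerts


sample = [
    (0, "POST", "WSO2"), (1000, "POST", "WSO2"), (2000, "POST", "WSO2"),
    (3000, "POST", "WSO2"), (4000, "GET", "WSO2"), (5000, "POST", "IBM"),
    (61000, "POST", "WSO2"),
]
print(high_request_counts(sample))
```

Note that exactly three requests in a batch do not trigger an alert, because the condition is strictly greater than 3.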
Integrating with ActiveMQ
The next part is configuring an event publisher to publish the alert event to ActiveMQ. Complete details on how to configure CEP to publish events to ActiveMQ are available at [5]. We will briefly go through the configuration steps for ActiveMQ 5.9.0 here as well.
1. Shut down the CEP server
2. Copy the following jars from
- geronimo-j2ee-management_1.1_spec-1.0.1.jar
- activemq-client-5.9.0.jar
4. Start the CEP server
5. Configure a new event publisher as below.
<?xml version="1.0" encoding="UTF-8"?>
<eventPublisher name="ActiveMQPublisher" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher">
    <from streamName="HighRequestCount" version="1.0.0"/>
    <mapping customMapping="disable" type="text"/>
    <to eventAdapterType="jms">
        <property name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</property>
        <property name="java.naming.provider.url">tcp://localhost:61616</property>
        <property name="transport.jms.UserName">admin</property>
        <property encrypted="true" name="transport.jms.Password">kuv2MubUUveMyv6GeHrXr9il59ajJIqUI4eoYHcgGKf/BBFOWn96NTjJQI+wYbWjKW6r79S7L7ZzgYeWx7DlGbff5X3pBN2Gh9yV0BHP1E93QtFqR7uTWi141Tr7V7ZwScwNqJbiNoV+vyLbsqKJE7T3nP8Ih9Y6omygbcLcHzg=</property>
        <property name="transport.jms.DestinationType">queue</property>
        <property name="transport.jms.Destination">HighRequestAlert</property>
        <property name="transport.jms.ConcurrentPublishers">allow</property>
        <property name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</property>
    </to>
</eventPublisher>
The flow on the CEP side should now be complete. If you browse the Event Flow in the CEP management console, you should see a flow like the one below.
CEP Event flow for this scenario |
Testing the integrated event flow
Now for the fun part! You have to test the flow to see whether your integration scenario works properly. Before you send test events, make sure the following servers are up and running in your environment.
- WSO2 ESB server
- Backend sample Axis2 server with the SimpleStockQuoteService deployed
- WSO2 CEP server
- ActiveMQ JMS broker
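Then send requests to the proxy service using the sample client that ships with the ESB. Because the query only raises an alert when the count exceeds 3, send more than three requests for the same symbol within one minute.
ant stockquote -Dtrpurl=http://localhost:8280/services/StockQuoteProxy -Dsymbol=WSO2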
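If you would rather drive the proxy from a script than from the ant client, the sketch below builds a getQuote request whose shape follows the XPath used in the Publish Event mediator ($body/m0:getQuote/m0:request/m0:symbol in the http://services.samples namespace) and posts it to the proxy URL. The SOAP 1.1 envelope, the `SOAPAction` value, and both function names are assumptions made for illustration; the request the ant client actually sends may differ.

```python
import urllib.request
import xml.etree.ElementTree as ET

SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/"  # assumed SOAP 1.1
SERVICE_NS = "http://services.samples"  # namespace from the mediator's XPath


def build_get_quote(symbol):
    """Build a getQuote envelope matching getQuote/request/symbol."""
    envelope = ET.Element(f"{{{SOAP_NS}}}Envelope")
    body = ET.SubElement(envelope, f"{{{SOAP_NS}}}Body")
    get_quote = ET.SubElement(body, f"{{{SERVICE_NS}}}getQuote")
    request = ET.SubElement(get_quote, f"{{{SERVICE_NS}}}request")
    ET.SubElement(request, f"{{{SERVICE_NS}}}symbol").text = symbol
    return ET.tostring(envelope, encoding="utf-8")


def send_quote_request(symbol, url="http://localhost:8280/services/StockQuoteProxy"):
    """POST one request to the proxy; needs the ESB and backend running."""
    req = urllib.request.Request(
        url,
        data=build_get_quote(symbol),
        headers={
            "Content-Type": "text/xml; charset=utf-8",
            "SOAPAction": "urn:getQuote",  # assumed action name
        },
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

Calling `send_quote_request("WSO2")` four or more times within a minute should push the count for that symbol past the threshold used in the query.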
If the integration is done correctly, you should see enqueued messages for the HighRequestAlert queue in the ActiveMQ web console.
Final alert shown at ActiveMQ web console |
Troubleshooting
If you run into any configuration issues or are otherwise unable to get the above scenario up and running, the best tools to help you troubleshoot are the Event Tracing component [6] and the Logger Publisher [7]. Enable event tracing for a particular component to see whether the event was processed by that component of the CEP. Alternatively, add a logger publisher to a particular stream to see whether the event was received by that stream.
Overall Integration Scenario
The block diagram for the overall integration scenario is given below. WSO2 CEP and the underlying Siddhi engine have always been known for their high performance [8], which is where the true power of the product lies. With CEP 4.0.0, we have also made the integration effort much smoother, with support for multiple new types of adapters, so other enterprise systems can integrate with minimum configuration and hassle. With the new improvements in CEP 4.0.0, the power of our CEP engine will be readily available to your enterprise systems, literally at your fingertips!
Overall Integration Scenario |
[3] https://docs.wso2.com/display/ESB480/ESB+Samples+Setup#ESBSamplesSetup-StartingSampleBack-EndServices
[4] https://docs.wso2.com/display/ESB490/Publish+Event+Mediator
[5] https://docs.wso2.com/display/CEP400/ActiveMQ+JMS+Event+Publisher
[6] https://docs.wso2.com/display/CEP400/Event+Tracer
[7] https://docs.wso2.com/display/CEP400/Logger+Event+Publisher
[8] http://srinathsview.blogspot.com/2013/08/cep-performance-processing-100k-to.html