Posts Tagged ‘Java’

Java Pipes vs. Sockets in a Single JVM

In a recent project I should have implemented an InputStream in a java code. There are many ways to achieve this and in this blog post I will show off the performance of just two of them, Sockets and Pipes ( PipedInputStream and PipedOutputStream ).

Scenario is really simple, I’ll send both 10 and 100 million messages from producer to consumer for each scenario. Message is just a sentence, “This is a message sent from Producer”.

I run all the scenario for 3 times and then get the average due to some os environment factor, other processes etc. And also I used the PrintStream as data writer for both test and BufferedReader for reader.

Buffer values can be set for Pipes by passing the long value to the PipedInputStream constructor. And for socket, buffer can be set from BufferedReader constructor. Buffer value for this test is 1024*1024*8 for both scenarios.


Pipe Socket
10M msg 12sec 26sec
100M msg 72sec 239sec

The main code of the test:

public class PipeAndSocketTester {

	public static void main(String args[]) throws IOException {
		boolean autoFlush = false;
		long messageNumber = 10000000;

		PipedOutputStream pos = new PipedOutputStream();
		PipedInputStream pis = new PipedInputStream(pos, 1024*1024*10);

		new PipeWriter(pos,messageNumber,autoFlush).start();
		new PipeReader(pis).start();

		// after above code executed I remove the pipe code then execute follows
		new SocketWriter(4544,messageNumber).start();
		new SocketReader(4544,1024*1024*10).start();

Categories: Java Tags: ,

Oracle Event Processing – Pattern Matching Example 2

In previous post I showed off one of the pattern matching feature of Oracle Event Processing and in this section I am gonna show another example of it.

There are many built-in functions which can be used in CQL. I think one of the most important these functions is the prev function. Prev function facilitates accessing the previous elements in the same stream/partition easily.

Consider this scenario, we have stock values as below and I would like to find out the pattern; firstly average value of previous 3 elements should be greater than 10 and the last value greater than the last value of the previous condition.

If we want to write to CQL code of our requirements it would look like:

CQL Example

With numbers lets try to understand how the pattern can be matched ( left side: incoming events; right side: expression about pattern matching):

Events and patterns


Run the application:


As shown in above, after the 13 and 33 pattern has been matched and output was produced.



Oracle Event Processing – Pattern Matching Example

Event processing technologies became famous in recent years. Companies realizes that they need to take real time action in order to satisfy customer requirements or handle several component issues happened in some internal system or advertising etc.

Oracle Event processing has several processing methods in order to process incoming real time data. In this blog post I will show off one of the pattern matching operations.

Consider, you desire to find out which stock firstly increases then decreases three times and increases again. For example;


In the above numbers we will find out the pattern which starts with 6.48 and ends with 6.47. Because it firstly incremented from 6.48 to 6.49 and then decreased three times ( 6.48,6.47,6.46 ) and then increased from 6.46 to 6.47 again.

I am using the Oracle Event Processing for Windows 64bit and Cygwin as windows terminal.

I changed a little bit the HelloWorld default cep application. The most crucial part is the CQL Processor part, how should I implement the CQL in order to find out the patterns?

OCEP Cql Processor Pattern Matching

OCEP Cql Processor Pattern Matching

Let’s look at the clauses in briefly.

StockProcessor is the name of the cql processor.
PARTITION BY shortName: Partition incoming data by shortName ( CQL Engines calculates each partition individually. For example, ORCL, IBM, GOOG prices are computed individually. )
MEASURES : Which values, fields will be in the output
PATTERN : Order of conditions ( Look at regular expression for more details )
DEFINE : Defining the conditions, “A as” means the define condition A.

Run the application and type the stock name and prices as in below:

OCEP Cql Processor Pattern Matching Demo

OCEP Cql Processor Pattern Matching Demo

Just after pattern is matched the output event is produced and in this example I just printed the name and last price value of the stock.

I didn’t specify any performance measurements but if the number of distinct stocks are increased performance will be our first concern. However, I would like to share some performance results in a another blog post later.

Coherence Index Performance

In a traditional database we use the index to access filtered data as quickly as possible. In the coherence, indexes are used for same purpose as well, fast access to filtered data. If you wish to retrieve all the items in the cache, likely index are not appropriate for you same as when you use database index. For example, when using the Oracle database and if you use a filter to retrieve data but its selectivity is close to 1, Oracle cost based optimizer can choose to not use the indexes because it’s not necessary since each index operations requires one more I/O. Likewise, in coherence when you wish the get data you should decide correctly whether use it or not.

Anyway, in this blog post I will carry out basic filter operations with and without index.

NamedCache cache = CacheFactory.getCache("persons");
PersonGenerator pg = new PersonGenerator();

System.out.println("Cache loaded:"+cache.size()+" elements.");

Filter filter = 
	new OrFilter(
		new AndFilter(
			new EqualsFilter("getFirstName", "Hugo"),
			new GreaterFilter("getAge", 50)
		new AndFilter(
			new EqualsFilter("getFirstName","William"),
			new LessFilter("getAge",50)

Firstly, I connected to the cluster and cache, after that 2 millions of people object is generated, they all put into the cache. Then, filter is created, pay a little bit more attention to here, uppermost the orfilter was used and two and filters was used in the filter. If we convert this filter clause to an sql query it would be :

select * from people where ( firstName="Hugo" and age>50 ) or ( ( firstName="William" and age<50 )

As it is seen here, we are just looking for the firstname and age fields for filtering. If these fields can be used in indexes we could improve the query performance.

To get filtered data we use the entrySet method by passing the Filter object as argument like this:

Set filteredData;
filteredData = cache.entrySet(filter);
System.out.println("Without Index:" + Timer.getDurationMSec());

It prints:

Without Index:12091

Time is milliseconds based.

As you know we don’t use index so far, it is the time to add one:

cache.addIndex(new ReflectionExtractor("getFirstName"), false, null);
	filteredData = cache.entrySet(filter);
System.out.println("With Index:" + Timer.getDurationMSec());

AddIndex method has been used to add index and first parameter of this method is the field’s get method which will be indexed. Second parameter gets a boolean variable and it determines the whether index is sorted or not. Sorted index is useful for range queries like ” age > 20 ” . While creating this index we did not use sorted index because we just looking for equalities, but we’ll use sorted index in the next one.
This code part prints:

With Index:5300

As you see, with one index, entrySet method double times faster than the previous one, without index.
We know that our filter uses two fields, one of them is the firstname which was just before indexed and the other one is the age. We can use age field as well in the index:

cache.addIndex(new ReflectionExtractor("getAge"), false, null);
	filteredData = cache.entrySet(filter);
System.out.println("With Two Indexes:" + Timer.getDurationMSec());

It prints:

With Two Indexes:1017

In order to get the filtered data set faster we can use the indexed object fields which are used in the filter. If we index the fields which are not used in the filter we won’t get any performance improvement.

cache.removeIndex(new ReflectionExtractor("getFirstName"));
cache.removeIndex(new ReflectionExtractor("getAge"));

cache.addIndex(new ReflectionExtractor("getCitizenNumber"), true, null);
filteredData = cache.entrySet(filter);
System.out.println("With Index BUT Not in the filter:" + Timer.getDurationMSec());

It prints:

With Index BUT Not in the filter:

11391 milliseconds is almost same duration of without index.

TROUG – Coherence Presentation and Demo Video

October 12, 2012 2 comments

Yesterday, I attended Turkish Oracle User Group ( TROUG ) event as a speaker in Bahcesehir University, Istanbul. I talked about cache concept, why do we need to cache and a few Oracle Coherence features. And also I mentioned Coherence Cache topologies such as Replicated Cache, Distributed Cache and Near Cache and also Write,Read Through, Write Behind Queue and Refresh-Ahead mechanisms.

It was delighted but the end of the my presentation I was disappointed with my VirtualBox. When I wanted to make a demo in my Eclipse which runs inside the OEL 5 in VirtualBOX 4, there was something wrong. Some of my Eclipse Launch Configuration have been disappeared in “Run As” section so there was lots of configuration and I lost all of them.

Fortunately, I decided to upload a video which tells the some of Oracle Coherence features and CohQL examples inside the VirtualBox.

You can watch this video in vimeo:

Here is the my presentation.

How a member can use the existing cache in Coherence

August 23, 2012 1 comment

In the previous post, I’ve explained how a member can join the existing cluster. Before read this post you should understand what was going on there.

At the final in previous post the new member joined the cluster. In this post I’ll explain how a member can use the existing cache ( customers cache which was created by the Eclipse project in this post )

Let’s go on from previous post.
Cluster Join
At the bottom of this screenshot you see that there is a prompt. To use existing cache ( this is the cache it has already created and it’s name is customers ) we type :

Map(?): cache customers

And it gives this output:

As you see the coherence was reading default cache configuration file instead of our cache config file. To force coherence to read our cache configuration file we should edit the JAVA_OPTS parameter in the file ( it was created in previous post ) and add this clause by changing the real path :

-Dtangosol.coherence.cacheconfig=/home/oracle/labs/Coh_labs/workspace/CohExam_AFSungur/src/config/coherence-cache-config.xml -Dtangosol.pof.config=/home/oracle/labs/Coh_labs/workspace/CohExam_AFSungur/src/config/pof-config.xml

These xml files are under the src/config folder of the Eclipse project which has already been created.
And also you should change the classpath parameters because in the pof-config.xml file there is a class that should be specified in the classpath to use pof configuration.
To edit classpath parameters, edit the file and go to the line which starts with JAVA_EXEC and change the -cp parameter as :

-cp "-classpath /home/oracle/labs/Coh_labs/workspace/CohExam_AFSungur/src/config:/labs/wls1211/coherence_3.7/lib/coherence.jar:/home/oracle/labs/Coh_labs/workspace/CohExam_AFSungur/bin" 

After editing file save this file as file. The file is here:

Run this sh file.
Then type “cache customers” again in the prompt then you’ll see this output:

As shown in above loaded cache configuration file and pof configuration file is as we wanted. When querying the size of the cache it returns the correct answer:

Map (customers): size

Map (customers): 

Joining the Existing Cluster in Coherence

August 21, 2012 3 comments

In previous post I had tried to explain some of the output parameters of Coherence. In this post I will explain how a member can join the existing cluster by setting some parameters.

Before this post you should completed this post:
Running a Coherence Application in Eclipse
We’ve already run this example within Eclipse. Basically it creates a cluster and a cache then put a custom object into the cache. And because of code is over in Coherence Cache automatically closed. To prevent closing automatically we’ll edit the and add the below code to control the application flow.

String dummy;
Scanner user_input = new Scanner( );
dummy = );

And the changed version of is now here:

package com.afsungur.coherence.test;

import java.util.Scanner;

import com.afsungur.coherence.objects.Customer;

public class CustomerTest1 {

        public static void main(String args[])
        Customer cust = new Customer("Ahmet Fuat", "Sungur", "Bahcelievler/Istanbul", "", "5321231231", 34111);
        NamedCache cache = CacheFactory.getCache("customers");
        cache.put(1, cust) ;
        System.out.println("Cache Size:"+cache.size());

        String dummy;
        Scanner user_input = new Scanner( );
        dummy = );



After editing you can run the application inside Eclipse and then you see that application will wait for some input to terminate the cache server. But before terminating we may allow another member to join the cluster.

By the way you can check which parameters were used to start cluster inside the Eclipse as shown in below :

[oracle@localhost bin]$ ps -ef | grep coherence
oracle    6835  2873  8 13:18 ?        00:00:02 /labs/wls1211/jdk160_29/bin/java -Dtangosol.coherence.cacheconfig=src/config/coherence-cache-config.xml -Dtangosol.coherence.cluster=MyFirstCluster -Dtangosol.pof.config=/home/oracle/labs/Coh_labs/workspace/CohExam_AFSungur/src/config/pof-config.xml -Dtangosol.coherence.clusterport=7252 -Dfile.encoding=UTF-8 -classpath /home/oracle/labs/Coh_labs/workspace/CohExam_AFSungur/src/config:/labs/wls1211/coherence_3.7/lib/coherence.jar:/home/oracle/labs/Coh_labs/workspace/CohExam_AFSungur/bin com.afsungur.coherence.test.CustomerTest1
oracle    6874  5387  0 13:19 pts/1    00:00:00 grep coherence
[oracle@localhost bin]$ 

And so, how we can add a member to cluster? We know that there is a cluster which serves from 7252 port and also we now the cluster name ( MyFirstCluster ) and so on. We can check this parameters from Run->Run Configurations->Coherence(Main Tab)->Other(Sub Tab).
And also you can check as shown in above by using “ps” linux command.

Under the bin folder of coherence main folder ( something like coherence3.7/bin ), there are some sh files. and can be used to join cluster. provides an interactive environment to query the cache for putting, getting, listing or something else. Therefore, we use to join cluster.

Just before running the file we edit file and add below arguments to the line which starts with “JAVA_OPTS=” and then save as the file .

-Dtangosol.coherence.cluster=MyFirstCluster -Dtangosol.coherence.clusterport=7252

Then, start the and the analyse the output:
Cluster Join

Most part of output is similar to single member cluster. But there are a few differences.
First of all, at the top of output you see that new member are using the default operational configuration file which is inside the jar file.
And the middle of the output at the MasterMemberSet section you can see that currently how many members are there in the cluster, what is the current member id (ThisMember part), oldest member id and so on.
At the bottom of page you can see the interactive command line and “Map (?):” is written at the left side. Type “help” and then you can see all available commands.

In this post I’ve explained how a member join the existing cluster. In next post I’ll explain how just joined member can use the existing cache.