Java DelayQueue test

DelayQueue Test

DelayQueue is a Java Queue that supports delaying for queue elements. An element inside the queue only taken after its expiration(delay) time.

Object class that inside the queue should implement the “Delayed” interface. This interface forces you to implement 2 methods:

getDelay,compareTo

getDelay method is important because Java decided to dequeue element from queue if getDelayed<=0 . For more information, visit Delayed Interface javadoc.

Lets start an example.

Firstly, you should define a class that represents the elements inside the queue. Our class name is NewString. This class has 2 important methods,getDelay and compareTo. If getDelay method returns 0 value or a value is lower than 0, this object will dequeue from the queue.

package com.turkcelltech.queuetests.DelayQueue;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class NewString implements Delayed{
private long endOfDelay;
private String text;
private long queueInsertTime;

public long getQueueInsertTime() {
return queueInsertTime;
}

public void setQueueInsertTime(long queueInsertTime) {
this.queueInsertTime = queueInsertTime;
}

public String getText() {
return text;
}

public void setText(String text) {
this.text = text;
}

public NewString()
{
}

public long getEndOfDelay() {
return endOfDelay;
}
public void setEndOfDelay(long endOfDelay) {
this.endOfDelay = endOfDelay;
}

@Override
public long getDelay(TimeUnit unit) {
// TODO Auto-generated method stub
long tmp = unit.convert((getQueueInsertTime()-System.currentTimeMillis())+endOfDelay, TimeUnit.MILLISECONDS);
return tmp;

}

@Override
public int compareTo(Delayed o) {
// TODO Auto-generated method stub
int ret = 0;
NewString ns = (NewString) o;

if ( this.endOfDelay < ns.endOfDelay ) ret = -1;
else if ( this.endOfDelay > ns.endOfDelay ) ret = 1;
else if ( this.getQueueInsertTime() == ns.getQueueInsertTime() ) ret = 0;

return ret;

}

}

And we are preparing Producer and Consumer classes. For each step in producer, a NewString object is creating and setting its attributes; for each step in consumer, consumer tries to take an element from the queue, if it could not, it waits until an element will be put (Refer the java queue documentation to understand differences between poll and take methods of queue, http://download.oracle.com/javase/1.5.0/docs/api/java/util/Queue.html )

Producer:

package com.turkcelltech.queuetests.DelayQueue;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.Random;
import java.util.concurrent.DelayQueue; 

public class Producer extends Thread {

private DelayQueue dq;
private SecureRandom random = new SecureRandom();

public Producer(DelayQueue dq) {
// TODO Auto-generated constructor stub
this.dq = dq;
}

@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
try {
String str = getRandomString();
NewString nstr = new NewString();
int iRandom = 2000 + new Random().nextInt(1000);

nstr.setText(str);
nstr.setEndOfDelay(iRandom);
nstr.setQueueInsertTime(System.currentTimeMillis());
dq.put(nstr);

Thread.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

public String getRandomString() {
return new BigInteger(130, random).toString(32);
}

}

Consumer:

package com.turkcelltech.queuetests.DelayQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue; 

public class Consumer extends Thread {

private DelayQueue dq;
private NewString ns;

public Consumer(DelayQueue dq) {
// TODO Auto-generated constructor stub
this.dq = dq;
}

@Override
public void run()
{
long dequeueTime = 0;
while ( true )
{
String str = null;
try {
ns = (NewString) dq.take();
dequeueTime = System.currentTimeMillis();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
StringBuffer sb = new StringBuffer();
sb.append(“——————–”+”\n”);
sb.append(“Queue Size ( Cons ) :”+dq.size()+”\n”);
sb.append(“Inserted Element :”+ns.getText()+”\n”);
sb.append(“Queue Insertion Time :”+ns.getQueueInsertTime()+”\n”);
sb.append(“Now ( dequeue time ) :”+dequeueTime+”\n”);
sb.append(“Expected Delay (ms):”+ns.getEndOfDelay()+”\n”);
sb.append(“Actual Delay (ms):”+(dequeueTime-ns.getQueueInsertTime())+”\n”);
sb.append(“Differences Actual and Expected Delay (ms):”+((dequeueTime-ns.getQueueInsertTime())-ns.getEndOfDelay())+”\n”);
sb.append(“——————–”+”\n”);
System.out.println(sb.toString());
//System.out.println(“Queue Size (Cons):”+dq.size()+”|Inserted Element:”+ns.getText()+”|Delay:”+ns.getEndOfDelay()+”|Queue Insertion Time:”+ns.getQueueInsertTime()+”|Now:”+System.currentTimeMillis());

}
}

}

And, create the main class that calls producer and consumer:

DelayQueueTest:

package com.turkcelltech.queuetests.DelayQueue;

import java.util.concurrent.DelayQueue;

public class DelayQueueTest {

private static DelayQueue dq;

public static void main(String args[])
{
System.out.println(“DelayQueue Example…”);
dq = new DelayQueue();
Producer p = new Producer(dq);
Consumer c = new Consumer(dq);
p.start();
c.start();

}
}

Lastly, run the DelayQueueTest.java, then lets analyse the output. After a
period of time, to find the elements that has expired become hard because the
queue size is increasing gradually because of in our example, producer is
faster than the consumer. I think this delay queing structure is great, useful and works well with middle-size queues but if the
queue size is big enough, deciding expired items much more hard.

——————–
Queue Size ( Cons ) :6190
Inserted Element :cqf44unuj7e54gm6ou8hafgt6b
Queue Insertion Time :1297351067151
Now ( dequeue time ) :1297351069152
Expected Delay (ms):2000
Actual Delay (ms):2001
Differences Actual and Expected Delay (ms):1
——————–

——————–
Queue Size ( Cons ) :6190
Inserted Element :q5a0atgilmik2jd6d3qu2bl2ic
Queue Insertion Time :1297351066479
Now ( dequeue time ) :1297351069152
Expected Delay (ms):2000
Actual Delay (ms):2673
Differences Actual and Expected Delay (ms):673
——————–

——————–
Queue Size ( Cons ) :6189
Inserted Element :o plb0ojmltu473jgbstbsdcauj
Queue Insertion Time :1297351065869
Now ( dequeue time ) :1297351069152
Expected Delay (ms):2001
Actual Delay (ms):3283
Differences Actual and Expected Delay (ms):1282
——————–

——————–
Queue Size ( Cons ) :6188
Inserted Element :la7uka3abq4t8ilbbki386dc2p
Queue Insertion Time :1297351065891
Now ( dequeue time ) :1297351069152
Expected Delay (ms):2001
Actual Delay (ms):3261
Differences Actual and Expected Delay (ms):1260
——————–

——————–
Queue Size ( Cons ) :6187
Inserted Element :35uesr3s7c8dbkrtfudjvg2tir
Queue Insertion Time :1297351066946
Now ( dequeue time ) :1297351069152
Expected Delay (ms):2001
Actual Delay (ms):2206
Differences Actual and Expected Delay (ms):205
——————–

——————–
Queue Size ( Cons ) :6186
Inserted Element :adcufi915fr0ehs43ebkl66apl
Queue Insertion Time :1297351063883
Now ( dequeue time ) :1297351069152
Expected Delay (ms):2002
Actual Delay (ms):5269
Differences Actual and Expected Delay (ms):3267
——————–

——————–
Queue Size ( Cons ) :8227
Inserted Element :sevfljifbd677h178l6t4225pp
Queue Insertion Time :1297351069243
Now ( dequeue time ) :1297351071244
Expected Delay (ms):2000
Actual Delay (ms):2001
Differences Actual and Expected Delay (ms):1
——————–

——————–
Queue Size ( Cons ) :8325
Inserted Element :unrvsodhafh02bcq21ords5ude
Queue Insertion Time :1297351069341
Now ( dequeue time ) :1297351071343
Expected Delay (ms):2001
Actual Delay (ms):2002
Differences Actual and Expected Delay (ms):1
……
……
……
Queue Size ( Cons ) :507047
Inserted Element :e4gjtvfq6b8hnlau27hlldvpve
Queue Insertion Time :1297350198016
Now ( dequeue time ) :1297350301063
Expected Delay (ms):2003
Actual Delay (ms):103047
Differences Actual and Expected Delay (ms):101044
——————–

——————–
Queue Size ( Cons ) :507046
Inserted Element :p8fe9d49o1ga0i9oe3ciepac37
Queue Insertion Time :1297350200057
Now ( dequeue time ) :1297350301063
Expected Delay (ms):2003
Actual Delay (ms):101006
Differences Actual and Expected Delay (ms):99003
——————–

——————–
Queue Size ( Cons ) :507045
Inserted Element :n5bd4saorqcmo3ahgfersfg4ns
Queue Insertion Time :1297350187489
Now ( dequeue time ) :1297350301063
Expected Delay (ms):2003
Actual Delay (ms):113574
Differences Actual and Expected Delay (ms):111571
——————–

——————–
Queue Size ( Cons ) :507044
Inserted Element :2522nr8jkk56qr3qbm06ie8er
Queue Insertion Time :1297350203764
Now ( dequeue time ) :1297350301063
Expected Delay (ms):2003
Actual Delay (ms):97299
Differences Actual and Expected Delay (ms):95296

Oracle CEP Bug – org.springframework.beans.TypeMismatchException

I have installed the Oracle Complex Event Processing 11.1.3. I will have described what I am doing by the end of this month. Anyway I decided to use Event
Partitioning feature to achieve scalability. When I use a channel with event partitioning feature I was getting error :

org.springframework.beans.factory.BeanCreationException: Error creating bean with name

‘inputChannel’: Invocation of init method failed; nested exception is

org.springframework.beans.PropertyBatchUpdateException; nested PropertyAccessExceptions (1) are:
PropertyAccessException 1: org.springframework.beans.TypeMismatchException: Failed to convert

property value of type [java.lang.String] to required type

[com.bea.wlevs.channel.EventPartitioner] for property ‘eventPartitioner’; nested exception is

java.lang.IllegalArgumentException: Cannot convert value of type [java.lang.String] to required

type [com.bea.wlevs.channel.EventPartitioner] for property ‘eventPartitioner’: no matching

editors or conversion strategy found

In CEP documentation,
( Section 19.1, “How to Configure Scalability With an Event Partitioner Channel” )

it says we could achieve scalability by putting this piece of code into channel configuration part of EPN assembly file:

<wlevs:instance-property name=”eventPartitioner” value=”true” />

However, it does not working. I created a Metalink SR and they answered me as they filed a bug about this issue(9964253).

Getting rid of this error is simple. Change name of instance-property from “eventPartitioner” to “partitionByEventProperty” and change value parameter to a varible name of your Event Class like this:

<wlevs:instance-property name=”partitionByEventProperty” value=”symbol” />

Your event class should have a “symbol” variable.

public String symbol;

jSch — SSH Api for Java Applications — Ssh Port Forwarding in Java

If you write a java network application, for some security reasons you need to make an ssh connection to remote host. To make an ssh connection you need some parameters. Host address, user name and password are some important of them. To pass typing password you can use “send / expect” application or you can use jSch api that i am going to tell you.

SshHandler class main class of this api, it is like interface that provides method to pass user name and password. And Awaker class is my application class that reads file that includes connection information for each line.

Example line is:

nxxast01;turkcell;xyz;110.15.122.20;9911;127.0.0.1;9999

First piece is name of machine, second is username, third is password, fourth is remote host address, fifth is local port, sixth is where remote host will be directed and seventh is which port that remote host forward. I mentioned this to understand code ( by the way i am not a kind of java expert :) )

It means that machine 110.15.122.20 connects to 127.0.0.1:9999 then forward data to localhost:9911
( SSH Port Forwarding )

SshHandler Class :

/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/

package nor;

import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.UserInfo;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
*
* @author TTASUNGUR
*/
public class SshHandler {
JSch jsch=new JSch();

public void openSshConnection(String ps_user,String ps_password, String ps_host, String ps_localport, String ps_remotehost, String ps_remoteport ) throws SshConnectionException
{
try
{
System.out.println(Global.info_prefix+”It will be trying to connect remote host : Username : “+ps_user+”, Password : , Remote Host : “+ps_host+”, Localport : “+ps_localport+”, Remote : “+ps_remotehost+”, RemotePort : “+ps_remoteport);
Session session= jsch.getSession(ps_user, ps_host, Global.sshport);
UserInfo ui = new MyUserInfo(ps_password);
session.setUserInfo(ui);
session.connect();
int assigned_port=session.setPortForwardingL(Integer.parseInt(ps_localport), ps_remotehost, Integer.parseInt(ps_remoteport));
System.out.println(Global.info_prefix+”Ssh connection is established for :” + ps_host +”. Localport:”+assigned_port+”:”+ps_remotehost+”:”+ps_remoteport);
}
catch (JSchException ex)
{
System.err.println(Global.error_prefix+”Ssh connection could not be established for :”+ps_host+”, due to :”+ex.getMessage());
ex.printStackTrace();
throw new SshConnectionException(Global.error_prefix+”Ssh connection could not be established for :”+ps_host+”, due to :”+ex.getMessage());
}
}

public static class MyUserInfo implements UserInfo
{
public String getPassword(){ return passwd; }

public MyUserInfo(String password)
{
this.passwd = password;
}

public boolean promptYesNo(String str){
System.out.println(str+”promptYesNo”);

return true;
}

String passwd;

public String getPassphrase(){ return null; }
public boolean promptPassphrase(String message){ return true; }
public boolean promptPassword(String message){
//System.out.println(“promptPassword”);
// passwd = message;
//passwd = “turkcell”;

return true;

}
public void showMessage(String message){
System.out.println(message);
}

public String[] promptKeyboardInteractive(String destination,
String name,
String instruction,
String[] prompt,
boolean[] echo){
return new String[3];
}
}
}

Awaker Class :
( important line is start with ssh.openSshConnection… )

package nor;

import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.UserInfo;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
*
* @author TTASUNGUR
*/
public class Awaker {

ExecutorService threadExecutor;
String s_location_file_name = “rtdf_location_ips.rtdf”;
SshHandler ssh = new SshHandler();
String sentinal_files_path;
public Awaker(String sentinal_files_path)
{
this.sentinal_files_path = sentinal_files_path;
}

public void setStreamLocations(String pstr_location_file_name)
{
FileReader l_fr = null;
String ls_temp=”",ls_temp_arr[],ls_local_port;

try
{
File l_f1 = new File(pstr_location_file_name);
l_fr = new FileReader(l_f1);
BufferedReader br = new BufferedReader(l_fr);

try
{
// first line was metada it can be passed
ls_temp = br.readLine();
}
catch (IOException ex)
{
ex.printStackTrace();
}

while ( ls_temp != null)
{
try
{
ls_temp = br.readLine();
System.out.println(Global.info_prefix+”Line was read : “+ls_temp);
ls_temp_arr = ls_temp.split(Global.location_file_splitter);

if ( ls_temp_arr.length != Global.numberof_location_file_fields)
{
System.out.println(Global.error_prefix+”This line has not “+Global.numberof_location_file_fields+” fields”);
}
else
{
try
{
ssh.openSshConnection(ls_temp_arr[1], ls_temp_arr[2], ls_temp_arr[3], ls_temp_arr[4], ls_temp_arr[5], ls_temp_arr[6]);
//StreamHandler sh1 = new StreamHandler(ls_temp_arr[0]+”(“+ls_temp_arr[3]+”)Local Port:”+ls_temp_arr[4],Global.localhost,Integer.parseInt(ls_temp_arr[4]));
//threadExecutor.submit(sh1);

}
catch (SshConnectionException ex)
{
System.out.println(ex.getMessage());
}

}

}
catch (IOException ex)
{
System.out.println(Global.error_prefix+”The line could not be read!”);

}

}
}
catch (FileNotFoundException ex)
{
System.out.println(Global.error_prefix+”Location file are not found : “+s_location_file_name);
ex.printStackTrace();
}
finally
{
try
{
l_fr.close();
}
catch (IOException ex)
{
ex.printStackTrace();
}
}

}

public void startThreads()
{

threadExecutor = Executors.newCachedThreadPool();

FileHandler fh;
fh = new FileHandler(sentinal_files_path);
fh.run();

// stream locations will be set
setStreamLocations(s_location_file_name);

threadExecutor.shutdown(); // nonsense code
if ( threadExecutor.isTerminated() && threadExecutor.isShutdown() )
{
// this message should not be displayed any time.
// because application will not never finish.
System.out.println(“Info : Application has been finished.”);
}
}

}

To download jSch Api, click here.

Pairing Java objects with Oracle Types

When you send your java object variables to Oracle, you can divide objects into attributes, then send them to database. It makes no sense and takes much development time than sending directly as object. Before sending java object to oracle, you should do some steps.

1 ) Firstly, you should set up your java objects.

> Import java.sql.SQLData, SQLException, SQLInput, SQLOutput classes to your object class
> Your class implements SQLData Interface but not necessary
> readSql, writeSql, getSqlTypeName methods must be declared and they process (read or write) class variables

import java.sql.SQLData;
import java.sql.SQLException;
import java.sql.SQLInput;
import java.sql.SQLOutput;

/**
*
* @author TTASUNGUR
*/
public class StreamType implements SQLData
{

private String sql_type;
public String dataType = “”;
public String activation_Type = “”;

public
StreamType()
{
}

public
StreamType(String sql_type)
{
this.sql_type = sql_type;
}

public void readSQL(SQLInput inStream, String typeName) throws
SQLException
{

sql_type = typeName;
dataType = inStream.readString();
activation_Type = inStream.readString();
// this order should be similar to defining order

}

public void writeSQL(SQLOutput outStream) throws SQLException
{
outStream.writeString(dataType);
outStream.writeString(activation_Type);
// this order should be similar to defining order

}

public
String getSQLTypeName() throws SQLException
{
return sql_type;
}
}

2 ) You should create Oracle type that will be matched your java object :

CREATE OR REPLACE TYPE stream_type AS OBJECT
(
dataType VARCHAR2(50),
activation_Type VARCHAR2(50)
);

3 ) There should be mapping java object to oracle object.

java.util.Map myMap = (Map)conn.getTypeMap(); // conn is a Connection object, Map class casts it ( also Map class should be imported your java class )
myMap.put(oracle_gb_file_type, Class.forName(“nor.GB_File”));

4 ) If your new object is ready for sending database, you can pass it to a statement object.

StreamType st = new StreamType();
st.dataType = “foo”;
st.activation_Type = “foo2″;
CallableStatement cstmt = conn.prepareCall(“{ call AQ_ADM_PRTP.AQ_PRTP.ENQUEUE_STREAM_TYPE(?) }”); //plsql procedure that gets StreamType type that has already created in Database
cstmt.setObject(1,st,OracleTypes.STRUCT);

Other link(s):
> Working with Oracle Objects : http://download.oracle.com/docs/cd/B19306_01/java.102/b14355/oraoot.htm

Follow

Get every new post delivered to your Inbox.