Sunday, July 18, 2010

A simple tutorial about coding multi thread code with Java cocurrency API.

I was asked to help on writing a multi-thread code recently. I think it is piece of cake as I wrote multi-thread code before. I do not want to write a thread pool from the scratch. So, obviously, the first place I will go is apache common library. However, I saw the below description on its web site,
Please note that one of the reasons this component is still in the sandbox and has never had a 1.0 release is Doug Lea's excellent util.concurrent library. That library works with many JDKs, is full-featured, well-documented, and well-tested. That library is also a standard part of the JDK starting with JDK 1.5, as the java.util.concurrent library: it is specified in the JSR 166 standard. 
So, common thread pool is dead although it will not say that in this way? I think I had been dead too as I, as a Java programmer since 1996, do not know how to use thread pool from JDK1.5. So, to save myself, I learned JDK thread pool and make a simple tutorial here.

In this simple demo project, I make five classes, ThreadpoolDemo is the main class. DummyTaskExcutor is worker thread. And TaskFinishedEvent and TaskFinishedEventListener are helper classes to inform ThreadpoolDemo that one DummyTaskExcutor object finished the task running already. Below is the source code.

TaskFinishedEvent.java
package jia.blog;

import java.util.EventObject;

/**
 * A very simple event class, which is used to notify main class that
 * this task has been finished.
 * 
 * @author yiyu.jia
 */
public class TaskFinishedEvent extends EventObject{

    public TaskFinishedEvent(Object source) {
        super(source);
    }
}

TaskFinishedEventListener.java
package jia.blog;

import java.util.EventListener;
/**
 * This is a listener interface definition.
 * @author yiyu.jia
 */
public interface TaskFinishedEventListener extends EventListener{

    public void taskFinishedEventOccured(TaskFinishedEvent evt);

}

ThreadpoolDemo.java
package jia.blog;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 *
 * @author Yiyu Jia
 */
public class ThreadpoolDemo implements TaskFinishedEventListener{

    //total number of core thread
    private int coreThreadNum = 5;
    //a counter for finished tasks (thread).
    private static int finishedTasks = 0;
    //a static object for synchronizing block of code.
    private static final Object syncObj = new Object();
    //This is the thread pool object.
    private static ThreadPoolExecutor exec;


    public void runTasks(){

        //Creating a blocking queue for holding runnalbe tasks with
        //maximum capacity of holding coreThreadNum*2. We know we will only
        //have 5 dummy thread task in this simple demo. I assign it as coreThreadNum*2
        // to highlight the difference only.
        BlockingQueue q = new ArrayBlockingQueue(coreThreadNum*2);

        //Creating a thread pool which will always have coreThreadNum live thread and
        //coreThreadNum*2 thread in maximum. If a thread is idle for longer than 1 second and
        //the total number of thread in pool is larger than coreThreadNum, the thread will be killed.
        exec = new ThreadPoolExecutor(coreThreadNum, coreThreadNum*2, 1, TimeUnit.SECONDS, q);

        //Now, let's feed the thread pool runnable tasks and excute them all.
        //One I want to highlight here is that we do not need to setup a endless loop
        //in the main class to keep whole application not to exit now as we are using
        //ThreadPoolExecutor.
        for (int i = 0; i < coreThreadNum; i++) {
            DummyTaskExcutor tempExt = new DummyTaskExcutor();
            tempExt.addTaskFinishedEventListener(this);
            exec.execute(tempExt);
        }

        //lets start a monitor thread and have fun.
        //We can expan the capability of this thread.
        MonitorThread monitor = new MonitorThread(this);
        monitor.setDaemon(true); //set monitor thread as a Daemon thread.
        monitor.start();
    }

    /**
     * supply running time information.
     * @return String
     */
    public String getInfo(){
        String info = "There are currently " + exec.getPoolSize() + " in pool and "
                + exec.getCompletedTaskCount() + " has been finished.";
        return info;
    }

    public void taskFinishedEventOccured(TaskFinishedEvent evt) {
        // I prefer to use this counter to determine whether all
        // task is completed or not.
        synchronized (syncObj) { 
            finishedTasks++;
        }
        if (finishedTasks >= coreThreadNum) {
            exec.shutdownNow();//I just shut it down immediately.
        }
    }

    static public void main(String argv[]) {

        ThreadpoolDemo demo = new ThreadpoolDemo();
        try {
            demo.runTasks();
        } catch (Exception e) {
        }
    }
}


DummyTaskExcutor.java
package jia.blog;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A dummy thread.
 * 
 * @author Yiyu Jia
 */
public class DummyTaskExcutor implements Runnable{

    public void run() {
        try {
            for (int i = 0; i < 3; i++) {
                
                System.out.println("I am lazy object No." + this.hashCode()); //use object hashcode to identify each thread object.
                Thread.sleep(doRawRandomNumber()); //sleep in random time between 0.5 second and 10 second.
            }
        } catch (InterruptedException ex) {
            Logger.getLogger(DummyTaskExcutor.class.getName()).log(Level.SEVERE, null, ex);
        }
        TaskFinishedEvent evt = new TaskFinishedEvent(this); //I will replace this with other thread info later.
        fireTaskFinishedEvent(evt);

    }

    /**
     * a function generating random number in given range.
     * @return int
     */
    private int doRawRandomNumber() {

        int min = 500;
        int max = 10000;
        return (int) (Math.random() * (max - min + 1)) + min;
    }


        // Create the listener list
    protected javax.swing.event.EventListenerList listenerList = new javax.swing.event.EventListenerList();

    // This methods allows classes to register for TaskFinishedEvents
    public void addTaskFinishedEventListener(TaskFinishedEventListener listener) {
        listenerList.add(TaskFinishedEventListener.class, listener);
    }

    // This methods allows classes to unregister forTaskFinishedEvents
    public void removeTaskFinishedEventListener(TaskFinishedEventListener listener) {
        listenerList.remove(TaskFinishedEventListener.class, listener);
    }

    // This private class is used to fire TaskFinishedEvents
    void fireTaskFinishedEvent(TaskFinishedEvent evt) {
        Object[] listeners = listenerList.getListenerList();
        // Each listener occupies two elements - the first is the listener class
        // and the second is the listener instance
        for (int i = 0; i < listeners.length; i += 2) {
            if (listeners[i] == TaskFinishedEventListener.class) {
                ((TaskFinishedEventListener) listeners[i + 1]).taskFinishedEventOccured(evt);
            }
        }
    }
}

MonitorThread.java
package jia.blog;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * This class works as a daemon thread to get running time info from main class.
 * @author yiyu
 */
public class MonitorThread extends Thread{
    
    ThreadpoolDemo demo; //a pointer to main class object.

    public MonitorThread(ThreadpoolDemo demo) {
        this.demo = demo;
    }

    @Override
    public void run() {
        do{
            try {
                
                Thread.sleep(2000);
                printInfo(demo.getInfo()); //get running time info from main class.

            } catch (InterruptedException ex) {
                Logger.getLogger(MonitorThread.class.getName()).log(Level.SEVERE, null, ex);
            }
        }while(true);
    }
    
    private void printInfo(String info){
        System.out.println(info);
    }
} 



Click here to download the netbeans project.