Friday, May 27, 2011

Make an algorithm concurrent with a Java thread pool

If you are looking for a way to make an expensive algorithm faster, you may consider multithreading and concurrency. You can divide your data into parts and start threads to handle each part concurrently. This looks easy, but there are usually some additional requirements you need to meet while doing it.

The first question is how many threads you should use. This usually depends on the characteristics of the system. For example, it rarely makes sense to start 64 threads on a quad-core host. That may be reasonable when you act as a server socket listener and want to handle up to 64 requests concurrently. It is sensible there because we don't expect each listening thread to use 100% of a CPU: the threads wait for data, do some I/O, etc. But when you parallelize an expensive algorithm, each thread usually keeps its assigned core fully loaded.

Thus, to choose the appropriate number of threads, implement the algorithm so that the thread count can be changed, then test the performance with different values and look for the best results. According to my tests, you'll probably get the best results with a thread count between CPU_CORES and CPU_CORES*2.
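A minimal sketch of such a test harness, assuming a hypothetical CPU-bound `busyWork` method as a stand-in for the real algorithm: it reads the core count with `Runtime.getRuntime().availableProcessors()` and times the same fixed batch of jobs for every thread count from CPU_CORES to CPU_CORES*2. The class and method names are illustrative only, and a plain `ExecutorService` is used here just because the batch of jobs is small and fixed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadCountTuning {

    /** Thread counts worth trying: CPU_CORES up to CPU_CORES * 2. */
    static List<Integer> candidateThreadCounts(int cores) {
        List<Integer> counts = new ArrayList<>();
        for (int n = cores; n <= cores * 2; n++) {
            counts.add(n);
        }
        return counts;
    }

    /** Hypothetical CPU-bound stand-in for the real algorithm. */
    static long busyWork(long iterations) {
        long x = 0;
        for (long i = 0; i < iterations; i++) {
            x += (i * i) % 7;
        }
        return x;
    }

    /** Runs `tasks` jobs on `threads` threads and returns the elapsed wall time in ms. */
    static long timeWith(int threads, int tasks, long iterations) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            long start = System.nanoTime();
            List<Future<Long>> results = new ArrayList<>();
            Callable<Long> job = () -> busyWork(iterations);
            for (int t = 0; t < tasks; t++) {
                results.add(pool.submit(job));
            }
            for (Future<Long> f : results) {
                f.get(); // wait for every job to finish
            }
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        for (int n : candidateThreadCounts(cores)) {
            System.out.println(n + " threads: " + timeWith(n, 64, 20_000_000L) + " ms");
        }
    }
}
```

Run it on the target machine and keep the count with the lowest time; the number of jobs should stay well above the thread count so that every configuration has enough work to spread.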

The second question is how to divide the data. If the data has a known size and a known storage type (like a file on the filesystem), we can easily split it into N parts and start N threads, then wait for each thread to finish, and the job is done. But this depends on the storage type: you need to know the data size and be able to read each part separately. With a file, for example, it is easy to divide it into 4 parts, start 4 threads, and let each of them read its part and do the job.
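For the file case, a minimal sketch might look like the following. It assumes the data is a regular file on disk, computes N contiguous byte ranges, and lets each thread read only its own range with `FileChannel.read(ByteBuffer, long)`, a positional read that doesn't move the channel's shared position, so the threads don't interfere with each other. `FileSplitter` and the `worker` callback (standing in for the algorithm) are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class FileSplitter {

    /** Splits [0, size) into `parts` contiguous {start, end} ranges. */
    static List<long[]> ranges(long size, int parts) {
        List<long[]> result = new ArrayList<>();
        long chunk = size / parts;
        long rest = size % parts;
        long start = 0;
        for (int p = 0; p < parts; p++) {
            long len = chunk + (p < rest ? 1 : 0); // spread the remainder over the first parts
            result.add(new long[] {start, start + len});
            start += len;
        }
        return result;
    }

    /** Starts one thread per part, each reading only its own range, then waits for all of them. */
    static void processInParallel(Path file, int parts, BiConsumer<Integer, ByteBuffer> worker)
            throws IOException, InterruptedException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            List<Thread> threads = new ArrayList<>();
            for (long[] r : ranges(ch.size(), parts)) {
                final int index = threads.size();
                Thread t = new Thread(() -> {
                    // assumes each part fits into a single ByteBuffer (< 2 GB)
                    ByteBuffer buf = ByteBuffer.allocate((int) (r[1] - r[0]));
                    try {
                        long pos = r[0];
                        // positional read: the channel's shared position is not changed
                        while (buf.hasRemaining() && ch.read(buf, pos) >= 0) {
                            pos = r[0] + buf.position();
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                    buf.flip();
                    worker.accept(index, buf); // hand the part to the algorithm
                });
                threads.add(t);
                t.start();
            }
            for (Thread t : threads) {
                t.join(); // wait for each thread, then the job is done
            }
        }
    }
}
```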

But when you know nothing about the underlying storage type and work on plain streams, you simply cannot do that. You can't divide the data without reading the full stream, and you obviously shouldn't read the whole stream before processing it. Firstly, this can significantly decrease performance, because the data has to be read twice. Secondly, you need to store the data somewhere (in RAM? on the filesystem?), and you can quickly run out of storage space this way.

The solution here is to divide the input into parts of a fixed maximum size and pass them to the algorithm one by one. The sequential solution would look close to this example:

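import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public void doAlgorithm(byte[] block, int size, OutputStream out) {
        // process the first `size` bytes of `block` and write the result to `out`
}

public void process(InputStream in, OutputStream out) throws IOException {
        int i;
        byte[] buffer = new byte[512*1024];
        // read() may return fewer bytes than the buffer holds; i is the actual count
        while ((i = in.read(buffer)) > -1) {
                doAlgorithm(buffer, i, out);
        }
}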


The buffer size here is 512 KB, but it should usually depend on the nature of the algorithm and the available memory.
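`InputStream.read(byte[])` may return fewer bytes than the buffer holds, even before the end of the stream, which is why the loop passes `i` to the algorithm. If the algorithm needs every block except the last one to be exactly full, a small helper can keep reading until the buffer is full. A sketch, with `BlockReader.readBlock` as a hypothetical name:

```java
import java.io.IOException;
import java.io.InputStream;

public class BlockReader {

    /**
     * Reads from `in` until `buffer` is full or the stream ends.
     * Returns the number of bytes read, or -1 if the stream was already at its end.
     * Assumes a non-empty buffer.
     */
    static int readBlock(InputStream in, byte[] buffer) throws IOException {
        int filled = 0;
        while (filled < buffer.length) {
            int n = in.read(buffer, filled, buffer.length - filled);
            if (n < 0) {
                break; // end of stream: return what we have so far
            }
            filled += n;
        }
        return filled == 0 ? -1 : filled;
    }
}
```

It drops into the loop above as `while ((i = readBlock(in, buffer)) > -1)`. On Java 9 and later, `InputStream.readNBytes(buffer, 0, buffer.length)` does the same filling, but returns 0 rather than -1 at the end of the stream.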

For an algorithm written this way, concurrency can be added with a thread pool. A thread pool keeps a number of worker threads and lets the user hand jobs over to them. Why use a thread pool rather than start a new thread for each data block? Look at this solution, one of the worst possible:

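public void process(InputStream in, final OutputStream out) throws IOException {
        int i;
        byte[] buffer = new byte[512*1024];
        while ((i = in.read(buffer)) > -1) {
                final int size = i;
                // copy the block: the next read() overwrites buffer
                final byte[] buf = buffer.clone();
                // anti-pattern: a brand new thread for every block, with no upper limit
                new Thread(new Runnable() {

                        @Override
                        public void run() {
                                doAlgorithm(buf, size, out);
                        }

                }).start();
        }
        // note: the method returns while blocks may still be processed in the background
}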


Why is it so bad? Firstly, because spawning a thread is expensive. It's better to have a pool of threads created once, and to reuse each thread after it finishes its previous job. The second advantage is that a thread pool can enforce a limit on the thread count, chosen for the best performance. In this solution, consider what happens when the stream read is very fast and the algorithm is very expensive. If reading a data block takes 0.1 s and processing it usually takes 2 s, you will probably have about 20 threads (2 s / 0.1 s) running concurrently, because that many blocks are read before the first thread finishes processing. Now think further about what happens if 20 threads run concurrently on a machine with, let's say, 4 cores. You would get a performance boost for 4 concurrent threads, but 20 threads means 5 threads competing for each core, so the CPU time of each core has to be shared 5 ways. So one algorithm invocation no longer takes 2 s but closer to 10 s, while the data still arrives every 0.1 s. How many threads do we need to keep up with that influx for a 10 s algorithm? 100? And how long will a single invocation take then, with 100 concurrent threads?

This of course won't be a perpetuum mobile and will stabilize somewhere, but this way you'll surely lose the benefits of the concurrent optimization. Furthermore, with a 512 KB data buffer per thread, 100 threads consume 50 MB of memory. This doesn't look like much, but only because the buffer is small. With 5 MB blocks and 200 started threads (not really unrealistic, given the algorithm performance above), you would consume 1 GB of memory. This would probably crash your VM with an OutOfMemoryError under default memory settings. Moreover, each thread also reserves its own stack (the default size is platform-dependent, typically 512 KB to 1 MB, and can be set with -Xss), so 200 threads waste roughly another 100 MB or more.
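The pile-up described above can be reproduced with a rough simulation. This is an idealized model, not a measurement: it assumes a new block (and a new thread) every 0.1 s, 2 CPU-seconds of work per block, and 4 cores shared equally among all running threads, and it ignores the cost of reading, scheduling and memory. `OversubscriptionSim` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

public class OversubscriptionSim {

    // Parameters from the example in the text (idealized model)
    static final double READ_INTERVAL = 0.1;  // seconds between two blocks read from the stream
    static final double WORK_PER_BLOCK = 2.0; // CPU-seconds needed to process one block
    static final int CORES = 4;

    /** Number of still-running threads after `seconds` of thread-per-block processing. */
    static int activeAfter(double seconds) {
        List<Double> remaining = new ArrayList<>(); // CPU-seconds left for each running thread
        int steps = (int) Math.round(seconds / READ_INTERVAL);
        for (int s = 0; s < steps; s++) {
            remaining.add(WORK_PER_BLOCK); // a new block arrives, a new thread starts
            // the cores are shared equally; a thread never gets more than one full core
            double share = Math.min(1.0, (double) CORES / remaining.size());
            double done = share * READ_INTERVAL;
            List<Double> next = new ArrayList<>();
            for (double r : remaining) {
                if (r - done > 1e-9) {
                    next.add(r - done); // still running after this interval
                }
            }
            remaining = next;
        }
        return remaining.size();
    }

    public static void main(String[] args) {
        for (int t = 5; t <= 20; t += 5) {
            System.out.println("after " + t + " s: " + activeAfter(t) + " threads");
        }
    }
}
```

In this model, 4 cores can finish at most 4 / 2 s = 2 blocks per second while 10 blocks arrive per second, so the number of unfinished threads keeps growing (by at least 8 per second) for as long as the stream delivers data. In a real program the growth is eventually held back by something the model leaves out, such as the reading thread itself getting less CPU time.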

Let's return now to the right way of thinking: limiting the number of threads. When I first did this, I looked closer at Java's standard thread pooling class, ThreadPoolExecutor. It looked promising, but finally turned out to be unhandy. Java's way of pooling work is different from what I expected. You can set a pool size, but when you feed the pool with new tasks, it queues them up without blocking (at least with an unbounded queue, which is what Executors.newFixedThreadPool() uses). If you have a pool of 4 threads, the 5th task will be queued and executed when a thread becomes free. If you feed this pool with stream data, you rather quickly end up in a situation similar to the previous one, because every 0.1 s you read a data block, allocate memory for it and put it into the queue. The advantage now is that the number of threads is limited, but this is not what I was looking for. Waiting for all queued jobs to finish before leaving the method is possible (shutdown() followed by awaitTermination()), but it doesn't solve the queueing problem. So I don't recommend this solution here. I think it is made for serving the kind of services I mentioned above, for example handling requests.
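The queueing behaviour is easy to observe. A minimal sketch, assuming `Thread.sleep` as a stand-in for the expensive algorithm: a `ThreadPoolExecutor` configured like `Executors.newFixedThreadPool()` (an unbounded `LinkedBlockingQueue`) accepts every task immediately, and everything beyond the pool size simply piles up in the queue. `UnboundedQueueDemo` is a hypothetical name.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {

    /** Submits `tasks` slow jobs to a pool of `threads` and returns how many ended up queued. */
    static int queuedAfterSubmitting(int threads, int tasks) throws InterruptedException {
        // Same configuration as Executors.newFixedThreadPool(threads): an unbounded queue
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        for (int t = 0; t < tasks; t++) {
            // execute() returns immediately; nothing slows the producer down
            pool.execute(() -> {
                try {
                    Thread.sleep(500); // stand-in for an expensive algorithm
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int queued = pool.getQueue().size();
        pool.shutdown();                            // accept no new tasks
        pool.awaitTermination(1, TimeUnit.MINUTES); // wait for the queued ones to finish
        return queued;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(queuedAfterSubmitting(4, 20) + " tasks waiting in the queue");
    }
}
```

With 4 threads and 20 submitted tasks, the first 4 tasks start right away and the remaining 16 should be left waiting in the queue; in the stream case, each of them would hold its own copy of a data block.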

Much better is SimpleThreadPool from the Quartz Scheduler library. It is exactly what we need here. The solution is very simple and self-explanatory:

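import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.quartz.SchedulerConfigException;
import org.quartz.simpl.SimpleThreadPool;

public void process(InputStream in, final OutputStream out)
throws IOException, SchedulerConfigException {
        int i;
        byte[] buffer = new byte[512*1024];

        // spawn a pool of 4 threads
        SimpleThreadPool pool = new SimpleThreadPool(4, Thread.NORM_PRIORITY);
        pool.setThreadsInheritGroupOfInitializingThread(false);
        pool.initialize();

        try {
                while ((i = in.read(buffer)) > -1) {
                        final int size = i;
                        // copy the block: the next read() overwrites buffer
                        final byte[] buf = buffer.clone();
                        // blocks until a pooled thread is free, then runs the job in it
                        pool.runInThread(new Runnable() {

                                @Override
                                public void run() {
                                        // blocks may finish in any order, so doAlgorithm
                                        // must be safe to call concurrently on the same out
                                        doAlgorithm(buf, size, out);
                                }

                        });
                }
        } finally {
                // wait for the pool to finish its jobs, then shut it down
                pool.shutdown(true);
        }
}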


In fact, I have nothing to add to this code, except maybe that the pool.runInThread() call blocks until a thread is available and ready to take the next Runnable.

This is the final, well-done solution that I prefer. But sometimes you may not want to use Quartz for some reason (for example, you don't want to integrate a 101st library, although Quartz is rather standard, especially in Spring projects). I, for example, have lately been writing command-line tests for some algorithms, outside the container, and didn't want to deal with libraries. This is the resulting pure Java implementation of the discussed approach, using ThreadGroup:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public void process(InputStream in, final OutputStream out)
throws IOException, InterruptedException {
        int i;
        byte[] buffer = new byte[512*1024];

        // initialize our "pool"
        int threads = 4;
        ThreadGroup group = new ThreadGroup("mypool");

        while ((i = in.read(buffer)) > -1) {
                final int size = i;
                final byte[] buf = buffer.clone();

                // wait until there is a free "thread slot" in the "pool".
                // The JDK does not document notifying the group's monitor when one of
                // its threads ends (older JDKs only call notifyAll() once the group is
                // empty), so re-check with a short timed wait. activeCount() is an estimate.
                synchronized (group) {
                        while (group.activeCount() >= threads)
                                group.wait(10);
                }

                // run in a new thread in the group
                new Thread(group, new Runnable() {

                        @Override
                        public void run() {
                                doAlgorithm(buf, size, out);
                        }

                }).start();
        }

        // wait for the "pool" to finish the job
        synchronized (group) {
                while (group.activeCount() > 0)
                        group.wait(10);
        }
}


This is not really a pool implementation, because it spawns a new thread each time, but I'm presenting it because it's very easy to achieve in Java. Other "pure" solutions would need a custom pool implementation, or some work with ThreadPoolExecutor.
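One of those "pure" options can be sketched with the standard library alone: a fixed `ExecutorService` for the threads plus a `java.util.concurrent.Semaphore` with one permit per thread, so the reading loop blocks (much like Quartz's `runInThread()`) instead of queueing an unlimited number of blocks. This is a sketch under assumptions, not the approach used above: the `Algorithm` interface is a hypothetical stand-in for `doAlgorithm(buf, size, out)`, and the 512 KB buffer matches the earlier examples.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class BoundedProcessor {

    /** Hypothetical stand-in for doAlgorithm(buf, size, out); must be thread-safe. */
    interface Algorithm {
        void apply(byte[] block, int size);
    }

    static void process(InputStream in, int threads, Algorithm algorithm)
            throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Semaphore slots = new Semaphore(threads); // one permit per worker thread
        try {
            byte[] buffer = new byte[512 * 1024];
            int i;
            while ((i = in.read(buffer)) > -1) {
                final int size = i;
                final byte[] buf = Arrays.copyOf(buffer, size); // the next read() reuses buffer
                slots.acquire(); // block the reader until a worker is free
                try {
                    pool.execute(() -> {
                        try {
                            algorithm.apply(buf, size);
                        } finally {
                            slots.release(); // free the slot even if the algorithm throws
                        }
                    });
                } catch (RuntimeException e) {
                    slots.release(); // the task was rejected, so give the permit back
                    throw e;
                }
            }
        } finally {
            // accept no new work, then wait until the last block has been processed
            pool.shutdown();
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        }
    }
}
```

Calling `shutdown()` followed by `awaitTermination()` makes the method return only after the last block has been processed, which plays the role of `pool.shutdown(true)` in the Quartz version.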
