Friday, May 27, 2011

Make an algorithm concurrent with a Java thread pool

If you are looking for a way to make an expensive algorithm faster, you may consider multithreading and concurrency. You can divide your data into parts and start threads to handle each part concurrently. This looks easy, but there are some additional constraints you need to respect while doing it.

The first question is how many threads to use. This usually depends on the system characteristics. For example, there is usually no point in starting 64 threads on a quad-core host. It may be reasonable when you act, for example, as a server socket listener and want to handle up to 64 requests concurrently. That is sensible, because we don't expect 100% CPU utilization from each listening thread - it can wait for data, do some IO, etc. But when you want to parallelize an expensive algorithm, each thread usually loads its assigned core to the maximum.

Thus, to choose the appropriate number of threads, you need to implement the algorithm in a way that lets you change the thread count, then test the performance and look for the best results. According to my tests you will probably get the best results somewhere between CPU_CORES and CPU_CORES*2.
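
A minimal sketch of such a benchmark loop (runAlgorithmWith() is a hypothetical method standing in for your algorithm, parameterized by thread count):

// start from the core count reported by the JVM and benchmark around it
int cores = Runtime.getRuntime().availableProcessors();
for (int threads = cores; threads <= cores * 2; threads++) {
        long start = System.currentTimeMillis();
        runAlgorithmWith(threads); // hypothetical: your algorithm run with the given thread count
        System.out.println(threads + " threads: " + (System.currentTimeMillis() - start) + " ms");
}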

The second question is how to divide the data. If we have data of known size and known storage type (like a filesystem), we can easily split it into N parts, start N threads, wait for each thread, and the job is done. But this clearly depends on the storage type - you have to know the data size and be able to read each part separately. So it is easy to divide a file into e.g. 4 parts, start 4 threads and let them read their file parts and do the job.
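
A rough sketch of the file case (processPart() is a hypothetical method doing the actual work on a region of the file; handling the remainder of the last part is glossed over here):

public void processFile(final File file, int parts) throws InterruptedException {
        final long partSize = file.length() / parts;
        Thread[] workers = new Thread[parts];
        for (int p = 0; p < parts; p++) {
                final int part = p;
                workers[p] = new Thread(new Runnable() {

                        @Override
                        public void run() {
                                try {
                                        RandomAccessFile raf = new RandomAccessFile(file, "r");
                                        try {
                                                raf.seek(part * partSize); // each thread reads its own region
                                                processPart(raf, partSize); // hypothetical: process partSize bytes from here
                                        } finally {
                                                raf.close();
                                        }
                                } catch (IOException e) {
                                        e.printStackTrace();
                                }
                        }

                });
                workers[p].start();
        }
        // wait for each thread and the job is done
        for (Thread t : workers)
                t.join();
}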

But when you know nothing about the underlying storage and you work on plain streams, you simply cannot do that. You can't divide the data without reading the full stream, and you obviously shouldn't read the whole stream before processing. Firstly, that can significantly decrease performance, because you would read the data twice. Secondly, you would need to store the data somewhere (in RAM? in the filesystem?) and you can quickly run out of space that way.

The solution here is to divide the input into blocks of some size and pass them to the algorithm. The sequential solution would be close to this example:

// placeholder for the expensive transformation of `size` bytes taken from `block`
public void doAlgorithm(byte[] block, int size, OutputStream out) {
}

public void process(InputStream in, OutputStream out) throws IOException {
        int i;
        byte[] buffer = new byte[512*1024];
        // read the input block by block and process each block sequentially
        while ((i = in.read(buffer))>-1) {
                doAlgorithm(buffer, i, out);
        }
}


The buffer size here is 512 KB, but in practice it should depend on the nature of the algorithm and on the available memory.

For an algorithm written this way, the concurrency optimization can be done using thread pooling. A thread pool keeps some number of worker threads and lets you borrow a thread from the pool to do a job. Why use a thread pool instead of starting a new thread for each data block? Look, for example, at this solution, one of the worst possible:

public void process(InputStream in, final OutputStream out) throws IOException {
        int i;
        byte[] buffer = new byte[512*1024];
        while ((i = in.read(buffer))>-1) {
                final int size = i;
                final byte buf[] = buffer.clone();
                // BAD: starts a brand new, unbounded thread for every block read from the stream
                new Thread(new Runnable() {
                        
                        @Override
                        public void run() {
                                doAlgorithm(buf, size, out);
                        }
                        
                }).start();
        }
}


Why is it so bad? Firstly, because thread spawning is expensive. It is better to have a ready pool of threads created once, and to reuse a thread after it finishes its previous job. The second advantage is that a thread pool can limit the thread count to the value chosen for the best performance. In this solution, consider what happens when the stream read is very efficient and the algorithm is very expensive. If reading a data block takes 0.1s and processing it usually takes 2s, you will end up with about 20 threads running concurrently, because that much data will be read before the first thread finishes processing. Now think further and consider what happens when 20 threads run concurrently on, let's say, a 4-core machine. You will probably get a performance boost for 4 concurrent threads, but with 20 threads there are effectively 5 batches of 4 concurrently running threads, so the CPU time has to be shared five ways. We no longer have 2s per algorithm invocation, but something closer to 10s. And data still arrives every 0.1s. For a 10s algorithm, how many threads do we need to serve such a data influx? 100? And how long will a single invocation take with 100 concurrent threads?

Of course this won't be a perpetuum mobile, and it will stabilize somewhere. But along the way you will surely waste the benefits of the concurrent optimization. Furthermore, with a 512KB data buffer per thread, 100 threads means 50MB of memory. That doesn't look like much, but it is only for a small initial buffer size. With 5MB blocks and 200 threads started (not really unfeasible given the algorithm performance above), you would consume 1GB of memory, which would probably crash your VM with default memory settings. Moreover, each thread consumes its default 512KB of stack, so 200 threads add another 100MB of wasted memory.

Let's return now to the right way of thinking: limiting the number of threads. Doing this for the first time, I took a closer look at Java's default thread pooling class, ThreadPoolExecutor. It looked promising, but finally turned out to be unhandy. Java's way of pooling work is different than I thought. You can set a pool size, but when you feed the pool with new tasks, it queues them up without blocking. If you have a 4-thread pool, the 5th submission is queued and executed when a thread becomes free. If you feed such a pool from stream data, you quickly end up in a situation similar to the previous one, because every 0.1s you will read a data block, allocate the memory and put it into the queue. The advantage now is that the number of threads is limited, but this is not what I was looking for. Especially since there is no easy way to wait for the pool to finish its work and return from the method when everything is done. So I don't recommend this solution. I think it is made for serving the kinds of services I mentioned above - request handling, for example.
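
To illustrate the behaviour I mean, here is a minimal sketch using the standard Executors factory (not something I recommend for this use case):

public void process(InputStream in, final OutputStream out) throws IOException {
        int i;
        byte[] buffer = new byte[512*1024];

        // Executors.newFixedThreadPool(4) limits the pool to 4 threads, but backs them with
        // an unbounded LinkedBlockingQueue: execute() returns immediately, so unprocessed
        // blocks pile up in the queue (and in memory) while the stream keeps being read.
        ExecutorService pool = Executors.newFixedThreadPool(4);

        while ((i = in.read(buffer))>-1) {
                final int size = i;
                final byte buf[] = buffer.clone();
                pool.execute(new Runnable() {

                        @Override
                        public void run() {
                                doAlgorithm(buf, size, out);
                        }

                });
        }

        pool.shutdown(); // the method returns here even though queued blocks may still be processing
}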

Much better is SimpleThreadPool from the Quartz Scheduler library. This is exactly what we need here. The solution is very simple and self-explanatory:

public void process(InputStream in, final OutputStream out) 
throws IOException, SchedulerConfigException {
        int i;
        byte[] buffer = new byte[512*1024];
        
        // spawn a 4-thread pool
        SimpleThreadPool pool = new SimpleThreadPool(4, Thread.NORM_PRIORITY);
        pool.setThreadsInheritGroupOfInitializingThread(false);
        pool.initialize();
        
        while ((i = in.read(buffer))>-1) {
                final int size = i;
                final byte buf[] = buffer.clone();
                // run in the pooled thread
                pool.runInThread(new Runnable() {
                        
                        @Override
                        public void run() {
                                doAlgorithm(buf, size, out);
                        }
                        
                });
        }
        
        // wait for the pool to finish the job and shut it down
        pool.shutdown(true);
}


In point of fact I have nothing to add to this code, except maybe that the pool.runInThread() call blocks until a thread is available and ready to take the next Runnable.

This is the final, well-done solution that I prefer. But sometimes someone may not want to use Quartz for some reason (like not wanting a 101st library integrated; although having Quartz is rather standard - especially in Spring projects). I, for example, have lately been writing command-line tests for some algorithms, outside the container, and also didn't want to play with extra libs. This is the resulting pure Java implementation of the discussed issue, using ThreadGroup:

public void process(InputStream in, final OutputStream out) 
throws IOException, InterruptedException {
        int i;
        byte[] buffer = new byte[512*1024];

        // initialize our "pool"
        int threads = 4;
        ThreadGroup group = new ThreadGroup("mypool");
        
        while ((i = in.read(buffer))>-1) {
                final int size = i;
                final byte buf[] = buffer.clone();
                
                // wait until there is a free "thread slot" in the "pool";
                // ThreadGroup is not reliably notified when a single thread terminates,
                // so use a timed wait and re-check activeCount() periodically
                synchronized (group) {
                        while (group.activeCount()>=threads) 
                                group.wait(10);
                }
                
                // run in a new thread in the group
                new Thread(group, new Runnable() {
                        
                        @Override
                        public void run() {
                                doAlgorithm(buf, size, out);
                        }
                        
                }).start();
        }
        
        // wait for the "pool" to finish the job
        synchronized (group) {
                while (group.activeCount()>0) 
                        group.wait(10);
        }
}


This is not really a pool implementation, because it spawns a new thread each time, but I'm presenting it because it's very easy to achieve in plain Java. Other "pure" solutions would need their own pool implementation, or some playing with ThreadPoolExecutor.
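
For completeness, here is a sketch of what such playing with ThreadPoolExecutor could look like - my assumption of one workable configuration, not something I have benchmarked like the variants above. The idea is to give the pool a small bounded queue and a CallerRunsPolicy, so the reading thread throttles itself by processing a block on its own when all workers are busy:

public void process(InputStream in, final OutputStream out) 
throws IOException, InterruptedException {
        int i;
        byte[] buffer = new byte[512*1024];

        // 4 threads and a bounded queue of 4 blocks; when both are full, CallerRunsPolicy
        // makes the submitting (reading) thread run the task itself, which throttles reading
        ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<Runnable>(4),
                        new ThreadPoolExecutor.CallerRunsPolicy());

        while ((i = in.read(buffer))>-1) {
                final int size = i;
                final byte buf[] = buffer.clone();
                pool.execute(new Runnable() {

                        @Override
                        public void run() {
                                doAlgorithm(buf, size, out);
                        }

                });
        }

        // stop accepting new tasks and wait for the queued ones to finish
        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}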

Wednesday, May 25, 2011

Threaded IO streams in Java

While solving some interesting problems in the project I'm currently working on, I started to analyse the following problem: you have a large data set (e.g. a file) to which you need to apply some expensive transformation (e.g. compression). The key requirement is performance.

The highest performance demands can usually be met effectively with multi-threaded algorithms, especially on multi-core hardware. But you may not have a multi-threaded transformation algorithm, or you may be required to keep many algorithms pluggable, not all of them designed for parallel work.

At first I thought it would be easy to divide the source data into chunks and start a thread pool to do the job. This is feasible when you can have multiple output streams, but writing everything to a single file may be a little problematic. If you synchronize all threads so that they write to a single stream, you end up with a multi-threaded algorithm performing about as well as a simple single-threaded one. You simply get no benefit from concurrency when all threads have to wait for the thread currently holding the write lock. They don't do any transformation while they wait for the lock. As a result, only one thread works effectively.

If you want to benefit from concurrency, you need to design the solution so that all threads can do their expensive work independently of the others, without waiting and synchronization.

The concept is the following. We have N worker threads, each working at its own pace and started at a different time than the others. If we give each thread its own buffer of a previously established size, we can allow concurrent writes to these buffers from independent threads without any collision. When a buffer fills up, we flush it to the output stream. If the lock is taken only for flushing, the other threads can keep working on their own buffers in the meantime, without waiting for the shared resource to become available. A Java implementation of this idea, conforming to the Java input/output stream contracts, is simple (I use Apache Commons to shorten the time needed for the obvious things):

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
 * Wraps the target output stream and allows writing to it from multiple threads, handling
 * the underlying data. The data in the target output stream is interleaved. It can then be
 * read back by ThreadedInputStream.
 *
 * The stream uses internal buffering and synchronization. The default buffer size is 512KB.
 * This means that for each thread a 512KB buffer is allocated when the thread starts
 * to write into the stream. The buffer is flushed into the target output stream periodically.
 *
 * Maximum number of threads: Byte.MAX_VALUE
 *
 * The lifecycle is the following:
 * <ol>
 * <li>Create a new stream in the main thread
 * <li>For each worker thread:
 * <ul>
 * <li>use the stream in the usual way
 * <li>at the end of processing call close(); the stream will not really be closed,
 * but it will be flushed and the thread buffer will be removed
 * </ul>
 * <li>Call close() in the main thread to close the stream permanently (as well as the target stream)
 * </ol>
 */
public class ThreadedOutputStream extends OutputStream {

  protected DataOutputStream target;
  protected int bufSize = 512*1024; // default buffer size = 512 KB
  protected volatile byte threadsCount = 0;
  protected Thread creatorThread;

  /** Internal thread data holder and buffer **/
  protected class ThreadStreamHolder {
    byte index = 0;
    int size = 0;
    byte[] buffer = new byte[bufSize];

    public ThreadStreamHolder(byte index) {
      super();
      this.index = index;
    }

    /** Flush data to the target stream **/
    public void flush() throws IOException {
      if (size>0) {
        synchronized (target) {
          target.writeByte(index);       // write thread index
          target.writeInt(size);         // write block size
          target.write(buffer, 0, size); // write data
          size = 0;
        }
      }
    }

    public void write(int b) throws IOException {
      buffer[size++] = (byte) b;
      if (size>=bufSize)
        flush();
    }
  }

  protected ThreadLocal<ThreadStreamHolder> threads = new ThreadLocal<ThreadedOutputStream.ThreadStreamHolder>();

  /**
   * Creates stream using default buffer size (512 KB).
   * @param target Target output stream where data will be really written.
   */
  public ThreadedOutputStream(OutputStream target) {
    super();
    this.target = new DataOutputStream(target);
    creatorThread = Thread.currentThread();
  }

  /**
   * Creates stream using custom buffer size value.
   * @param target Target output stream where data will be really written.
   * @param bufSize Buffer size in bytes.
   */
  public ThreadedOutputStream(OutputStream target, int bufSize) {
    this(target);
    this.bufSize = bufSize;
  }

  @Override
  public void write(int b) throws IOException {
    ThreadStreamHolder sh = threads.get();
    if (sh==null) {
      synchronized (this) { // to avoid more threads with the same threads count
        if (threadsCount==Byte.MAX_VALUE)
          throw new IOException("Cannot serve for more than Byte.MAX_VALUE threads");
        sh = new ThreadStreamHolder(threadsCount++); // passing threadsCount and ++ is not atomic !
        threads.set(sh);
      }
    }
    sh.write(b);
  }

  @Override
  public void flush() throws IOException {
    super.flush();
    // flush this thread's buffer at the end
    ThreadStreamHolder sh = threads.get();
    if (sh!=null)
      sh.flush();
  }

  @Override
  public void close() throws IOException {
    flush();
    threads.remove();
    // in the main thread, also close the target stream
    if (Thread.currentThread().equals(creatorThread))
      target.close();
  }

  public static final int TEST_THREADS = 64;              // number of threads
  public static final double TEST_DPT_MAX = 1024*1024*10; // data amount per thread
  public static final int TEST_BLOCKSIZE = 1024*512;      // default block size

  public static void main(String[] args) throws IOException {
    File f = new File("threados");
    OutputStream target = new BufferedOutputStream(new FileOutputStream(f, false));
    final ThreadedOutputStream out = new ThreadedOutputStream(target, TEST_BLOCKSIZE);

    ThreadGroup group = new ThreadGroup("threados");

    // write some data by threads
    for (int i=0; i<TEST_THREADS; i++) {
      final int valueToWrite = (i+5)*20;
      new Thread(group, new Runnable() {
        @Override
        public void run() {
          try {
            int jMax = (int) (Math.random()*TEST_DPT_MAX) + 1;
            byte crc = 0;
            for (int j=0; j<jMax; j++) {
              out.write(valueToWrite+j);
              crc+=(valueToWrite+j);
            }
            out.write(crc);
            System.out.println("Some thread count: "+(jMax+1));
            out.close();
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
      }).start();
    }

    // wait for the thread group to finish
    try {
      synchronized (group) {
        if (group.activeCount()>0)
          group.wait();
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    out.close();
  }
}
The static main() method shows example usage of the stream from multiple threads. You can manipulate the constants to check how it performs.

With such a multi-threaded stream, and a transformation algorithm whose performance depends on the input data, the buffers are filled in a fairly random order and we get real benefits from multi-threading. I tested this and it performs very well.

But an output file made this way consists of interleaved blocks of stream data written to a single file in effectively random order. So how do we restore the data from the source file (to apply the reverse transformation)? This is why we write the thread index and block size into the target stream before each block. We can now restore the data sequentially, reading each thread's source stream separately, or concurrently, using a thread pool with the same thread count as in writing mode. All the information can easily be retrieved from the interleaved stream:

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;

/**
 * Allows reading data previously generated by ThreadedOutputStream. An instance of this
 * class works in a single thread and retrieves from the interleaved stream the data written
 * by a single writing thread. The raw source stream is divided into entries, depending on
 * how many threads were writing to the ThreadedOutputStream.
 */
public class ThreadedInputStream extends InputStream {
        
  protected DataInputStream source;
  protected byte entry;
        
  protected int bytesToRead = 0; // how many bytes remain to be read from the current data block
        
  protected long pos = 0;
        
  /**
  * Creates the input stream.
  * 
  * @param source Newly instantiated, clean source input stream to raw data
  *      produced by ThreadedOutputStream.
  * @param entry Entry number. There can be many entries in source stream (0, 1, 2... etc,
  *      depending on count of writing threads).
  * @throws EOFException If there's no such entry yet (you need to manually close source then).
  */
  public ThreadedInputStream(InputStream source, byte entry) throws IOException {
    super();

    if (entry>Byte.MAX_VALUE)
      throw new IOException("Cannot serve for more than Byte.MAX_VALUE threads");
                
    this.source = new DataInputStream(source);
    this.entry = entry;
    lookupNextBlock();
  }
        
  protected void lookupNextBlock() throws IOException {
    while (true) {
      byte currentEntry = source.readByte();
                        
      if (currentEntry==entry) {
        // found next entry datablock
        bytesToRead = source.readInt();
        break;
      } else {
        // found a datablock belonging to a different entry - skip it and keep looking
        int blockSize = source.readInt();
        long toSkip = blockSize;
        while (toSkip>0) {
          long skip = source.skip(toSkip);
          if (skip<=0) // skip() returns 0 at EOF, so treat that as a truncated block
            throw new EOFException("Cannot skip full datablock");
          toSkip -= skip;
        }
      }
    }
  }

  @Override
  public int read() throws IOException {
    if (bytesToRead<=0)
      try {
        lookupNextBlock();
      } catch (EOFException e) {
        return -1;
      }
                
    bytesToRead--;
    return source.read();
  }

  @Override
  public void close() throws IOException {
    source.close();
  }

  // test
  public static void main(String[] args) throws IOException {
    File f = new File("threados");
                
    ThreadGroup group = new ThreadGroup("threados");
                
    // read some data by threads
    Map<Byte, ByteArrayOutputStream> outmap = new LinkedHashMap<Byte, ByteArrayOutputStream>();
                
    try {
      byte i = 0;
      while (true) {
        InputStream source = new BufferedInputStream(new FileInputStream(f));
        try {
          final byte entryNo = i++; // remember the entry number before it is incremented
          final ThreadedInputStream is = new ThreadedInputStream(source, entryNo);
          final ByteArrayOutputStream out = new ByteArrayOutputStream();
          outmap.put(entryNo, out); // key the result by the entry actually read
                                        
          new Thread(group, new Runnable() {
            @Override
            public void run() {
              try {
                IOUtils.copy(is, out);
                is.close();
              } catch (IOException e) {
                e.printStackTrace();
              }
            }
          }).start();
        } catch (EOFException e) {
          source.close();
          break;
        }
      }
    } catch (EOFException e) {} // no more interleaved streams
                
    // wait for threads
    try {
      synchronized (group) {
        if (group.activeCount()>0)
        group.wait();
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
                
    for (byte b: outmap.keySet()) {
      byte[] ba = outmap.get(b).toByteArray();
      System.out.println(b+" ["+ba.length+"]: "+dumpByteArray(ba));
    }
  }
        
  public static String dumpByteArray(byte[] b) {
    StringBuffer sb = new StringBuffer();
    int i = 0;
    byte crc = 0;
    for (byte b1: b) {
      if (i++<20) {
        if (b1<10 && b1>=0)
          sb.append(0);
        sb.append((int) b1 & 0xFF);
        sb.append(',');
      }
                        
      if (i==b.length) {
        if (crc==b1)
          sb.append("crc ok");
        else
          sb.append("crc error");
      } else
          crc+=b1;
    }
    return sb.toString();
  }

}
This time the main() method takes the file produced by the output stream test, reads it with a pool of threads, and performs a simple checksum check.

Depending on requirements, the ThreadedInputStream class can be used as a source for a multi-threaded or a sequential algorithm. You can start N threads, each fetching data from its own entry stream, or you can iterate through the stream entries in a single thread (in that case you need to perform N iterations).
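
For the sequential case, a minimal sketch (destinationFor() is a hypothetical method providing the target stream for each restored entry; the entry count is assumed known here, otherwise probe for it exactly as the test main() above does):

// restore entry streams 0..entries-1 one after another from the interleaved file
int entries = ThreadedOutputStream.TEST_THREADS; // assumed known from the writing side
File f = new File("threados");
for (byte entry = 0; entry < entries; entry++) {
        InputStream source = new BufferedInputStream(new FileInputStream(f));
        ThreadedInputStream is = new ThreadedInputStream(source, entry);
        IOUtils.copy(is, destinationFor(entry)); // hypothetical: target for this entry's data
        is.close();
}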

I'm presenting this solution as a curiosity, but also as a real, working mechanism I used in the project. It works well when you want to parallelize a sequential transformation algorithm (in my case it was compression) over potentially large input data. Of course there is a separate question of how to chunk the source data, but that depends on the data itself. For an unstructured file you can, for example, divide it into equal parts and start N reading streams, each with a different starting offset.