A Concurrent Circular Buffer

This article explains the construction of, and a use case for, a data-structure optimized for highly concurrent data acquisition. The data-structure is tailored specifically to the use case at hand, which is uncommon enough in the general practice of software engineering that no suitable candidate is available in common libraries such as the JDK.

Monitoring in a high volume environment

Managing a high volume system can be difficult, and monitoring is among the most crucial tools for doing so. Identifying an approaching problem before it actually occurs can be the difference between success and failure. Creating software monitors is typically fairly simple; the real task is designing the right type of monitors for your anticipated problem set and subsequently collecting the monitoring data. Recently we were faced with an interesting problem: a high volume work-flow which was rich in inter-process communication, the type of scenario that's fairly common in a highly distributed system or a service oriented architecture. In order to predict where and when to add more nodes, and of which class, latency needed to be monitored between the intercommunicating systems. Five individual systems were involved in handling the parts of the system expecting traffic, with several points of communication between them, so a fairly large number of monitoring points was needed just to cover the most common actions in the system. In order to respond quickly to performance degradation, all of these "trouble areas" need to be monitored independently and in real-time under high load.

Once this data is captured, it needs to be processed. Therein lies the more complicated problem: while data-processing is important, it is just as important on today's symmetric multi-processing systems that capturing and processing the monitoring data does not impact performance. This requirement, which is fairly specific to monitoring, suggests that more traditional concurrency tools such as blocking synchronous queues are poorly suited as targets for monitoring data. Under no circumstances should a lagging monitor reduce the service of an application. So before adding monitoring everywhere, I decided to create a concurrent data structure which wouldn't bog down and was appropriate for keeping statistical moving averages, precisely the type of sample most often required for performance monitoring.

The data-structure I settled on was a simple circular buffer, backed by an array. The basis for this decision is that application monitors don't actually need every data-point in order to calculate the current rate of throughput or the current latency; a fixed-size sample of recent data-points is more than suitable. Additionally, a circular buffer can reduce locking requirements when it's backed by a simple array. Because each writer claims its own slot in the array, concurrent write issues are largely avoided as long as the buffer holds more elements than there are possible writers on the system. In combination with this sizing technique, mutual exclusion locks can be avoided entirely on more modern hardware by using compare-and-swap methods, allowing data-acquisition to be more or less non-blocking and thus to have a negligible impact on overall system performance.

When using this kind of data-structure, data-sampling is very efficient, since adding a new data-point to the buffer is as simple as incrementing a number and storing the sample in the slot it selects. Using AtomicInteger to implement the cursor both removes concurrency concerns around the increment and lets the JVM use an atomic CPU instruction (such as CMPXCHG on x86) where the current platform provides one.

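import java.lang.reflect.Array;  /* used by snapshot(), shown later */
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentCircularBuffer <T> {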
    private final AtomicInteger cursor = new AtomicInteger();
    private final T[]           buffer;
    private final Class<T>      type;

    public ConcurrentCircularBuffer (final Class <T> type, 
                                     final int bufferSize) 
    {
        if (bufferSize < 1) {
            throw new IllegalArgumentException(
                "Buffer size must be a positive value"
                );
        }

        this.type   = type;
        this.buffer = (T[]) new Object [ bufferSize ];
    }

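    public void add (T sample) {
        /* Claim the next slot atomically.  floorMod keeps the index
         * non-negative even if the int cursor eventually overflows.
         */
        int slot = Math.floorMod(cursor.getAndIncrement(), buffer.length);
        buffer[ slot ] = sample;
    }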
}
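The slot-claiming step can be checked in isolation. The following is a minimal sketch, not part of the original class: SlotClaimDemo and claimCounts are hypothetical names, and the demo counts claims per slot instead of storing samples. It assumes only that writers pick a slot with getAndIncrement followed by a modulo, as add() does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

/* Hypothetical demo class; names are illustrative only. */
final class SlotClaimDemo {

    /* Starts `writers` threads that each claim `claimsPerWriter` slots,
     * and returns how many times each slot index was claimed.
     */
    static int[] claimCounts(final int bufferSize, final int writers,
                             final int claimsPerWriter) {
        final AtomicInteger cursor = new AtomicInteger();
        final AtomicIntegerArray hits = new AtomicIntegerArray(bufferSize);
        final CountDownLatch go = new CountDownLatch(1);
        Thread[] threads = new Thread[writers];

        for (int i = 0; i < writers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    go.await();  /* release all writers at once */
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int j = 0; j < claimsPerWriter; j++) {
                    /* Same claim step as add(): one atomic increment. */
                    int slot = Math.floorMod(cursor.getAndIncrement(),
                                             bufferSize);
                    hits.incrementAndGet(slot);
                }
            });
            threads[i].start();
        }
        go.countDown();

        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }

        int[] counts = new int[bufferSize];
        for (int i = 0; i < bufferSize; i++) {
            counts[i] = hits.get(i);
        }
        return counts;
    }

    public static void main(String[] args) {
        /* 4 writers x 2000 claims spread over 8 slots. */
        for (int count : claimCounts(8, 4, 2000)) {
            System.out.println(count);
        }
    }
}
```

Because every cursor value is handed out exactly once, the 8000 claims in main are spread evenly and each slot is claimed 1000 times, regardless of how the threads interleave.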

As you can see in the above listing, concurrent data-acquisition is fairly trivial with such a solution. Before this can be used for monitoring purposes, a mechanism for reading the data must be created. The buffer itself can't be read reliably in its entirety because its content is highly unstable. However, given that the cursor is never reset and always increments, the instability can be measured, and this can be used to create a reliable copying mechanism in which a stable sample of the buffer can be taken. By sampling the cursor position both before and after a full copy of the buffer, the elements which were modified while the copy was being made can be identified and eliminated, resulting in a variably sized snapshot of the sampling data.
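The arithmetic behind that measurement can be sketched on its own. This is an illustrative helper under stated assumptions, not the article's method: StableWindow and stableCount are hypothetical names, `before` and `after` stand for the two cursor readings, and `length` for the buffer capacity.

```java
/* Hypothetical helper; names are illustrative only. */
final class StableWindow {

    /* Number of elements that were written before the copy started and
     * were not overwritten while the copy ran.
     */
    static int stableCount(int before, int after, int length) {
        int overwritten = after - before;        /* writes that raced the copy */
        int survivors   = length - overwritten;  /* slots left untouched */

        if (survivors <= 0) {
            return 0;                            /* whole buffer was replaced */
        }
        /* Before the first wrap, only `before` slots have ever been written. */
        return Math.min(survivors, before);
    }

    public static void main(String[] args) {
        System.out.println(stableCount(25, 28, 10)); /* 7: three slots raced */
        System.out.println(stableCount(5, 7, 10));   /* 5: buffer half filled */
        System.out.println(stableCount(25, 40, 10)); /* 0: fully overwritten */
    }
}
```

With a buffer of 10 elements, three writes during the copy leave 7 stable elements, while 15 writes during the copy leave none.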

Working with the sample will be simplest if its variability is abstracted away from the consumer. To simplify processing for the consumer, we'll create two copies of the buffer. The first copy will contain the unstable elements which may have been concurrently modified. The second copy will be the size of the stable area of the buffer, with the first element being the first stable element. The second copy allows the data-structure to hide from the consumer the details of the copy instability, as well as the circular buffer implementation.

    public T[] snapshot () {
        /* Identify the start-position of the buffer.  This assumes the
         * cursor has not yet overflowed past Integer.MAX_VALUE.
         */
        int before = cursor.get();

        /* Terminate early for an empty buffer. */
        if (before == 0) {
            return (T[]) Array.newInstance(type, 0);
        }

        /* First copy: may contain elements modified during the copy. */
        T[] snapshots = (T[]) new Object [ buffer.length ];
        System.arraycopy(buffer, 0, snapshots, 0, buffer.length);

        int after          = cursor.get();
        int size           = buffer.length - (after - before);
        int snapshotCursor = before - 1;

        /* The entire buffer was replaced during the copy. */
        if (size <= 0) {
            return (T[]) Array.newInstance(type, 0);
        }

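        /* Until the buffer has wrapped once, only the first `before`
         * elements have ever been written.  Elements overwritten during
         * the copy must still be excluded, so take the smaller count.
         */
        if (snapshotCursor < snapshots.length) {
            size = Math.min(size, before);
        }

        int start = snapshotCursor - (size - 1);
        int end   = snapshotCursor;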

        /* Copy the sample snapshot to a new array the size of our stable
         * snapshot area.
         */
        T[] result = (T[]) Array.newInstance(type, size);

        int startOfCopy = start % snapshots.length;
        int endOfCopy   = end   % snapshots.length;

        /* If the buffer space wraps the physical end of the array, use two
         * copies to construct the new array.
         */
        if (startOfCopy > endOfCopy) {
            System.arraycopy(snapshots, startOfCopy,
                             result, 0, 
                             snapshots.length - startOfCopy);
            System.arraycopy(snapshots, 0,
                             result, (snapshots.length - startOfCopy),
                             endOfCopy + 1);
        }
        else {
            /* Otherwise it's a single continuous segment, copy the whole thing
             * into the result.
             */
            System.arraycopy(snapshots, startOfCopy, result, 0, size);
        }

        return (T[]) result;
    }
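The wrap-around copy at the end of snapshot() can be exercised separately. Below is a minimal sketch using a plain long[] in place of the generic array; UnrollDemo and unroll are hypothetical names, and `last` is assumed to be the logical (never reset) position of the newest stable element, with `size` no larger than the ring.

```java
import java.util.Arrays;

/* Hypothetical helper; names are illustrative only. */
final class UnrollDemo {

    /* Copies the `size` elements ending at logical position `last` out of
     * a circular array, oldest first.  Requires 0 <= size <= ring.length.
     */
    static long[] unroll(long[] ring, int last, int size) {
        long[] out = new long[size];
        int start = Math.floorMod(last - (size - 1), ring.length);

        /* First run: from `start` up to the physical end of the array. */
        int firstRun = Math.min(size, ring.length - start);
        System.arraycopy(ring, start, out, 0, firstRun);

        /* Second run: whatever wrapped around to the front (may be empty). */
        System.arraycopy(ring, 0, out, firstRun, size - firstRun);
        return out;
    }

    public static void main(String[] args) {
        /* State of a 5-slot ring after the values 0..6 were written in order. */
        long[] ring = { 5, 6, 2, 3, 4 };
        System.out.println(Arrays.toString(unroll(ring, 6, 5))); /* [2, 3, 4, 5, 6] */
        System.out.println(Arrays.toString(unroll(ring, 6, 3))); /* [4, 5, 6] */
    }
}
```

The consumer receives a plain array ordered oldest to newest and never needs to know where the physical end of the ring fell.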

This provides a snapshot of the stable area of the buffer. It is not infallible, however. If the system is extraordinarily busy, the entire buffer may be overwritten during the copy. When this happens, none of the elements can be relied upon as having been updated in any particular sequence, and an empty snapshot is returned. By testing the size of the safely copied array in a loop, a complete snapshot can be obtained, ensuring that the highest quality sample available is provided. Note that such a loop only finishes once the buffer has been filled at least once and a copy completes without any concurrent writes.

    public T[] completeSnapshot () {
        T[] snapshot = snapshot();
        /* Try again until we get a snapshot that's the same size as the
         * buffer...  This is very often a single iteration, but it depends on
         * how busy the system is.  Note that this spins until the buffer has
         * been filled at least once.
         */
        while (snapshot.length != buffer.length) {
            snapshot = snapshot();
        }
        return snapshot;
    }
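Where an unbounded retry loop is undesirable, the number of attempts can be capped. The following is one possible sketch and not something the original class provides: BoundedRetry and bestOf are hypothetical names, and the snapshot source is passed in as a Supplier so the helper stays independent of the buffer class.

```java
import java.util.function.Supplier;

/* Hypothetical helper; names are illustrative only. */
final class BoundedRetry {

    /* Calls `source` at least once and at most `maxAttempts` times.
     * Returns the first result of the wanted length, or the longest
     * result seen if no attempt produced a complete one.
     */
    static <T> T[] bestOf(Supplier<T[]> source, int wantedLength,
                          int maxAttempts) {
        T[] best = source.get();

        for (int attempt = 1;
             attempt < maxAttempts && best.length != wantedLength;
             attempt++) {
            T[] next = source.get();
            if (next.length > best.length) {
                best = next;
            }
        }
        return best;
    }
}
```

Assuming a buffer instance named samples with a capacity of 64, a call might look like BoundedRetry.bestOf(samples::snapshot, 64, 5). This trades the complete-sample guarantee for a bounded amount of work per reading.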

Now we can rest assured that sampling data for monitoring will not reduce service to the component being monitored. The next step is to create some monitor objects which use this data structure, and utilize them in our system. An example of such a monitor is below. This monitor records the current time of each event seen, and when asked to calculate the rate of throughput, it takes a snapshot of the buffer and calculates the duration of the snapshot.

import java.util.concurrent.TimeUnit;

public class ThroughputMonitor {
    private final ConcurrentCircularBuffer <Long> samples;
    private final TimeUnit timeUnit;

    public ThroughputMonitor (final int sampleSize, final TimeUnit timeUnit) {
        this.samples  = new ConcurrentCircularBuffer <Long> (Long.class,
                                                             sampleSize);
        this.timeUnit = timeUnit;
    }

    public void count () {
        samples.add(System.nanoTime());
    }

    public long getRate (final TimeUnit conversionUnit) {
        if (conversionUnit == null) {
            throw new IllegalArgumentException(
                "A conversion unit is required"
                );
        }

        Long[] snapshot = samples.snapshot();

        if (snapshot.length == 0) {
            return 0L;
        }

        /* Rather than converting down (which reduces resolution, and increases
         * possibility of a duration of zero) the timestamps of the sample,
         * convert up the size of the sample relative to the provided
         * conversion unit and provide a rate based on the average requests per
         * nanosecond.
         */
        long duration = snapshot[snapshot.length - 1] - snapshot[0];

        long statesize = TimeUnit.NANOSECONDS.convert(snapshot.length,
                                                      conversionUnit);

        if (duration > 0) {
            return statesize / duration;
        }
        else {
            /* If we for some reason have a zero-nanosecond sample, just
             * return some ceiling amount (or floored amount).
             */
            return snapshot.length == 1 ? 1 : statesize;
        }
    }

    /**
     * Calculate the current rate of throughput.
     *
     * @return The current rate of throughput per default time-unit.
     */
    public long getRate () {
        return getRate(timeUnit);
    }
}
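The rate calculation in getRate() can be worked through with concrete numbers. This is a minimal sketch of the same arithmetic: RateMath and eventsPer are hypothetical names, and unlike the monitor above it simply returns 0 for a non-positive duration.

```java
import java.util.concurrent.TimeUnit;

/* Hypothetical helper; names are illustrative only. */
final class RateMath {

    /* Events per `unit`, given the first and last timestamps (in
     * nanoseconds) of a sample and the number of events it holds.
     */
    static long eventsPer(TimeUnit unit, long firstNanos, long lastNanos,
                          int samples) {
        long duration = lastNanos - firstNanos;

        /* Scale the sample count up to nanoseconds (samples x nanoseconds
         * per unit) instead of scaling the timestamps down.
         */
        long scaled = TimeUnit.NANOSECONDS.convert(samples, unit);

        return duration > 0 ? scaled / duration : 0L;
    }

    public static void main(String[] args) {
        /* 5 events spread over 2 milliseconds (2,000,000 ns). */
        System.out.println(eventsPer(TimeUnit.SECONDS, 0L, 2_000_000L, 5));      /* 2500 */
        System.out.println(eventsPer(TimeUnit.MILLISECONDS, 0L, 2_000_000L, 5)); /* 2 */
    }
}
```

Five events over 2 ms give 5,000,000,000 / 2,000,000 = 2500 per second. The per-millisecond figure truncates 2.5 down to 2, which shows why a coarser conversion unit usually reads better for bursty traffic.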
