Pure Danger Tech



Thread coordination with CountDownLatch and CyclicBarrier

11 Nov 2007

Java 5 introduced many new concurrency primitives and collections, and this post is going to look at two classes that can be used to coordinate threads: CountDownLatch and CyclicBarrier.

A CountDownLatch is initialized with a counter. Threads can then either count down on the latch or wait for it to reach 0. When the latch reaches 0, all waiting threads are released.

A common idiom is to use a latch to trigger a coordinated start or end between threads:

package puredanger.coord;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownDemo {

    public static void main(String[] args) throws Exception {
        int threads = 3;
        final CountDownLatch startLatch = new CountDownLatch(threads);
        final CountDownLatch endLatch = new CountDownLatch(threads);

        ExecutorService svc = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            svc.execute(new Runnable() {
                public void run() {
                    try {
                        log("At run()");
                        // Signal arrival, then block until all threads have arrived.
                        startLatch.countDown();
                        startLatch.await();

                        log("Do work");
                        Thread.sleep((int) (Math.random() * 1000));

                        log("Wait for end");
                        // Same pattern at the end: count down, then wait for the rest.
                        endLatch.countDown();
                        endLatch.await();

                        log("Done");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            // Stagger the thread starts so the arrival times visibly differ.
            Thread.sleep(100);
        }
        // Let the pool threads exit once the submitted tasks finish.
        svc.shutdown();
    }

    private static void log(String msg) {
        System.out.println(System.currentTimeMillis() + ": "
                + Thread.currentThread().getId() + " " + msg);
    }
}

In this code, two latches are initialized, each with a count equal to the number of threads. As each thread starts up, it counts down on the start latch and then waits for that latch to reach 0, which happens once all threads have started. Similarly, each thread counts down on the end latch when its work is done and waits there until every thread has finished.

Running this program yields:

1194812267416: 7 At run()
1194812267517: 8 At run()
1194812267618: 9 At run()
1194812267618: 9 Do work
1194812267618: 7 Do work
1194812267619: 8 Do work
1194812267673: 7 Wait for end
1194812267688: 8 Wait for end
1194812268023: 9 Wait for end
1194812268023: 9 Done
1194812268023: 7 Done
1194812268023: 8 Done

You can see that each thread hits run() at a different time, but all of them proceed past the start latch at the same time. Each then does a random amount of work and waits on the end latch, and they all proceed past it together.

In the example above, each thread waits forever for the latch to trigger. You can also choose to wait for a specified time period before giving up, using await(timeout, unit). And you can check the current count of the latch with getCount(); note that this tells you how many countDown() calls are still outstanding, not how many threads are waiting. Each CountDownLatch instance can only be used once: after the count reaches 0 it cannot be reset.
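As a minimal sketch of those three points (timed waits, checking the count, and one-shot behavior), the following uses only the standard CountDownLatch methods await(long, TimeUnit), getCount() and countDown(). The class name LatchTimeoutSketch and the helper awaitBriefly are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutSketch {

    // Hypothetical helper: waits up to timeoutMillis for the latch and
    // returns true only if the count reached 0 within that time.
    static boolean awaitBriefly(CountDownLatch latch, long timeoutMillis)
            throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);

        latch.countDown();
        System.out.println("Remaining count: " + latch.getCount()); // 1

        // Only one of the two countDown() calls has happened, so this times out.
        System.out.println("Released in time? " + awaitBriefly(latch, 100)); // false

        latch.countDown();
        // The count is now 0, so await returns immediately.
        System.out.println("Released in time? " + awaitBriefly(latch, 100)); // true

        // The latch is spent: further countDown() calls have no effect.
        latch.countDown();
        System.out.println("Remaining count: " + latch.getCount()); // 0
    }
}
```

The timed await reports a timeout through its boolean return value rather than an exception, so callers need to check the result.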

If you want a set of threads to repeatedly meet at a common point, you are better served by using a CyclicBarrier. A common use for this is in multi-threaded testing where it is typical to start a bunch of threads, meet, do some stuff, meet, validate some assertions, repeatedly.

The prior program can be simplified by replacing the two latches with a single barrier:

package puredanger.coord;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierDemo {

    public static void main(String[] args) throws Exception {
        int threads = 3;
        final CyclicBarrier barrier = new CyclicBarrier(threads);

        ExecutorService svc = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            svc.execute(new Runnable() {
                public void run() {
                    try {
                        log("At run()");
                        // First meeting: wait until all threads have started.
                        barrier.await();

                        log("Do work");
                        Thread.sleep((int) (Math.random() * 1000));

                        log("Wait for end");
                        // Second meeting on the same barrier: wait until all are done.
                        barrier.await();

                        log("Done");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            // Stagger the thread starts so the arrival times visibly differ.
            Thread.sleep(100);
        }
        // Let the pool threads exit once the submitted tasks finish.
        svc.shutdown();
    }

    private static void log(String msg) {
        System.out.println(System.currentTimeMillis() + ": "
                + Thread.currentThread().getId() + " " + msg);
    }
}

We can see here that the threads can repeatedly wait at the barrier, which implicitly counts down until all threads have arrived, then releases all threads.
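To make the reuse more visible, here is a rough sketch that sends the threads through several rounds on one barrier. The class name BarrierRoundsSketch, the helper runRounds and the check-in array are illustrative assumptions, not part of the demos in this post. Each thread checks in before every await(), then confirms after the barrier releases it that all threads had checked in for that round.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;

public class BarrierRoundsSketch {

    // Hypothetical helper: runs `threads` workers through `rounds` meetings on
    // a single barrier. Returns true if, after every meeting, each worker saw
    // that all workers had checked in for that round.
    static boolean runRounds(final int threads, final int rounds)
            throws InterruptedException {
        final CyclicBarrier barrier = new CyclicBarrier(threads);
        final AtomicIntegerArray arrivals = new AtomicIntegerArray(rounds);
        final AtomicBoolean inStep = new AtomicBoolean(true);

        ExecutorService svc = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            svc.execute(new Runnable() {
                public void run() {
                    try {
                        for (int r = 0; r < rounds; r++) {
                            arrivals.incrementAndGet(r); // check in for round r
                            barrier.await();             // same barrier every round
                            if (arrivals.get(r) != threads) {
                                inStep.set(false);       // someone was released early
                            }
                        }
                    } catch (Exception e) {
                        inStep.set(false);
                    }
                }
            });
        }
        svc.shutdown();
        boolean finished = svc.awaitTermination(10, TimeUnit.SECONDS);
        return finished && inStep.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("All rounds in step: " + runRounds(3, 4));
    }
}
```

Doing the same with CountDownLatch would need a fresh latch per round, since a latch cannot be reset once it reaches 0.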

Another nice trick with CyclicBarrier is that a Runnable action can be associated with the barrier to be run by the last thread reaching the barrier. You can very simply build a start/end timer for testing with this functionality:

package puredanger.coord;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TimerBarrierDemo {

    public static void main(String[] args) throws Exception {
        int threads = 3;
        // The BarrierTimer runs each time the barrier trips.
        final CyclicBarrier barrier = new CyclicBarrier(threads, new BarrierTimer());

        ExecutorService svc = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            svc.execute(new Runnable() {
                public void run() {
                    try {
                        barrier.await();
                        long sleepTime = (int) (Math.random() * 1000);
                        System.out.println(Thread.currentThread().getId() + " working for " + sleepTime);
                        Thread.sleep(sleepTime);
                        barrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        // Let the pool threads exit once the submitted tasks finish.
        svc.shutdown();
    }

    private static class BarrierTimer implements Runnable {
        private long start;

        public void run() {
            if (start == 0) {
                // First trip: all threads are ready to start.
                start = System.currentTimeMillis();
            } else {
                // Second trip: all threads have finished.
                long end = System.currentTimeMillis();
                long elapsed = (end - start);
                System.out.println("Completed in " + elapsed + " ms");
            }
        }
    }
}

Here we rely on knowing that the barrier will be reached exactly twice – once at start and once at end. The first time it’s reached, we record a timestamp and the second time it’s reached we print out the timing. When we construct our barrier, we give it an instance of this timer class. Each thread then waits to start on the barrier, works for a random amount of time, and waits for the end barrier.

A run looks like this:

9 working for 35
7 working for 341
8 working for 371
Completed in 372 ms

Generally, you should expect the recorded elapsed time to be roughly the longest working time of any of the threads. In the run above, that is 372 ms against a longest sleep of 371 ms.

CyclicBarrier has a few additional tricks as well: threads can wait for a time period instead of forever with await(timeout, unit), check with isBroken() whether a barrier has been broken (by interruption, by a timeout, or forcibly with the reset() method), and determine the number of parties with getParties() and the number currently waiting with getNumberWaiting().
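A small sketch of these extras, using only the standard CyclicBarrier methods named above. The class name BarrierTimeoutSketch and the helper awaitOrTimeout are made up for illustration. A single thread waits on a two-party barrier that nobody else ever reaches, so the timed wait gives up and leaves the barrier broken until reset() is called.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BarrierTimeoutSketch {

    // Hypothetical helper: returns true if the barrier tripped within the
    // given time, false if this thread gave up waiting.
    static boolean awaitOrTimeout(CyclicBarrier barrier, long millis)
            throws InterruptedException, BrokenBarrierException {
        try {
            barrier.await(millis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        CyclicBarrier barrier = new CyclicBarrier(2);
        System.out.println("Parties: " + barrier.getParties());       // 2
        System.out.println("Waiting: " + barrier.getNumberWaiting()); // 0

        // No second thread ever arrives, so the timed wait gives up.
        System.out.println("Tripped? " + awaitOrTimeout(barrier, 100)); // false

        // A timed-out wait leaves the barrier in the broken state...
        System.out.println("Broken? " + barrier.isBroken());            // true

        // ...until it is reset, after which it can be used again.
        barrier.reset();
        System.out.println("Broken after reset? " + barrier.isBroken()); // false
    }
}
```

Unlike the timed await on CountDownLatch, which returns false on timeout, the timed await on CyclicBarrier throws TimeoutException, and any other threads waiting on the barrier at that moment get a BrokenBarrierException.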

This is the first of a series of occasional posts on the concurrency classes added in Java 5 and later. Next in the series will be a little program I call “The Hungry Philosophers”.