Guía Completa sobre la Clase CountDownLatch en Java
1. Introducción
En este artículo, proporcionaremos una guía sobre la clase CountDownLatch y demostraremos cómo se puede utilizar en algunos ejemplos prácticos. Esencialmente, al usar un CountDownLatch
, podemos hacer que un hilo se bloquee hasta que otros hilos hayan completado una tarea determinada.
2. Uso en Programación Concurrente
Simplificando, un CountDownLatch
tiene un campo de contador, que puede disminuir conforme lo requiramos. Podemos usarlo para bloquear un hilo llamador hasta que haya llegado a cero. Por ejemplo, si estamos realizando un procesamiento en paralelo, podemos instanciar el CountDownLatch
con el mismo valor para el contador que el número de hilos en los que queremos trabajar. Luego, solo necesitamos llamar a countdown()
después de que cada hilo termine, garantizando que un hilo dependiente que llama a await()
se bloquee hasta que los hilos trabajadores hayan terminado.
Ejemplo de Código:
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class Worker implements Runnable {
private List<String> outputScraper;
private CountDownLatch countDownLatch;
public Worker(List<String> outputScraper, CountDownLatch countDownLatch) {
this.outputScraper = outputScraper;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
doSomeWork(); // Simulación de trabajo
outputScraper.add("Counted down");
countDownLatch.countDown();
}
private void doSomeWork() {
// Simulación de trabajo a realizar
try {
Thread.sleep(100); // Simula un retardo en el trabajo
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
3. Esperando a Que Un Conjunto de Hilos Termine
Probemos este patrón creando un Worker
y utilizando un CountDownLatch
para señalar cuando ha completado su tarea. A continuación se presenta un caso de prueba que demuestra cómo un CountDownLatch
puede hacer que el hilo principal se bloquee hasta que los hilos Worker
terminen.
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
public class WorkerTest {
@Test
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException {
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
.limit(5)
.toList();
workers.forEach(Thread::start);
countDownLatch.await(); // Espera hasta que countDownLatch sea 0
outputScraper.add("Latch released");
assertThat(outputScraper)
.containsExactly(
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Latch released"
);
}
}
Naturalmente, "Latch released" siempre será la última salida, ya que depende de la liberación del CountDownLatch
. Nota que si no llamamos a await()
, no podemos garantizar el orden de ejecución de los hilos, por lo que la prueba podría fallar de manera aleatoria.
4. Un Conjunto de Hilos Esperando Para Comenzar
Si tomamos el ejemplo anterior, pero esta vez empezamos miles de hilos en lugar de cinco, es probable que muchos de los hilos anteriores hayan terminado su procesamiento antes de que incluso hayamos llamado a start()
en los posteriores. Esto podría dificultar la reproducción de un problema de concurrencia, ya que no podríamos hacer que todos nuestros hilos se ejecuten en paralelo.
Para eludir esto, hagamos que el CountDownLatch
funcione de manera diferente. En lugar de bloquear un hilo principal hasta que algunos hilos secundarios hayan terminado, podemos bloquear cada hilo secundario hasta que todos los demás hayan comenzado.
Ejemplo de Código:
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class WaitingWorker implements Runnable {
private List<String> outputScraper;
private CountDownLatch readyThreadCounter;
private CountDownLatch callingThreadBlocker;
private CountDownLatch completedThreadCounter;
public WaitingWorker(List<String> outputScraper, CountDownLatch readyThreadCounter,
CountDownLatch callingThreadBlocker, CountDownLatch completedThreadCounter) {
this.outputScraper = outputScraper;
this.readyThreadCounter = readyThreadCounter;
this.callingThreadBlocker = callingThreadBlocker;
this.completedThreadCounter = completedThreadCounter;
}
@Override
public void run() {
readyThreadCounter.countDown(); // Indica que el hilo está listo
try {
callingThreadBlocker.await(); // Bloquea hasta que sea liberado
doSomeWork();
outputScraper.add("Counted down");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
completedThreadCounter.countDown();
}
}
private void doSomeWork() {
// Simulación de trabajo
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Y así, el test sería modificado para asegurarse de que todos los Workers
se han iniciado antes de que comiencen a procesar:
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
public class WaitingWorkerTest {
@Test
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException {
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch readyThreadCounter = new CountDownLatch(5);
CountDownLatch callingThreadBlocker = new CountDownLatch(1);
CountDownLatch completedThreadCounter = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new WaitingWorker(outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
.limit(5)
.toList();
workers.forEach(Thread::start);
readyThreadCounter.await(); // Espera hasta que todos los hilos estén listos
outputScraper.add("Workers ready");
callingThreadBlocker.countDown(); // Libera a los hilos para comenzar
completedThreadCounter.await(); // Espera a que todos terminen
outputScraper.add("Workers complete");
assertThat(outputScraper)
.containsExactly(
"Workers ready",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Workers complete"
);
}
}
Este patrón es realmente útil para intentar reproducir errores de concurrencia, ya que se puede utilizar para forzar que miles de hilos intenten realizar alguna lógica en paralelo.
5. Terminando un CountDownLatch Prematuramente
A veces, podemos encontrarnos en una situación en la que los Workers
terminan con un error antes de contar hacia abajo el CountDownLatch
. Esto podría resultar en que nunca alcance cero y await()
nunca termine.
Ejemplo de Código:
Un ejemplo de cómo podría fallar un Worker
es el siguiente:
public class BrokenWorker implements Runnable {
private List<String> outputScraper;
private CountDownLatch countDownLatch;
public BrokenWorker(List<String> outputScraper, CountDownLatch countDownLatch) {
this.outputScraper = outputScraper;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
if (true) {
throw new RuntimeException("Oh dear, I'm a BrokenWorker");
}
countDownLatch.countDown();
outputScraper.add("Counted down");
}
}
Ahora, podemos modificar nuestra prueba para usar un BrokenWorker
, mostrando cómo await()
se bloqueará indefinidamente:
@Test
public void whenFailingToParallelProcess_thenMainThreadShouldNotGetStuck() throws InterruptedException {
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
.limit(5)
.toList();
workers.forEach(Thread::start);
countDownLatch.await(); // Esto podría bloquear indefinidamente
}
Claramente, este no es el comportamiento que queremos: sería mejor que la aplicación continuara que bloquearse infinitamente. Para evitar esto, añadimos un argumento de tiempo de espera a nuestra llamada a await()
.
boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);
assertThat(completed).isFalse(); // Esto garantiza que no se quede bloqueado
6. Conclusión
En esta rápida guía, hemos demostrado cómo podemos utilizar un CountDownLatch
para bloquear un hilo hasta que otros hilos hayan terminado algún procesamiento. También hemos mostrado cómo puede usarse para ayudar a depurar problemas de concurrencia, asegurando que los hilos se ejecuten en paralelo.
Al aplicar el patrón CountDownLatch
adecuadamente, los programadores en Java pueden gestionar la ejecución de múltiples hilos de manera eficiente, ayudando a evitar errores de sincronización y asegurando que sus aplicaciones funcionen correctamente en entornos concurrentes.
Recordemos siempre que el manejo del flujo de hilos es crucial en la programación concurrente, y un CountDownLatch
es solo una herramienta más en nuestro arsenal para lograr esto.