Introducción
En este artículo, analizaremos uno de los constructos más útiles de Java, java.util.concurrent
, para resolver el problema concurrente del productor-consumidor. Nos enfocaremos en la interfaz de BlockingQueue
y cómo los métodos de esta interfaz facilitan la escritura de programas concurrentes. Más adelante, mostraremos un ejemplo de un programa simple que tiene múltiples hilos productores y múltiples hilos consumidores.
1. Overview
La mayoría de los programadores que trabajan con Java están familiarizados con la programación concurrente, pero encontrar maneras eficientes de gestionar hilos puede ser complicado. El patrón productor-consumidor es uno de los modelos más conocidos en la programación concurrente y es esencial para la interacción entre hilos. La interfaz BlockingQueue
toma un papel central en este patrón, ya que permite la comunicación segura entre múltiples productores y consumidores sin necesidad de implementar la sincronización manualmente. Para más detalles sobre la interfaz BlockingQueue
, puedes consultar la documentación oficial.
2. Tipos de BlockingQueue
Podemos distinguir dos tipos de BlockingQueue
:
- Queue no limitada: puede crecer casi indefinidamente.
- Queue limitada: tiene una capacidad máxima definida.
2.1. Queue No Limitada
Crear colas no limitadas es simple:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<><>
La capacidad de blockingQueue
se establecerá en Integer.MAX_VALUE
. Todas las operaciones que añaden un elemento a la cola no limitada nunca se bloquearán, lo que podría hacer que crezca a un tamaño muy grande.
Lo más importante al diseñar un programa productor-consumidor utilizando BlockingQueue
no limitada es que los consumidores deben poder consumir mensajes tan rápido como los productores están añadiendo mensajes a la cola. De lo contrario, la memoria podría llenarse y obtendríamos una excepción de OutOfMemory
.
2.2. Queue Limitada
El segundo tipo de colas es la cola limitada. Podemos crear tales colas pasando la capacidad como argumento a un constructor:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
Aquí tenemos un blockingQueue
que tiene una capacidad igual a 10. Esto significa que cuando un productor intenta añadir un elemento a una cola que ya está llena, dependiendo del método que se haya utilizado para añadirlo (offer()
, add()
o put()
), se bloqueará hasta que haya espacio disponible para insertar el objeto. De lo contrario, las operaciones fallarán.
Utilizar una cola limitada es una buena manera de diseñar programas concurrentes, ya que cuando insertamos un elemento en una cola que ya está llena, dichas operaciones deben esperar hasta que los consumidores se pongan al día y hagan espacio disponible en la cola. Esto nos proporciona un mecanismo de control sin esfuerzo por nuestra parte.
3. API de BlockingQueue
Existen dos tipos de métodos en la interfaz BlockingQueue
: métodos responsables de añadir elementos a la cola y métodos que recuperan esos elementos. Cada método de estos dos grupos se comporta de manera diferente en caso de que la cola esté llena o vacía.
3.1. Añadiendo Elementos
add()
: devuelvetrue
si la inserción fue exitosa; de lo contrario, lanza unaIllegalStateException
.put()
: inserta el elemento especificado en una cola, esperando un espacio libre si es necesario.offer()
: devuelvetrue
si la inserción fue exitosa; de lo contrario, devuelvefalse
.offer(E e, long timeout, TimeUnit unit)
: intenta insertar un elemento en una cola y espera un espacio disponible dentro de un tiempo de espera específico.
3.2. Recuperando Elementos
take()
: espera el elemento cabeza de una cola y lo elimina. Si la cola está vacía, se bloquea y espera a que aparezca un elemento.poll(long timeout, TimeUnit unit)
: recupera y elimina el cabeza de la cola, esperando hasta el tiempo de espera especificado si es necesario para que un elemento esté disponible. Devuelvenull
después de un tiempo de espera.
Estos métodos son los bloques de construcción más importantes de la interfaz BlockingQueue
al construir programas productor-consumidor.
4. Ejemplo de Productor-Consumidor Multihilo
Vamos a crear un programa que consta de dos partes: un Productor y un Consumidor.
El Productor generará un número aleatorio entre 0 y 100 y lo colocará en una BlockingQueue
. Tendremos 4 hilos productores y utilizaremos el método put()
para bloquear hasta que haya espacio disponible en la cola.
Es importante recordar que debemos detener a nuestros hilos consumidores para que no esperen indefinidamente a que aparezca un elemento en una cola.
Una buena técnica para señalar desde el productor al consumidor que no hay más mensajes que procesar es enviar un mensaje especial llamado “poison pill”. Necesitamos enviar tantas “poison pills” como consumidores tengamos. Cuando un consumidor saque ese mensaje especial de la cola, terminará su ejecución de manera ordenada.
Veamos la clase del productor:
public class NumbersProducer implements Runnable {
private BlockingQueue<Integer> numbersQueue;
private final int poisonPill;
private final int poisonPillPerProducer;
public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
this.numbersQueue = numbersQueue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
}
public void run() {
try {
generateNumbers();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void generateNumbers() throws InterruptedException {
for (int i = 0; i < 100; i++) {
numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
}
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put(poisonPill);
}
}
}
Nuestro constructor de productor toma como argumento el BlockingQueue
que se utiliza para coordinar el procesamiento entre el productor y el consumidor. Notamos que el método generateNumbers()
pondrá 100 elementos en una cola. También recoge un mensaje de poison pill
para saber qué tipo de mensaje debe introducir en una cola cuando la ejecución haya terminado. Ese mensaje necesita ser insertado poisonPillPerProducer
veces en la cola.
Cada consumidor tomará un elemento de un BlockingQueue
usando el método take()
, por lo que se bloqueará hasta que haya un elemento en la cola. Después de tomar un Integer
de una cola, verificará si el mensaje es una poison pill
; si es así, entonces la ejecución del hilo finalizará. De lo contrario, imprimirá el resultado en la salida estándar junto con el nombre actual del hilo.
Esto nos dará una idea del funcionamiento interno de nuestros consumidores:
public class NumbersConsumer implements Runnable {
private BlockingQueue<Integer> queue;
private final int poisonPill;
public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
}
public void run() {
try {
while (true) {
Integer number = queue.take();
if (number.equals(poisonPill)) {
return;
}
System.out.println(Thread.currentThread().getName() + " result: " + number);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Lo importante a notar es el uso de una cola. Al igual que en el constructor del productor, una cola se pasa como argumento. Podemos hacer esto porque BlockingQueue
puede ser compartida entre hilos sin necesidad de sincronización explícita.
Ahora que tenemos nuestro productor y consumidor, podemos iniciar nuestro programa. Necesitamos definir la capacidad de la cola y la establecemos en 100 elementos.
Queremos tener 4 hilos productores y el número de hilos consumidores será igual al número de procesadores disponibles:
int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
for (int i = 1; i < N_PRODUCERS; i++) {
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}
for (int j = 0; j < N_CONSUMERS; j++) {
new Thread(new NumbersConsumer(queue, poisonPill)).start();
}
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
El BlockingQueue
se crea usando el constructor con una capacidad. Estamos creando 4 productores y N consumidores. Especificamos que nuestro mensaje poison pill
sea un Integer.MAX_VALUE
, ya que tal valor nunca será enviado por nuestro productor en condiciones normales. Lo más importante a destacar aquí es que BlockingQueue
se utiliza para coordinar el trabajo entre ellos.
Cuando ejecutamos el programa, 4 hilos productores pondrán números aleatorios en un BlockingQueue
y los consumidores tomarán esos elementos de la cola. Cada hilo imprimirá en la salida estándar el nombre del hilo junto con el resultado.
5. Conclusión
Este artículo presenta un uso práctico de BlockingQueue
y explica los métodos que se utilizan para añadir y recuperar elementos de esta. También hemos mostrado cómo construir un programa productor-consumidor multihilo utilizando BlockingQueue
para coordinar el trabajo entre productores y consumidores.
Como consejos finales para programadores especializados en Java:
- Escoge sabiamente entre colas limitadas y no limitadas: Según las necesidades de tu aplicación y la cantidad de memoria disponible, elige el tipo de
BlockingQueue
que mejor se adapte a tu caso de uso. - Utiliza la
poison pill
: Implementa el uso de unpoison pill
para permitir que los consumidores finalicen su ejecución de forma ordenada. - Mantente al tanto de la gestión de excepciones: Asegúrate de manejar las excepciones de forma adecuada para evitar que hilos productores o consumidores se interrumpan sin razón.
Con estas prácticas, podrás optimizar el rendimiento de tus aplicaciones concurrentes en Java, haciendo uso efectivo de las herramientas que ofrece el lenguaje.