Que es LMAX Disruptor

LMAX Disruptor (http://lmax-exchange.github.io/disruptor/) es una librería Java que proporciona mensajería entre hilos con mayor rendimiento y funcionalidades que una cola clásica.

Para entender el funcionamiento de LMAX Disruptor vamos explicar los principales elementos que lo componen:

· Ring Buffer: LMAX utiliza un Ring Buffer en memoria para almacenar los eventos intercambiados. Proporciona mayor rendimiento que un Buffer clásico debido a que la memoria utilizada se reserva una única vez y cada slot será reutilizado almacenando nuevos eventos que sobrescriben otros ya consumidos.

· Productores: Son los hilos que producen eventos para ser consumidos por otros procesos consumidores. Se trata de clases de usuario que utilizan el RingBuffer para alojar en un slot disponible el evento producido a la espera de ser consumido.

· Consumidores: Se trata de clases que implementan la interfaz EventHandler o WorkHandler. Son las clases de usuario en las que se codifica la lógica de recepción de un evento.

LMax Disruptor contempla diferentes escenarios de mensajería entre hilos:

· Productor simple o Productores multiples.

· Multicasting de Eventos: El mismo evento es tratado por multiples consumidores, cada uno con su propia lógica de negocio y en operaciones paralelas e independientes.

· Pools de Workers: Permite crear un pool de consumidores donde cada evento es despachado a solo un consumidor libre del pool.

Asimismo soporta coordinación entre consumidores permitiendo establecer cadenas de notificación de eventos de manera que un evento se puede notificar primero a un conjunto de consumidores, una vez estos finalicen, se notificará a otro grupo de consumidores y así sucesivamente.

Internamente esto es soportado por la librería mediante los siguientes conceptos:

· Sequence: Cada Consumidor tiene un objeto de tipo Secuencia asociado. Esto permite a la librería localizar el evento a notificar al Consumidor

· SecuenceBarrier: Cada Consumidor tiene asociado un objeto de este tipo para determinar si existen elementos disponibles para ser Consumidos. El SecuenceBarrier especifico de cada Consumidor tiene asociados los objetos Sequence de los consumidores que se tienen que ejecutar previamente según lo establecido en la cadena de coordinación.

· EventProcessor: Se trata de la clase que agrupa a todos los consumidores (EventHandlers o WorkHandlers) para notificarles los eventos de manera coordinada.

En la imagen podemos ver todos los conceptos de los que hemos hablado. Representa un escenario en el que varios hilos productores notifican eventos, que deben ser consumidos de tal manera que primero se asegure su almacenamiento en disco (Journaling) y su replicación, y una vez realizadas estas dos tareas, sean procesados. De manera que tenemos coordinación entre los consumidores.

Vemos:

· 2 hilos Productores de eventos publicando en el RingBuffer

· 3 Consumidores implementando la interfaz EventHandler, cada uno con su propio objeto Sequence para recuperar el siguiente evento del RingBuffer:

o JournalConsumer: Representando un proceso de persistencia de eventos en un fichero.

o ReplicationConsumer: Representado un proceso que envía los eventos a una máquina remota para replicarlos.

o ApplicationConsumer: Representando la lógica de negocio para tratar el evento.

· 3 SecuenceBarrier, uno por cada consumidor indicándole cuando tiene disponible el siguiente elemento en el RingBuffer.

o Aquí es donde vemos que el SequenceBarrier del Consumidor ApplicationConsumer, que realiza la lógica de negocio, está conectado a los objetos Sequence de los Consumidores que se deben ejecutar previamente. De manera que ApplicationConsumer no podrá ir al RingBuffer a por el evento hasta que aquellos no terminen su ejecución.

A continuación vamos a mostrar como utilizar LMAX Disruptor con un par de escenarios sencillos. En la documentación y código fuente en github (https://github.com/LMAX-Exchange/disruptor/) se pueden encontrar muchos más ejemplos:

Lo primero es añadir la librería a las dependencias maven de nuestro proyecto:

El primer escenario que vamos a mostrar es un productor publicando un evento que debe ser procesado por 3 consumidores (Handlers), de manera que primero se ejecutarán 2 de ellos y cuando finalicen se ejecutará el tercero:

Primero creamos la clase que representa el evento intercambiado:


import com.lmax.disruptor.EventFactory;

public class Event {

private String value;

public void setValue(String value) {

this.value = value;

}

public String toString() {

return value;

}

public final static EventFactory<Event> factory = new EventFactory<Event>() {

public Event newInstance() {

return new Event();

}

};

}

Como vemos, se trata de un evento que contiene un String y además añade una variable estática de tipo EventFactory. Esta variable se utilizará más adelante y será invocada por la librería para reservar la memoria del RingBuffer, creando cada uno de los slots invocando a su método newInstance().

A continuación creamos la clase del consumidor implementando EventHandler:


import com.lmax.disruptor.EventHandler;

public class Handler implements EventHandler<Event> {

private String id;

public Handler(String id) {

this.id = id;

}

@Override

public void onEvent(Event event, long sequence, boolean batchEnd) throws Exception {

System.out.println(event.toString());

}

}

Seguidamente configuramos un objeto de tipo Disruptor. Primero se instancia, informando un EventFactory con una factoria de los eventos a intercambiar, el tamaño del RingBuffer y un ExecutorService.

La factoria de eventos y el tamaño del RingBuffer se utilizan para alojar en memoria el RingBuffer, mientras que el ExecutorService se utilizará para crear los hilos en los que se ejecutarán los consumidores.

Y después se registran los Consumidores o Handlers estableciendo el orden de ejecución (h1 y h2 de manera paralela y cuando estos finalizen, se ejecutará h3).

Una vez configurado, se lanza, devolviendo el RingBuffer, que quedará disponible para que lo utilicen los hilos productores:


ExecutorService executor = Executors.newCachedThreadPool();

Disruptor<Event> disruptor;

disruptor = new Disruptor<Event>(Event.factory, 1024, executor);

disruptor.handleEventsWith(new Handler("h1"), new Handler("h2")).then(new Handler("h3"));

RingBuffer<Event> ringBuffer=disruptor.start();

Finalmente, el productor de eventos puede solicitar al RingBuffer la siguiente ranura libre para publicar un evento, que LMAX Disruptor se encargará de notificar a los consumidores:


long sequence = ringBuffer.next();

Event event = ringBuffer.get(sequence);

event.setValue("Evento 1");

ringBuffer.publish(sequence);

El segundo escenario que vamos a mostrar es el de utilizar LMAX Disruptor con un pool de consumidores o Workers, de manera que cada evento se notifica solo a uno de estos Workers que esté disponible:

Primero creamos la clase de nuestros Workers, implementando WorkHandler:


public class MessageEventWorkHandler implements WorkHandler<Event>{

@Override

public void onEvent(Event event) throws Exception {

System.out.println("Worker: - Ejecutandose en hilo - "+Thread.currentThread().getId()+" – Recibe evento: "+event.toString());

}

}

A continuación instanciamos un Array con todos los Workers del pool:


MessageEventWorkHandler[] workers=new MessageEventWorkHandler[workersPoolSize];

for(int i=0;i<workersPoolSize;i++){

workers[i]=new MessageEventWorkHandler(i+1, this.storageService, this.cacheService);

}

Seguidamente, creamos un objeto de tipo WorkerPool registrando el array de Workers anterior y la factoria de eventos, y lo lanzamos pasándole un Executor para que cree los hilos sobre los que lanzar los Workers:


WorkerPool<ValueEvent> pool = new WorkerPool<ValueEvent>(Event.EVENT_FACTORY, new FatalExceptionHandler(), workers);

Executor executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);

ringBuffer = pool.start(executor);

Finalmente el ringBuffer será utilizado por los productores como en el ejemplo anterior para enviar eventos al pool de Workers.

Deja una respuesta

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Salir /  Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Salir /  Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Salir /  Cambiar )

Conectando a %s