MQTT es un protocolo de conectividad enfocado a M2M (machine-to-machine) y al IOT (Internet of Things) ya que se ha diseñado para ser un protocolo de mensajería extremadamente ligero basado en TCP.

Es útil para conexiones con sitios remotos donde es fundamental un pequeño foodprint (como Arduino) y el ancho de banda es muy importante.

Una característica muy importante es que al ser un protocolo tan ligero existen clientes y servidores MQTT en diversos lenguajes (http://mqtt.org/software) , desde Java, C a Arduino, Javascript. Además muchos brokers de mensajería como IBM MQ o RabbitMQ soportan este ligero protocolo de modo que el cliente pueda publicar y suscribirse a esta mensajería a través de este protocolo.

Ya dedicamos un post hace tiempo a MQTT, en este caso vamos a usar una de las librerías clientes para comunicar con un Broker MQTT y ver cómo se usa este protocolo.

· Como Broker/Server MQTT usaremos el Servidor público http://test.mosquitto.org/

· Como cliente usaremos mqtt-client que es una librería cliente MQTT Java open source

MQTT tiene un API bloqueantes y un API no bloqueante usando Future.

Nuestro Broker http://test.mosquitto.org/ soporta 3 tipos de conexiones:

· Puerto 1883 es el Puerto estándar para comunicaciones sin encriptar

· Puertos 8883 y 8884 usan encriptación SSL/TLS. Puerto 8884 requiere un certificado.

Vamos ya al lío!!!

· Creo un proyecto Maven con el arquetipo:

> mvn archetype:generate -DgroupId=com.lmgracia.mqtt.mqttclient.demo -DartifactId=EjemploMQTTClient -DpackageName=com.lmgracia.mqtt.mqttclient.demo -Dversion=1.0.0

Y dejando opciones por defecto.

· Actualizamos la dependencia de JUnit a

<version>4.8.1</version>

· Añado la dependencia a mqtt-client y el repositorio para que pueda descargarla:

<dependency>
 <groupId>org.fusesource.mqtt-client</groupId>
 <artifactId>mqtt-client</artifactId>
 <version>1.4</version>
</dependency>
<repositories>
 <repository>
 <id>fusesource.snapshots</id>
 <name>FuseSource Snapshot Repository</name>
 <url>http://repo.fusesource.com/nexus/content/repositories/public</url>
</repository>
</repositories>

· Genero proyecto de Eclipse

>mvn eclipse:eclipse

Y lo import en Eclipse

· Crearemos un Test para probar el las APIS bloqueantes y Future: TestAPIsMQTTClient:

· Además de estas APIs se pueden usar Listeners

MQTT ofrece muchas opciones:

CALIDADES DE SERVICIO

Permite configurar el nivel de seguridad de un mensaje de PUBLISH:

· Al menos una vez

· Exactamente una vez

· Como mucho una vez

CLIENTE MQTT

· setClientId: permite establecer el id del cliente para la sesión. Lo usa el servidor MQTT para identificar la sesión. Por defecto es un id autogenerado.

· setCleanSession: se pone a falso si quieres que el MQTT Server persista suscripciones a tópicos y acks entre sesiones de cliente. Por defecto a true.

· setKeepAlive: configura Keep Alive en segundos. Es el interval máximo entre mensajes recibidos desde un cliente. Permite al servidor detectar que se ha cerrado la conexión de un cliente.

· setUserName y setPassword establece usuario y password para autenticar contra el servidor.

RECONEXIONES:

Muy interesantes es que MQTT automáticamente reconectará y reestablecerá sesión si sucede algún fallo en la red.

Se puede configurar cada cuanto se intenta una reconexión y número máximo de reintentos.

· setConnectAttempsMax: número máximo de reconexiones antes de que un error se reporte al cliente en el primer intento de conexión con el servidor. -1 para intentos ilimitados. Por defecto -1.

· setReconnectAttemptsMax: número máximo de reconexiones antes de que un error se reporte al cliente después de que una conexión se haya establecido con el servidor. -1 para intentos ilimitados. Por defecto -1.

Abajo incluimos el código del TestAPIsMQTTClient:

package com.lmgracia.mqtt.mqttclient.demo;

import static org.fusesource.hawtbuf.Buffer.utf8;

import java.nio.Buffer;

import junit.framework.Assert;

import org.fusesource.mqtt.client.BlockingConnection;

import org.fusesource.mqtt.client.Future;

import org.fusesource.mqtt.client.FutureConnection;

import org.fusesource.mqtt.client.MQTT;

import org.fusesource.mqtt.client.Message;

import org.fusesource.mqtt.client.Promise;

import org.fusesource.mqtt.client.QoS;

import org.fusesource.mqtt.client.Topic;

import org.junit.Test;

public class TestAPIsMQTTClient {

@Test

public void testPublishAndSuscribeBlocking() throws Exception {

MQTT mqtt = new MQTT();

mqtt.setHost(“tcp://test.mosquitto.org:1883”);

BlockingConnection connection = mqtt.blockingConnection();

connection.connect();

Topic[] topics = {new Topic(utf8(“MiTopico”), QoS.AT_LEAST_ONCE)};

byte[] qoses = connection.subscribe(topics);

connection.publish(“MiTopico”, “Hola a todos”.getBytes(), QoS.AT_LEAST_ONCE, false);

Message message = connection.receive();

Assert.assertEquals(“Hola a todos”, new String(message.getPayload())) ;

message.ack();

connection.disconnect();

}

public void testPublishAndSuscribeBlockingFuture() throws Exception {

MQTT mqtt = new MQTT();

mqtt.setHost(“tcp://test.mosquitto.org:1883”);

final Promise<Buffer> result = new Promise<Buffer>();

FutureConnection connection = mqtt.futureConnection();

Future<Void> f1 = connection.connect();

f1.await();

Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic(utf8(“MiTopico”), QoS.AT_LEAST_ONCE)});

byte[] qoses = f2.await();

Future<Message> receive = connection.receive();

connection.publish(“MiTopico”, “Hola de nuevo”.getBytes(), QoS.AT_LEAST_ONCE, false);

Message message = receive.await();

Assert.assertEquals(“Hola de nuevo”, new String(message.getPayload()));

message.ack();

connection.disconnect().await();

}

}