KSQL es un motor de streaming open-source distribuido y en real-time sobre Apache Kafka que permite programar los streams en SQL, evitando la pogramación de código.

KSQL ofrece un gran conjunto de operaciones de procesado como agregaciones, joins, Windows, sesiones, …

Veamos un ejemplo de uso:

Arrancar KSQL:

· Primero arrancaremos el cluster de Kafka y la consola de KSQL

veré un prompt como este:

Crear Streams y tables:

· Imaginemos 2 tópicos en Kafka: pageviews y users con estos esquemas:

· Crearé un STREAM pageviews_original del tópico Kafka pageviews especificando el format DELIMITED, el stream tiene ademas de los atributos del tópico las columnas adicionales ROWTIME (para el timestamp del mensaje) y ROWKEY para la key del mensaje.

· Ahora creare una TABLE users_original del tópico Kafka users con el format JSON:

· Ahora tengo:

Hacer consultas:

· Ahora usare el SELECT para crear una query que devuelva datos del STREAM (por defecto KSQL lee los tópicos de STREAMS y TABLAS desde el último offset):

· Ahora creare una query persistente usando un CREATE STREAM delante del SELECT, al ser una query persistente los resultados de esta query se escriben a un topico (en este caso al PAGEVIEWS_FEMALE):

· Y ahora puedo usar un SELECT para ver los resultados según llegan:

· Podemos hacer otras queries como:

O:

· Puedo ver las queries en ejecución con:

· Las queries ejecutan continuamente como aplicaciones KSQL hasta que se terminan manualmente (no vale con cerrar la consola KSQL)

Para pararlas hare un TERMINATE <Query_ID>

Podéis ver un ejemplo más complejo aquí. y un ejemplo que extrae datos desde un Oracle aquí

.