KSQL es un motor de streaming open-source distribuido y en real-time sobre Apache Kafka que permite programar los streams en SQL, evitando la pogramación de código.
KSQL ofrece un gran conjunto de operaciones de procesado como agregaciones, joins, Windows, sesiones, …
Veamos un ejemplo de uso:
Arrancar KSQL:
· Primero arrancaremos el cluster de Kafka y la consola de KSQL
veré un prompt como este:
Crear Streams y tables:
· Imaginemos 2 tópicos en Kafka: pageviews y users con estos esquemas:
· Crearé un STREAM pageviews_original del tópico Kafka pageviews especificando el format DELIMITED, el stream tiene ademas de los atributos del tópico las columnas adicionales ROWTIME (para el timestamp del mensaje) y ROWKEY para la key del mensaje.
· Ahora creare una TABLE users_original del tópico Kafka users con el format JSON:
· Ahora tengo:
Hacer consultas:
· Ahora usare el SELECT para crear una query que devuelva datos del STREAM (por defecto KSQL lee los tópicos de STREAMS y TABLAS desde el último offset):
· Ahora creare una query persistente usando un CREATE STREAM delante del SELECT, al ser una query persistente los resultados de esta query se escriben a un topico (en este caso al PAGEVIEWS_FEMALE):
· Y ahora puedo usar un SELECT para ver los resultados según llegan:
· Podemos hacer otras queries como:
O:
· Puedo ver las queries en ejecución con:
· Las queries ejecutan continuamente como aplicaciones KSQL hasta que se terminan manualmente (no vale con cerrar la consola KSQL)
Para pararlas hare un TERMINATE <Query_ID>
Podéis ver un ejemplo más complejo aquí. y un ejemplo que extrae datos desde un Oracle aquí
.