Kafka

Concepts

TIP

Kafka EcoSystem

Kafka Cluster

Strimzi

Strimzi

[Official Docs]

Kafka Connect

Sources

Sinks

Camel Tools

Official Docs

Ksql

Ksql CLI

shell
$ ./bin/ksql --help
shell
--execute <execute>, -e <execute>
# Execute one or more SQL statements and quit.

--file <scriptFile>, -f <scriptFile>
# Execute commands from a file and exit.
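As a minimal sketch of what a script passed to `--file` could contain, the statements below only list what the server already knows. The file name `inspect.sql` and the server address `http://localhost:8088` are assumptions for illustration. The script would be run with something like `./bin/ksql --file inspect.sql -- http://localhost:8088`.

```sql
-- inspect.sql (hypothetical file name)
-- Read topics from the beginning for any query started in this session.
SET 'auto.offset.reset' = 'earliest';

-- List what is currently registered on the server.
SHOW TOPICS;
SHOW STREAMS;
SHOW TABLES;
```

The same statements can be passed inline with `--execute` when they are short enough to fit on the command line.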

Queries (Pull vs Push)

Streams

Sometimes you need data that lives in a record's headers rather than in its value. For that, ksql lets you declare a column backed by a specific header key:

sql
CREATE STREAM IF NOT EXISTS FLIGHTS_STREAM (
  flight_id    STRING,
  from_airport STRING,
  to_airport   STRING,
  coordinates  STRUCT <
    lat STRING,
    lon STRING
  >,
  produced_at  BYTES HEADER('producedAt')
) WITH (
  KAFKA_TOPIC  = 'source_topic',
  VALUE_FORMAT = 'JSON'
);

This exposes the header value as raw bytes, which a second stream can decode into a string.

sql
CREATE OR REPLACE STREAM
  FLIGHTS_STREAM_DECODED 
WITH ( VALUE_FORMAT = 'JSON' )
AS
  SELECT
    flight_id,
    from_airport,
    to_airport,
    coordinates->lat as coordinates_lat,
    coordinates->lon as coordinates_lon,
    FROM_BYTES(produced_at, 'utf8') as produced_at
  FROM 
    FLIGHTS_STREAM
  PARTITION BY flight_id;
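To check that the header is decoded as expected, a push query against the new stream can help. This is only a sketch: it assumes records are already flowing into `source_topic`, and the `LIMIT` value is arbitrary.

```sql
-- Push query: stays open and emits rows as they arrive,
-- then terminates after the first 5 rows because of LIMIT.
SELECT
  flight_id,
  produced_at
FROM
  FLIGHTS_STREAM_DECODED
EMIT CHANGES
LIMIT 5;
```

If `produced_at` comes back unreadable, the header was probably not written as a plain text string by the producer, and a different `FROM_BYTES` encoding may be needed.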

Tables

sql
-- A SOURCE TABLE is not the same as a plain TABLE:
-- it is read-only and materialized, so it can serve pull queries.
CREATE SOURCE TABLE
  TABLE_FLIGHTS (
    flight_id STRING PRIMARY KEY,
    from_airport STRING,
    to_airport STRING,
    coordinates_lat STRING,
    coordinates_lon STRING,
    produced_at STRING
  ) WITH (
    KAFKA_TOPIC = 'FLIGHTS_STREAM_DECODED',
    VALUE_FORMAT = 'JSON'
  );
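Because a source table is materialized, it can be looked up by key with a pull query. The sketch below assumes the table has already consumed some records. The key value `'FL-001'` is a made-up example, not data from this page.

```sql
-- Pull query: returns the current row for one key and then finishes
-- (no EMIT CHANGES clause).
SELECT
  flight_id,
  from_airport,
  to_airport,
  produced_at
FROM
  TABLE_FLIGHTS
WHERE
  flight_id = 'FL-001';  -- hypothetical key
```

Unlike the push form, this returns a point-in-time answer, which suits request/response style lookups.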

Joins

Feel free to use any content here.