Streaming ETL

Data Routing

KSQL queries run continuously, and the output of a query can be persisted to a Kafka topic with the CREATE STREAM ... AS SELECT syntax. Together, these two features make it possible to write simple yet powerful stream transformations in KSQL alone: take a real-time feed of events from one Kafka topic, transform them, and write them continuously to another.

In this recipe, we’ll see how to route messages from a source topic to multiple destination topics based on conditions in the data. The scenario here is that Kafka receives log messages from numerous hosts and applications. We want to route the messages to three target topics based on the following conditions:

  1. Any message that is an ERROR
  2. DEBUG messages originating from any com.microsoft app
  3. All other messages
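
The three conditions above can be sketched as a single decision function. The Python below only illustrates the logic and is not part of the recipe: the function name `route` is hypothetical, the returned labels borrow the stream names created in the directions that follow, and the sketch assumes every event carries non-null `severity` and `app` fields.

```python
def route(event: dict) -> str:
    """Return the name of the target stream for one log event."""
    severity = event["severity"]
    app = event["app"]
    # Condition 1: every ERROR message, whatever the app.
    if severity == "ERROR":
        return "LOG_ERRORS"
    # Condition 2: DEBUG messages whose app name starts with com.microsoft.
    if severity == "DEBUG" and app.startswith("com.microsoft"):
        return "LOG_MS_DEBUG"
    # Condition 3: everything else.
    return "LOG_ERRORS_OTHER"


sample = {
    "host": "147.40.121.104",
    "app": "com.salon.Tempsoft",
    "severity": "ERROR",
    "message": "Erigeron serpentinus G.L. Nesom",
}
print(route(sample))  # LOG_ERRORS
```

In KSQL itself there is no single routing function: each target is a separate continuous query with its own WHERE clause, as the directions show.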

Directions

The source event stream is called log_events. A sample message looks like this:

{
  "host": "147.40.121.104",
  "app": "com.salon.Tempsoft",
  "severity": "ERROR",
  "message": "Erigeron serpentinus G.L. Nesom"
}

1. In KSQL, register the source log_events stream:

ksql> CREATE STREAM log_events \
      (host VARCHAR, app VARCHAR, severity VARCHAR, message VARCHAR) \
       WITH (KAFKA_TOPIC='log_events', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

2. Create a Kafka topic, populated from the source log_events, containing only messages whose severity is ERROR:

ksql> CREATE STREAM LOG_ERRORS AS \
      SELECT * \
      FROM log_events \
      WHERE severity='ERROR';

 Message
----------------------------
 Stream created and running
----------------------------

3. Create a Kafka topic, populated from the source log_events, containing only messages whose severity is DEBUG and whose app belongs to the com.microsoft family:

ksql> CREATE STREAM LOG_MS_DEBUG AS \
      SELECT * \
      FROM log_events \
      WHERE severity='DEBUG' \
        AND app LIKE 'com.microsoft%';

 Message
----------------------------
 Stream created and running
----------------------------

4. Create a Kafka topic, populated from the source log_events, containing all messages that match neither of the above conditions:

ksql> CREATE STREAM LOG_ERRORS_OTHER AS \
      SELECT * \
      FROM log_events \
      WHERE   (severity!='ERROR') \
      AND NOT (severity='DEBUG' AND app LIKE 'com.microsoft%');

 Message
----------------------------
 Stream created and running
----------------------------
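
Because each target stream has its own WHERE clause, it is worth checking that the three clauses together send every message to exactly one target. The Python sketch below restates the three predicates and tests them against a small grid of values. The helper names, the extra severities INFO and WARN, and the app com.microsoft.Excel are made up for illustration, and null fields are not modelled.

```python
from itertools import product


def is_error(severity: str, app: str) -> bool:
    # WHERE severity='ERROR'
    return severity == "ERROR"


def is_ms_debug(severity: str, app: str) -> bool:
    # WHERE severity='DEBUG' AND app LIKE 'com.microsoft%'
    return severity == "DEBUG" and app.startswith("com.microsoft")


def is_other(severity: str, app: str) -> bool:
    # WHERE (severity!='ERROR')
    #   AND NOT (severity='DEBUG' AND app LIKE 'com.microsoft%')
    return severity != "ERROR" and not (
        severity == "DEBUG" and app.startswith("com.microsoft")
    )


severities = ["ERROR", "DEBUG", "INFO", "WARN"]        # illustrative values
apps = ["com.microsoft.Excel", "com.salon.Tempsoft"]   # illustrative values

# For every combination, count how many of the three predicates match.
match_counts = [
    sum(pred(sev, app) for pred in (is_error, is_ms_debug, is_other))
    for sev, app in product(severities, apps)
]
every_event_routed_once = all(count == 1 for count in match_counts)
print(every_event_routed_once)  # True
```

The third predicate is simply the negation of the first two combined, which is why no message is duplicated or dropped in this model.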

As a result, three new Kafka topics are now populated by continuous KSQL queries, which route each message arriving on the source topic according to the criteria specified.
