Merging Streams

Most topics typically have a single producer responsible for publishing events. In an environment with multiple regions, however, the same topic may have a producer in each region. These environments usually include at least one aggregation point to which the regional topics are all replicated, because some consumer applications need a broader view than just their local region.

KSQL makes it easy to merge multiple streams into one using the INSERT INTO statement. For this example, imagine two Kafka clusters, one in Austin, Texas, and one in London, England. In both locations, a change data capture (CDC) client reads from the local purchase order database and publishes to the local Kafka cluster.

You might do some processing locally, but at some point, you want all of the events in a single topic in Austin. You don’t necessarily care about message ordering at this point, but your consumer needs a single topic.
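Conceptually, the merged topic interleaves events from both regions with no global ordering guarantee, while the relative order within each source stream is preserved (as Kafka does per partition). A minimal Python sketch of that behavior, with no Kafka involved; the data and the `merge_streams` helper are illustrative, not part of KSQL:

```python
from itertools import zip_longest

# Hypothetical regional event batches (field names follow the
# article's CDC example; the values are made up).
austin = [{"order_id": i, "product_loc": "aus"} for i in range(1, 4)]
london = [{"order_id": i, "product_loc": "lon"} for i in range(1, 4)]

def merge_streams(*streams):
    """Interleave events from several streams into one merged list.

    Order within each source stream is preserved, but there is no
    meaningful global ordering across regions.
    """
    merged = []
    for batch in zip_longest(*streams):
        merged.extend(event for event in batch if event is not None)
    return merged

merged = merge_streams(austin, london)
print(len(merged))  # 6: every event from both regions is present
```

The key property for the consumer is completeness, not order: every regional event shows up exactly once in the merged stream.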

Directions

1. Let’s assume both topics have been replicated to your cluster in Austin using Confluent Replicator:

  • orders_cdc_austin
  • orders_cdc_london

Both topics contain purchase order data with a simple format based on the example below:

{
  "change_op": "insert",
  "order_id": 1,
  "product_id": 12,
  "product_loc": "aus"
}
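As a sketch of what the CDC client publishes, the record above can be built and JSON-serialized in Python. No Kafka client is shown here; this only illustrates the payload format that KSQL's VALUE_FORMAT='JSON' expects to parse:

```python
import json

# A purchase-order CDC event in the format shown above
# (field names taken from the article's example).
event = {
    "change_op": "insert",
    "order_id": 1,
    "product_id": 12,
    "product_loc": "aus",
}

# A CDC client would typically serialize the event to UTF-8 JSON
# before publishing it to the local Kafka topic.
payload = json.dumps(event).encode("utf-8")

# On the consuming side, KSQL's JSON value format parses it back
# into the declared columns.
decoded = json.loads(payload)
print(decoded["product_loc"])  # aus
```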

With KSQL, create streams on top of both topics using the JSON format:

ksql> CREATE STREAM orders_cdc_austin_stream \
      (change_op VARCHAR, order_id INT, product_id INT, product_loc VARCHAR) \
      WITH (KAFKA_TOPIC='orders_cdc_austin', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

ksql> CREATE STREAM orders_cdc_london_stream \
      (change_op VARCHAR, order_id INT, product_id INT, product_loc VARCHAR) \
      WITH (KAFKA_TOPIC='orders_cdc_london', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

2. Now, you can create a new stream from one of the existing streams with a new output topic. The example below uses the Austin stream:

ksql> CREATE STREAM orders_cdc_stream \
      WITH (KAFKA_TOPIC='orders_cdc', VALUE_FORMAT='JSON') \
      AS SELECT * FROM orders_cdc_austin_stream;

 Message
----------------
 Stream created
----------------

3. Finally, you can merge in the London stream using the INSERT INTO statement:

ksql> INSERT INTO orders_cdc_stream \
      SELECT * FROM orders_cdc_london_stream;

 Message
-------------------------------
 Insert Into query is running.
-------------------------------

Note: The schema and partitioning column produced by the query must match the stream’s schema and key, respectively. If the schema and partitioning column are incompatible with the stream, then the statement will return an error. KSQL tables are not supported in the FROM clause of an INSERT INTO... SELECT FROM statement.
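The compatibility rule above can be illustrated with a small Python check. This is a hedged sketch of the idea, not KSQL's actual validation code; the schema representation and the `compatible` helper are assumptions for illustration:

```python
# Represent a stream's schema as ordered (column name, type) pairs,
# mirroring the columns declared in the CREATE STREAM statements.
target_schema = [("CHANGE_OP", "VARCHAR"), ("ORDER_ID", "INT"),
                 ("PRODUCT_ID", "INT"), ("PRODUCT_LOC", "VARCHAR")]

query_schema = [("CHANGE_OP", "VARCHAR"), ("ORDER_ID", "INT"),
                ("PRODUCT_ID", "INT"), ("PRODUCT_LOC", "VARCHAR")]

def compatible(target, query):
    # Column names, order, and types must all line up for
    # INSERT INTO ... SELECT to be accepted.
    return target == query

print(compatible(target_schema, query_schema))  # True

# A query that drops a column would be rejected with an error:
bad_query = query_schema[:-1]
print(compatible(target_schema, bad_query))  # False
```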

The result is a stream and topic containing all of the purchase order events from both Austin and London, ready for consumption. With a few basic KSQL statements, you have created a stream processing application that merges like streams to suit your needs.

