Streaming ETL

Enriching Streams with Static Data Loaded as a Table

Data that changes infrequently is often called static data, or reference data. It is typically published by an external source or provided by a batch-based system. Examples include customer accounts, close-of-business positions, and pricing information. Reference data is especially useful alongside a stream of events whose records carry a reference ID (or a similar key) that points into it.

For example, a transaction could contain account numbers for both the sender and the recipient. However, we may want to filter by the organization that the sender’s account belongs to, and the organization is available only from the reference data, not from the transaction stream. In this kind of situation, we can use KSQL to join the stream of events to the reference data.
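The enrichment described above can be sketched as a toy, in-memory Python model. This is not how KSQL executes the join. The model assumes the two inputs arrive as one interleaved, time-ordered sequence of events. Account rows are upserted into a table keyed by account ID, so only the latest version of each account is kept. Each transaction is matched against that table as it stands when the transaction arrives. The helpers `enrich` and `from_company`, the event shape, and the `Example Corp` records are hypothetical. The other values are taken from the sample output at the end of this recipe.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

# Hypothetical event shape: ("account", row) for a reference-data update,
# or ("txn", row) for a transaction. Events are assumed to be time-ordered.
Event = Tuple[str, dict]


def enrich(events: Iterable[Event]) -> Iterator[dict]:
    """Toy inner stream-table join.

    Account rows are upserted into a table keyed by ac_key, so only the latest
    version of each account is kept. Each transaction is joined against the
    table as it stands when the transaction arrives. A transaction whose userid
    has no matching account is dropped, as with an INNER JOIN.
    """
    accounts: Dict[int, dict] = {}
    for kind, row in events:
        if kind == "account":
            accounts[row["ac_key"]] = row  # newer value replaces the older one
        elif kind == "txn":
            account: Optional[dict] = accounts.get(row["userid"])
            if account is None:
                continue  # no matching account: no output
            yield {**row, "username": account["username"], "company": account["company"]}


def from_company(rows: Iterable[dict], company: str) -> Iterator[dict]:
    """Keep only enriched transactions whose sender belongs to `company`."""
    return (r for r in rows if r["company"] == company)


if __name__ == "__main__":
    events = [
        ("account", {"ac_key": 11, "username": "Farra Stearn", "company": "Nitzsche Group"}),
        ("account", {"ac_key": 3, "username": "Example User", "company": "Example Corp"}),  # hypothetical
        ("txn", {"txn_id": 445, "userid": 11, "recipient": 9, "amount": 84.11}),
        ("txn", {"txn_id": 446, "userid": 3, "recipient": 11, "amount": 12.0}),  # hypothetical
        ("txn", {"txn_id": 447, "userid": 42, "recipient": 11, "amount": 5.0}),  # hypothetical: no account 42
    ]
    for txn in from_company(enrich(events), "Nitzsche Group"):
        print(txn["txn_id"], txn["username"], txn["amount"])
```

In this model, an account update affects only transactions that arrive after it. Transactions that were already enriched are not revised. Because the join is an inner join, a transaction whose sender has no account record produces no output at all.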

Directions

1. Register the existing txns topic for use as a KSQL stream called txns:

 CREATE STREAM txns (txn_id BIGINT, userid BIGINT, recipient BIGINT, amount DOUBLE) \
 WITH (KAFKA_TOPIC = 'txns', VALUE_FORMAT = 'json');

2. Inspect the first few messages:

 SELECT * FROM txns LIMIT 5;

3. Register the existing accounts topic for use as a KSQL table called accounts:

 CREATE TABLE accounts (ac_key BIGINT, username VARCHAR, company VARCHAR, created_date VARCHAR) \
 WITH (KEY='ac_key', KAFKA_TOPIC = 'accounts', VALUE_FORMAT = 'json');

4. Inspect the first few messages:

 SET 'auto.offset.reset'='earliest';
 SELECT * FROM accounts LIMIT 5;

5. Join the transactions stream with the accounts table to create a stream of enriched transactions:

 CREATE STREAM enriched_txns AS \
 SELECT TIMESTAMPTOSTRING(txns.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS TXN_TIMESTAMP, txn_id, userid, username, company, recipient, amount \
   FROM txns \
        INNER JOIN accounts \
        ON txns.userid = accounts.ac_key;

6. Filter the resulting transaction stream for transactions from a particular company:

 SELECT * FROM enriched_txns \
  WHERE company = 'Nitzsche Group';

You should see the enriched transactions, including the user information, for the specified company only:

 2018-12-18 15:12:13 +0000 | 445 | 11 | Farra Stearn | Nitzsche Group | 9 | 84.11
 2018-12-18 15:12:15 +0000 | 448 | 11 | Farra Stearn | Nitzsche Group | 7 | 46.24
 2018-12-18 15:12:16 +0000 | 451 | 7 | Mendel Deyenhardt | Nitzsche Group | 8 | 38.02
 […]
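Three details in the directions are easy to overlook.

- KSQL interprets each JSON message value against the columns declared in steps 1 and 3.
- Step 4 changes where a new query starts reading. By default (`latest`), a query sees only messages produced after it starts, so rarely changing reference data might never appear.
- `TIMESTAMPTOSTRING` in step 5 renders the message timestamp (`ROWTIME`, in epoch milliseconds) using a Java date pattern.

The Python sketch below models each of these in simplified form. The helpers `decode_value`, `start_offset`, and `fmt_rowtime` are hypothetical. This sketch also makes three assumptions: missing JSON fields become null, edge cases such as out-of-range committed offsets are ignored, and timestamps are formatted in UTC to match the `+0000` offset in the sample output.

```python
import json
from datetime import datetime, timezone
from typing import Dict, Optional

# Column types from the CREATE STREAM txns statement: BIGINT -> int, DOUBLE -> float.
TXNS_SCHEMA = {"txn_id": int, "userid": int, "recipient": int, "amount": float}


def decode_value(raw: bytes, schema: Dict[str, type]) -> dict:
    """Project a JSON message value onto the declared columns (simplified).

    Declared fields missing from the message become None (null);
    undeclared fields are ignored.
    """
    doc = json.loads(raw)
    return {col: None if doc.get(col) is None else cast(doc[col])
            for col, cast in schema.items()}


def start_offset(reset: str, log_start: int, log_end: int,
                 committed: Optional[int] = None) -> int:
    """Where a consumer starts reading one partition (simplified).

    A committed offset wins; otherwise auto.offset.reset decides.
    """
    if committed is not None:
        return committed
    if reset == "earliest":
        return log_start  # replay everything still retained in the topic
    if reset == "latest":
        return log_end  # only messages produced from now on
    raise ValueError(f"unsupported auto.offset.reset in this sketch: {reset!r}")


def fmt_rowtime(rowtime_ms: int) -> str:
    """Rough equivalent of TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), in UTC."""
    ts = datetime.fromtimestamp(rowtime_ms / 1000, tz=timezone.utc)
    return ts.strftime("%Y-%m-%d %H:%M:%S %z")


if __name__ == "__main__":
    # Values taken from the first line of the sample output above.
    raw = b'{"txn_id": 445, "userid": 11, "recipient": 9, "amount": 84.11}'
    print(decode_value(raw, TXNS_SCHEMA))
    # Hypothetical partition holding offsets 0..99 (log end offset 100).
    print(start_offset("latest", 0, 100), start_offset("earliest", 0, 100))
    rowtime = int(datetime(2018, 12, 18, 15, 12, 13, tzinfo=timezone.utc).timestamp() * 1000)
    print(fmt_rowtime(rowtime))
```

The Java pattern letters `yyyy-MM-dd HH:mm:ss Z` correspond to the `strftime` directives `%Y-%m-%d %H:%M:%S %z` used here. That is why the formatted timestamp ends in `+0000` for UTC.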