Directions
1. Register the existing transactions topic as a KSQL stream:
CREATE STREAM TRANSACTIONS_RAW (ACCOUNT_ID VARCHAR, \
  TIMESTAMP VARCHAR, \
  CARD_TYPE VARCHAR, \
  AMOUNT DOUBLE, \
  IP_ADDRESS VARCHAR, \
  TRANSACTION_ID VARCHAR) \
  WITH (KAFKA_TOPIC='transactions', \
        VALUE_FORMAT='JSON');
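Registering a stream copies no data; it only declares how KSQL should read each message value in the existing transactions topic. As a rough model (plain Python, not KSQL and not how KSQL is implemented), the sketch below maps one JSON message onto the declared columns. The helper name parse_transaction and the sample message are hypothetical, and case-insensitive field matching is a simplifying assumption of the sketch.

```python
import json

# Declared columns of TRANSACTIONS_RAW and a Python type for each
# (VARCHAR -> str, DOUBLE -> float).
TRANSACTIONS_RAW_COLUMNS = {
    "ACCOUNT_ID": str,
    "TIMESTAMP": str,
    "CARD_TYPE": str,
    "AMOUNT": float,
    "IP_ADDRESS": str,
    "TRANSACTION_ID": str,
}


def parse_transaction(message_value: str) -> dict:
    """Map one JSON message value onto the declared columns (hypothetical helper).

    Assumptions of this sketch: field names match case-insensitively, and a
    field missing from the message becomes None (a null column).
    """
    payload = json.loads(message_value)
    fields = {name.upper(): value for name, value in payload.items()}
    row = {}
    for column, to_type in TRANSACTIONS_RAW_COLUMNS.items():
        value = fields.get(column)
        row[column] = None if value is None else to_type(value)
    return row


# Hypothetical message; only the field layout follows the DDL above.
sample = ('{"account_id": "100019", "timestamp": "2019-01-11 16:05:00", '
          '"card_type": "jcb", "amount": 42.5, "ip_address": "192.0.2.10", '
          '"transaction_id": "tx-0001"}')
print(parse_transaction(sample))
```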
2. Repartition the stream on ACCOUNT_ID, and use Avro for the target stream (using Avro is optional):
CREATE STREAM TRANSACTIONS_SOURCE \
  WITH (VALUE_FORMAT='AVRO') AS \
  SELECT * \
  FROM TRANSACTIONS_RAW \
  PARTITION BY ACCOUNT_ID;
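PARTITION BY ACCOUNT_ID sets each message's key to the account ID and writes the result to a new topic, so that all transactions for one account end up in the same partition. A minimal plain-Python sketch of that re-keying, assuming a hypothetical repartition helper and made-up sample data; zlib.crc32 is a stand-in for Kafka's default murmur2 partitioner, so the actual partition numbers will differ.

```python
import zlib
from collections import defaultdict


def repartition(records, key_column, num_partitions):
    """Re-key records by key_column and group them by target partition.

    Hypothetical helper. zlib.crc32 stands in for Kafka's murmur2
    partitioner; the property that carries over is that equal keys always
    land in the same partition, in their original order.
    """
    partitions = defaultdict(list)
    for record in records:
        key = str(record[key_column])
        partition = zlib.crc32(key.encode("utf-8")) % num_partitions
        partitions[partition].append((key, record))
    return dict(partitions)


# Hypothetical transactions for two accounts.
transactions = [
    {"ACCOUNT_ID": "100019", "AMOUNT": 30.0},
    {"ACCOUNT_ID": "100012", "AMOUNT": 12.5},
    {"ACCOUNT_ID": "100019", "AMOUNT": 60.69},
]
for partition, items in sorted(repartition(transactions, "ACCOUNT_ID", 4).items()):
    print(partition, [key for key, _ in items])
```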
3. Register the existing stream of customer data from Oracle, held in the customers topic, as a KSQL stream:
CREATE STREAM CUST_RAW_STREAM (ID BIGINT, \
  FIRST_NAME VARCHAR, \
  LAST_NAME VARCHAR, \
  EMAIL VARCHAR, \
  AVG_CREDIT_SPEND DOUBLE) \
  WITH (KAFKA_TOPIC='customers', \
        VALUE_FORMAT='JSON');
4. Repartition the customer data stream by ID (the customer's account ID, which is matched against ACCOUNT_ID in the transactions) to prepare for the join, and use Avro for the target stream (using Avro is optional):
CREATE STREAM CUSTOMER_REKEYED \
  WITH (VALUE_FORMAT='AVRO') AS \
  SELECT * \
  FROM CUST_RAW_STREAM \
  PARTITION BY ID;
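Both inputs are re-keyed because a KSQL join matches records partition by partition: the two input topics must be co-partitioned, that is, keyed on the join column and with the same number of partitions. The plain-Python sketch below shows why the partition counts must match. The helper names and sample IDs are hypothetical, keys are compared as strings (an assumption of the sketch), and zlib.crc32 again stands in for Kafka's murmur2 partitioner.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in for Kafka's murmur2 partitioner; only determinism matters here.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def co_partitioned(keys, left_partitions: int, right_partitions: int) -> bool:
    """True if every key maps to the same partition number on both sides."""
    return all(
        partition_for(key, left_partitions) == partition_for(key, right_partitions)
        for key in keys
    )


# Hypothetical account IDs, used as string keys on both sides of the join.
account_ids = [str(n) for n in range(100000, 100200)]
print(co_partitioned(account_ids, 4, 4))  # same partition count on both topics
print(co_partitioned(account_ids, 4, 6))  # mismatched partition counts
```

With equal counts, a transaction and its customer row always meet in the same partition number; with different counts, many keys are routed to different partition numbers and could never be matched locally.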
5. Register the repartitioned customer data topic as a KSQL table, to be joined with the incoming stream of transactions:
CREATE TABLE CUSTOMER \
  WITH (KAFKA_TOPIC='CUSTOMER_REKEYED', \
        VALUE_FORMAT='AVRO', \
        KEY='ID');
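Reading the topic as a table rather than a stream changes its meaning: each message is an update to the row for its key, and the table holds only the latest value per key. A minimal plain-Python model of that, with a hypothetical helper name and made-up sample rows; a None value plays the role of a tombstone, which deletes the key.

```python
def materialize_table(changelog):
    """Fold a keyed changelog into a table: the latest value per key.

    Hypothetical helper. A None value is a tombstone that deletes the key.
    """
    table = {}
    for key, value in changelog:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table


# Hypothetical updates read from the CUSTOMER_REKEYED topic, oldest first.
changelog = [
    ("100019", {"FIRST_NAME": "Horatius", "AVG_CREDIT_SPEND": 55.00}),
    ("100012", {"FIRST_NAME": "Juditha", "AVG_CREDIT_SPEND": 53.94}),
    ("100019", {"FIRST_NAME": "Horatius", "AVG_CREDIT_SPEND": 60.58}),
    ("100012", None),
]
print(materialize_table(changelog))
```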
6. Join the transactions to customer information:
CREATE STREAM TRANSACTIONS_ENRICHED AS \
  SELECT T.ACCOUNT_ID, T.CARD_TYPE, T.AMOUNT, \
         C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME, \
         C.AVG_CREDIT_SPEND \
  FROM TRANSACTIONS_SOURCE T \
  INNER JOIN CUSTOMER C \
  ON T.ACCOUNT_ID = C.ID;
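For each incoming transaction, the stream-table join looks up the customer row whose key equals ACCOUNT_ID, using the table's state at the moment the transaction is processed; with INNER JOIN, a transaction that has no matching customer produces no output. The plain-Python sketch below models this with a dict snapshot of the table (a simplification, since the real table keeps changing), a hypothetical enrich helper, and made-up sample data.

```python
def enrich(transactions, customers):
    """Model of the TRANSACTIONS_ENRICHED stream-table INNER JOIN (hypothetical helper).

    customers is a dict snapshot of the CUSTOMER table, keyed by the customer
    ID as a string. Each transaction is looked up once, when it is processed;
    transactions with no matching customer are dropped.
    """
    for txn in transactions:
        customer = customers.get(txn["ACCOUNT_ID"])  # ON T.ACCOUNT_ID = C.ID
        if customer is None:
            continue  # INNER JOIN: no match, no output
        yield {
            "ACCOUNT_ID": txn["ACCOUNT_ID"],
            "CARD_TYPE": txn["CARD_TYPE"],
            "AMOUNT": txn["AMOUNT"],
            "FULL_NAME": customer["FIRST_NAME"] + " " + customer["LAST_NAME"],
            "AVG_CREDIT_SPEND": customer["AVG_CREDIT_SPEND"],
        }


# Hypothetical table snapshot and transactions.
customers = {
    "100019": {"FIRST_NAME": "Horatius", "LAST_NAME": "Keefe", "AVG_CREDIT_SPEND": 60.58},
}
transactions = [
    {"ACCOUNT_ID": "100019", "CARD_TYPE": "jcb", "AMOUNT": 42.5},
    {"ACCOUNT_ID": "999999", "CARD_TYPE": "visa", "AMOUNT": 5.0},  # no such customer
]
for row in enrich(transactions, customers):
    print(row)
```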
7. Aggregate the stream of transactions for each account ID using a two-hour tumbling window, and filter for accounts in which the total spend in a two-hour period is greater than the customer’s average:
CREATE TABLE POSSIBLE_STOLEN_CARD AS \
  SELECT TIMESTAMPTOSTRING(WindowStart(), 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START, \
         T.ACCOUNT_ID, T.CARD_TYPE, SUM(T.AMOUNT) AS TOTAL_CREDIT_SPEND, \
         T.FULL_NAME, MAX(T.AVG_CREDIT_SPEND) AS AVG_CREDIT_SPEND \
  FROM TRANSACTIONS_ENRICHED T \
  WINDOW TUMBLING (SIZE 2 HOURS) \
  GROUP BY T.ACCOUNT_ID, T.CARD_TYPE, T.FULL_NAME \
  HAVING SUM(T.AMOUNT) > MAX(T.AVG_CREDIT_SPEND);
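The window-and-filter logic can be checked by hand with a batch version of the query. This plain-Python sketch rests on several assumptions: each enriched row arrives with its record timestamp in epoch milliseconds (by default KSQL uses the Kafka message timestamp, since step 1 sets no TIMESTAMP property), tumbling windows are aligned to the epoch, and WINDOW_START is formatted in UTC to match the sample output. Unlike the continuous query, which updates its results as records arrive, it computes only the final state. The helper names and sample events are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_SIZE_MS = 2 * 60 * 60 * 1000  # WINDOW TUMBLING (SIZE 2 HOURS)


def window_start(ts_ms: int) -> int:
    # Epoch-aligned, non-overlapping windows: [k * size, (k + 1) * size).
    return ts_ms - ts_ms % WINDOW_SIZE_MS


def possible_stolen_cards(enriched):
    """Batch model of POSSIBLE_STOLEN_CARD (hypothetical helper).

    enriched: iterable of (timestamp_ms, row) pairs, where row has the
    columns of TRANSACTIONS_ENRICHED.
    """
    total_spend = defaultdict(float)
    max_avg_spend = {}
    for ts_ms, row in enriched:
        # GROUP BY ACCOUNT_ID, CARD_TYPE, FULL_NAME within each window.
        group = (window_start(ts_ms), row["ACCOUNT_ID"], row["CARD_TYPE"], row["FULL_NAME"])
        total_spend[group] += row["AMOUNT"]
        max_avg_spend[group] = max(max_avg_spend.get(group, float("-inf")),
                                   row["AVG_CREDIT_SPEND"])

    results = []
    for group, total in total_spend.items():
        if total > max_avg_spend[group]:  # HAVING SUM(AMOUNT) > MAX(AVG_CREDIT_SPEND)
            start_ms, account_id, card_type, full_name = group
            start = datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc)
            results.append({
                "WINDOW_START": start.strftime("%Y-%m-%d %H:%M:%S %z"),
                "ACCOUNT_ID": account_id,
                "CARD_TYPE": card_type,
                "TOTAL_CREDIT_SPEND": total,
                "FULL_NAME": full_name,
                "AVG_CREDIT_SPEND": max_avg_spend[group],
            })
    return results


# Hypothetical enriched transactions for one account.
base = int(datetime(2019, 1, 11, 16, 0, tzinfo=timezone.utc).timestamp() * 1000)
customer = {"ACCOUNT_ID": "100019", "CARD_TYPE": "jcb",
            "FULL_NAME": "Horatius Keefe", "AVG_CREDIT_SPEND": 60.58}
events = [
    (base + 5 * 60 * 1000, dict(customer, AMOUNT=50.00)),    # 16:05, window 16:00-18:00
    (base + 90 * 60 * 1000, dict(customer, AMOUNT=40.69)),   # 17:30, same window
    (base + 125 * 60 * 1000, dict(customer, AMOUNT=10.00)),  # 18:05, next window
]
for row in possible_stolen_cards(events):
    print(row)
```

Only the 16:00 window is reported: its total exceeds the average spend, while the single 18:05 transaction stays below it.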
Examine the output:
ksql> SELECT WINDOW_START, ACCOUNT_ID, CARD_TYPE, \
        TOTAL_CREDIT_SPEND, FULL_NAME, AVG_CREDIT_SPEND \
      FROM POSSIBLE_STOLEN_CARD;
2019-01-11 16:00:00 +0000 | 100019 | jcb | 90.69 | Horatius Keefe | 60.58
2019-01-11 16:00:00 +0000 | 100012 | mastercard | 84.04 | Juditha Shwalbe | 53.94
2019-01-11 16:00:00 +0000 | 100016 | maestro | 76.01 | Milo Drewes | 68.33
2019-01-11 16:00:00 +0000 | 100035 | visa-electron | 69.61 | Roxine Furminger | 59.68
…