Anomaly Detection

Detecting Unusual Credit Card Activity

In this recipe, we join each credit card transaction with the customer’s average credit card spend, then aggregate the transactions for each customer over two-hour windows. If a customer’s total spend within a two-hour window exceeds that customer’s average credit card spend, the account is flagged as a possible case of credit card theft.

You can try this out for yourself with the code here.

Directions

1. Register the existing transactions topic as a KSQL stream:

CREATE STREAM TRANSACTIONS_RAW (ACCOUNT_ID VARCHAR, \
                                TIMESTAMP VARCHAR, \
                                CARD_TYPE VARCHAR, \
                                AMOUNT DOUBLE, \
                                IP_ADDRESS VARCHAR, \
                                TRANSACTION_ID VARCHAR) \
                           WITH (KAFKA_TOPIC='transactions',\
                                 VALUE_FORMAT='JSON');
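As a rough illustration of what this declaration implies, the Python sketch below maps one JSON message onto the declared columns. It is not how KSQL is implemented. The sample message is invented, and both the case-insensitive field matching and the use of None for a missing field are assumptions about how the JSON deserializer behaves.

```python
import json

# Column names and types as declared in TRANSACTIONS_RAW
# (VARCHAR -> str, DOUBLE -> float).
SCHEMA = {
    "ACCOUNT_ID": str,
    "TIMESTAMP": str,
    "CARD_TYPE": str,
    "AMOUNT": float,
    "IP_ADDRESS": str,
    "TRANSACTION_ID": str,
}


def to_row(message: str) -> dict:
    """Map one JSON message onto the declared columns."""
    # Assumption: field names are matched case-insensitively.
    payload = {name.upper(): value for name, value in json.loads(message).items()}
    row = {}
    for column, cast in SCHEMA.items():
        value = payload.get(column)
        # Assumption: a field missing from the message becomes NULL (None here).
        row[column] = None if value is None else cast(value)
    return row


# Hypothetical message, invented for illustration; TRANSACTION_ID is left out on purpose.
sample = (
    '{"account_id": "100019", "timestamp": "2019-01-11 16:05:12", '
    '"card_type": "jcb", "amount": 12.5, "ip_address": "192.0.2.10"}'
)
row = to_row(sample)
```

Here row carries all six declared columns, with TRANSACTION_ID set to None because the sample message omits it.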

2. Repartition the stream on ACCOUNT_ID. Writing the target stream as Avro is optional:

CREATE STREAM TRANSACTIONS_SOURCE \
    WITH (VALUE_FORMAT='AVRO') AS \
          SELECT * \
            FROM TRANSACTIONS_RAW \
    PARTITION BY ACCOUNT_ID;
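The reason for repartitioning can be sketched in a few lines of Python: once the record key is the account ID, every transaction for an account lands in the same partition. This is only an illustration. The partition count and the transactions are made up, and zlib.crc32 stands in for the hash that Kafka's default partitioner actually uses.

```python
import zlib

NUM_PARTITIONS = 4  # hypothetical partition count


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Choose a partition from the record key.

    zlib.crc32 is a stand-in hash, not the one Kafka's partitioner uses.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def repartition(records, key_field):
    """Distribute records across partitions by the chosen key field."""
    partitions = {p: [] for p in range(NUM_PARTITIONS)}
    for record in records:
        partitions[partition_for(record[key_field])].append(record)
    return partitions


# Made-up transactions for illustration.
transactions = [
    {"ACCOUNT_ID": "100019", "AMOUNT": 40.00},
    {"ACCOUNT_ID": "100012", "AMOUNT": 84.04},
    {"ACCOUNT_ID": "100019", "AMOUNT": 50.69},
]
partitions = repartition(transactions, "ACCOUNT_ID")
```

Both records for account 100019 end up in partitions[partition_for("100019")], which is what lets a later join or aggregation see all of an account's data in one place.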

3. Register the existing customer data, streamed from Oracle into the customers topic, as a KSQL stream:

CREATE STREAM CUST_RAW_STREAM (ID BIGINT, \
                               FIRST_NAME VARCHAR, \
                               LAST_NAME VARCHAR, \
                               EMAIL VARCHAR, \
                               AVG_CREDIT_SPEND DOUBLE) \
              WITH (KAFKA_TOPIC='customers', \
                    VALUE_FORMAT='JSON');

4. Repartition the customer data stream by ID, the column that holds the account ID, to prepare for the join. Writing the target stream as Avro is optional:

CREATE STREAM CUSTOMER_REKEYED \
    WITH (VALUE_FORMAT='AVRO') AS \
            SELECT * \
              FROM CUST_RAW_STREAM \
      PARTITION BY ID;

5. Register the repartitioned customer topic as a KSQL table, which is used in the join with the incoming stream of transactions:

CREATE TABLE CUSTOMER \
WITH (KAFKA_TOPIC='CUSTOMER_REKEYED', \
      VALUE_FORMAT='AVRO', \
      KEY='ID');
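The difference between the customer stream and the customer table can be sketched as follows: a table keeps only the latest row for each key. This Python sketch is an analogy, not KSQL's implementation. The update records are invented, apart from the name and final average for ID 100019, which appear in the sample output at the end of this recipe.

```python
def materialize(changelog):
    """Fold a keyed stream of updates into a table: the last record per key wins."""
    table = {}
    for key, value in changelog:
        if value is None:
            # A null value acts as a delete marker (tombstone) for the key.
            table.pop(key, None)
        else:
            table[key] = value
    return table


# Invented update history, keyed by ID as in the rekeyed customer topic.
customer_updates = [
    (100019, {"FIRST_NAME": "Horatius", "LAST_NAME": "Keefe", "AVG_CREDIT_SPEND": 55.00}),
    (100040, {"FIRST_NAME": "Test", "LAST_NAME": "Customer", "AVG_CREDIT_SPEND": 10.00}),
    (100019, {"FIRST_NAME": "Horatius", "LAST_NAME": "Keefe", "AVG_CREDIT_SPEND": 60.58}),
    (100040, None),
]
customer_table = materialize(customer_updates)
```

After the fold, customer_table holds a single row for 100019 with the most recent average, and 100040 is gone.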

6. Join the transactions to customer information:

CREATE STREAM TRANSACTIONS_ENRICHED AS \
SELECT   T.ACCOUNT_ID, T.CARD_TYPE, T.AMOUNT, \
         C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME, \
         C.AVG_CREDIT_SPEND \
FROM     TRANSACTIONS_SOURCE T \
         INNER JOIN CUSTOMER C \
         ON T.ACCOUNT_ID = C.ID;
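A minimal Python sketch of what this inner join does: each transaction looks up its customer row by key, and transactions without a match are dropped. The customer names and averages come from the sample output at the end of this recipe; the transaction amounts and the unmatched account are invented. Comparing keys as strings is an assumption of the sketch, and a real stream-table join looks up the table's state at the moment each event arrives rather than a fixed dictionary.

```python
def enrich(transactions, customers):
    """Inner join: emit a transaction only if its account has a customer row."""
    for txn in transactions:
        # Assumption: keys are compared as strings.
        customer = customers.get(str(txn["ACCOUNT_ID"]))
        if customer is None:
            continue  # inner join drops unmatched transactions
        yield {
            "ACCOUNT_ID": txn["ACCOUNT_ID"],
            "CARD_TYPE": txn["CARD_TYPE"],
            "AMOUNT": txn["AMOUNT"],
            "FULL_NAME": customer["FIRST_NAME"] + " " + customer["LAST_NAME"],
            "AVG_CREDIT_SPEND": customer["AVG_CREDIT_SPEND"],
        }


# Latest customer rows, keyed by ID.
customers = {
    "100019": {"FIRST_NAME": "Horatius", "LAST_NAME": "Keefe", "AVG_CREDIT_SPEND": 60.58},
    "100012": {"FIRST_NAME": "Juditha", "LAST_NAME": "Shwalbe", "AVG_CREDIT_SPEND": 53.94},
}

# Invented transactions; account 999999 has no customer row.
transactions = [
    {"ACCOUNT_ID": "100019", "CARD_TYPE": "jcb", "AMOUNT": 40.00},
    {"ACCOUNT_ID": "999999", "CARD_TYPE": "visa", "AMOUNT": 10.00},
]
enriched = list(enrich(transactions, customers))
```

Only the transaction for account 100019 survives, now carrying FULL_NAME and AVG_CREDIT_SPEND alongside the original fields.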

7. Aggregate the stream of transactions for each account ID using a two-hour tumbling window, and filter for accounts in which the total spend in a two-hour period is greater than the customer’s average:

CREATE TABLE POSSIBLE_STOLEN_CARD AS \
SELECT   TIMESTAMPTOSTRING(WindowStart(), 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START, \
         T.ACCOUNT_ID, T.CARD_TYPE, SUM(T.AMOUNT) AS TOTAL_CREDIT_SPEND, \
         T.FULL_NAME, MAX(T.AVG_CREDIT_SPEND) AS AVG_CREDIT_SPEND \
FROM     TRANSACTIONS_ENRICHED T \
         WINDOW TUMBLING (SIZE 2 HOURS) \
GROUP BY T.ACCOUNT_ID, T.CARD_TYPE, T.FULL_NAME \
HAVING   SUM(T.AMOUNT) > MAX(T.AVG_CREDIT_SPEND);
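The windowed aggregation and the HAVING filter can be sketched in Python as below. This is a simplified batch version, not the streaming implementation. It assumes windows are aligned to the Unix epoch, and the individual transaction amounts and times are invented; only the account, name, and average of 60.58 come from the sample output in this recipe.

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_MS = 2 * 60 * 60 * 1000  # SIZE 2 HOURS, in milliseconds


def to_ms(year, month, day, hour, minute):
    """Convert a UTC wall-clock time to epoch milliseconds."""
    moment = datetime(year, month, day, hour, minute, tzinfo=timezone.utc)
    return int(moment.timestamp() * 1000)


def window_start(ts_ms):
    """Start of the tumbling window containing ts_ms (assumed epoch-aligned)."""
    return ts_ms - ts_ms % WINDOW_MS


def possible_stolen_cards(events):
    """Sum spend per (window, account, card type, name); keep groups above the average."""
    totals = defaultdict(float)
    max_avg = {}
    for e in events:
        key = (window_start(e["ts_ms"]), e["ACCOUNT_ID"], e["CARD_TYPE"], e["FULL_NAME"])
        totals[key] += e["AMOUNT"]
        max_avg[key] = max(max_avg.get(key, e["AVG_CREDIT_SPEND"]), e["AVG_CREDIT_SPEND"])
    # Equivalent of HAVING SUM(AMOUNT) > MAX(AVG_CREDIT_SPEND).
    return {k: (totals[k], max_avg[k]) for k in totals if totals[k] > max_avg[k]}


def event(ts_ms, amount):
    """Build an enriched transaction for account 100019 (amounts are invented)."""
    return {"ts_ms": ts_ms, "ACCOUNT_ID": "100019", "CARD_TYPE": "jcb",
            "FULL_NAME": "Horatius Keefe", "AMOUNT": amount, "AVG_CREDIT_SPEND": 60.58}


events = [
    event(to_ms(2019, 1, 11, 16, 5), 40.00),   # window starting 16:00
    event(to_ms(2019, 1, 11, 17, 30), 50.69),  # same window
    event(to_ms(2019, 1, 11, 18, 10), 30.00),  # next window, starting 18:00
]
flagged = possible_stolen_cards(events)
```

The two transactions in the 16:00 window add up to more than the customer's average, so that window is flagged. The 18:10 transaction falls into a new window and starts from zero, which is why a tumbling window measures spend per fixed interval rather than over a sliding "last two hours".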

Examine the output:

ksql> SELECT WINDOW_START, ACCOUNT_ID, CARD_TYPE, \
      TOTAL_CREDIT_SPEND, FULL_NAME, AVG_CREDIT_SPEND \
      FROM POSSIBLE_STOLEN_CARD;
2019-01-11 16:00:00 +0000 | 100019 | jcb | 90.69 | Horatius Keefe | 60.58
2019-01-11 16:00:00 +0000 | 100012 | mastercard | 84.04 | Juditha Shwalbe | 53.94
2019-01-11 16:00:00 +0000 | 100016 | maestro | 76.01 | Milo Drewes | 68.33
2019-01-11 16:00:00 +0000 | 100035 | visa-electron | 69.61 | Roxine Furminger | 59.68
…