détection des anomalies

ATM Fraud Detection

Identifying fraudulent transactions in a timely manner enables businesses to save money by stopping them sooner. By taking a Kafka topic of transaction events, KSQL can be used to identify transactions taking place on the same account within a short time period. It can also apply additional criteria such as differentiating between transactions in the same location versus elsewhere.

For more details, read the blog post ATM Fraud Detection with Apache Kafka and KSQL.

Directions

1. Register the source topic of ATM transactions for use as a KSQL stream called ATM_TXNS:

 CREATE STREAM ATM_TXNS (account_id VARCHAR, \
                         atm VARCHAR, \
                         location STRUCT<lon DOUBLE, \
                                   lat DOUBLE>, \
                         amount INT, \
                         timestamp VARCHAR, \
                         transaction_id VARCHAR) \
         WITH (KAFKA_TOPIC='atm_txns_gess', \
         VALUE_FORMAT='JSON', \
         TIMESTAMP='timestamp', \
         TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss X');

2. Create a clone of the first stream:

 CREATE STREAM ATM_TXNS_02 \
         WITH (PARTITIONS=1) AS \
 SELECT * FROM ATM_TXNS;

3. Show a live feed of possibly fraudulent transactions:

 SELECT T1.ACCOUNT_ID AS ACCOUNT_ID, \
         TIMESTAMPTOSTRING(T1.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS T1_TIMESTAMP, \
        TIMESTAMPTOSTRING(T2.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS T2_TIMESTAMP, \
         T1.TRANSACTION_ID, T2.TRANSACTION_ID, \
         T1.AMOUNT, T2.AMOUNT, \
         T1.ATM, T2.ATM \
 FROM   ATM_TXNS T1 \
 INNER JOIN ATM_TXNS_02 T2 \
         WITHIN (0 MINUTES, 10 MINUTES) \
         ON T1.ACCOUNT_ID = T2.ACCOUNT_ID \
 WHERE   T1.TRANSACTION_ID != T2.TRANSACTION_ID \
 AND   (T1.location->lat != T2.location->lat OR \
         T1.location->lon != T2.location->lon) \
 AND   T2.ROWTIME != T1.ROWTIME;

This implements the following criteria:

  • Same account ID
  • Transaction takes place at a different ATM
  • Transaction takes place within 10 minutes of the previous transaction

4. For possibly fraudulent transactions, show the distance between the ATMs and the time difference between the transactions:

 SET 'auto.offset.reset' = 'earliest';

 SELECT T1.ACCOUNT_ID AS ACCOUNT_ID, \
         TIMESTAMPTOSTRING(T1.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS T1_TIMESTAMP, \
         TIMESTAMPTOSTRING(T2.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS T2_TIMESTAMP, \
         GEO_DISTANCE(T1.location->lat, T1.location->lon, \
                 T2.location->lat, T2.location->lon, 'KM') AS DISTANCE_BETWEEN_TXN_KM, \
         (CAST(T2.ROWTIME AS DOUBLE) - CAST(T1.ROWTIME AS DOUBLE)) / 1000 / 60 AS MINUTES_DIFFERENCE \
 FROM   ATM_TXNS T1 \
 INNER JOIN ATM_TXNS_02 T2 \
         WITHIN (0 MINUTES, 10 MINUTES) \
         ON T1.ACCOUNT_ID = T2.ACCOUNT_ID \
 WHERE   T1.TRANSACTION_ID != T2.TRANSACTION_ID \
 AND   (T1.location->lat != T2.location->lat OR \
         T1.location->lon != T2.location->lon) \
 AND   T2.ROWTIME != T1.ROWTIME;

5. Write a live stream of all suspected fraudulent transactions as they take place to a new Kafka topic:

 CREATE STREAM ATM_POSSIBLE_FRAUD  \
                 WITH (PARTITIONS=1) AS \
 SELECT  T1.ROWTIME AS T1_TIMESTAMP, \
         T2.ROWTIME AS T2_TIMESTAMP, \
         GEO_DISTANCE(T1.location->lat, \
                      T1.location->lon, \
                      T2.location->lat, \
                      T2.location->lon, \
                      'KM') AS DISTANCE_BETWEEN_TXN_KM, \
         (T2.ROWTIME - T1.ROWTIME) AS MILLISECONDS_DIFFERENCE,  \
         (CAST(T2.ROWTIME AS DOUBLE) - \
          CAST(T1.ROWTIME AS DOUBLE)) / 1000 / 60 AS MINUTES_DIFFERENCE,  \
         T1.ACCOUNT_ID AS ACCOUNT_ID, \
         T1.TRANSACTION_ID, T2.TRANSACTION_ID, \
         T1.AMOUNT, T2.AMOUNT, \
         T1.ATM, T2.ATM \
 FROM   ATM_TXNS T1 \
 INNER JOIN ATM_TXNS_02 T2 \
         WITHIN (0 MINUTES, 10 MINUTES) \
         ON T1.ACCOUNT_ID = T2.ACCOUNT_ID \
 WHERE   T1.TRANSACTION_ID != T2.TRANSACTION_ID \
 AND   (T1.location->lat != T2.location->lat OR \
         T1.location->lon != T2.location->lon) \
 AND   T2.ROWTIME != T1.ROWTIME;
< Back to the Stream Processing Cookbook

Nous utilisons des cookies afin de comprendre comment vous utilisez notre site et améliorer votre expérience. Cliquez ici pour en apprendre davantage ou pour modifier vos paramètres de cookies. En poursuivant la navigation, vous consentez à ce que nous utilisions des cookies.