Networks

Detecting and Analyzing Suspicious Network Activity

When malware attacks a network, detecting it early is essential to limiting its spread and the damage it causes. Processing network device logs in real time with KSQL makes it possible to detect the kind of unusual activity that may indicate a malware infection.

Directions

The source event stream comes from a router. Each event records the timestamp of a connection along with its source and destination IP addresses and ports:

{
  "event_ts": "1526078852000",
  "source_ip": "214.85.228.173",
  "dest_ip": "34.22.126.18",
  "source_prt": 44423,
  "dest_prt": 23
}

1. In KSQL, register the FIREWALL_EVENTS stream.

ksql> CREATE STREAM FIREWALL_EVENTS WITH (KAFKA_TOPIC='FW_SRC5', VALUE_FORMAT='AVRO', TIMESTAMP='event_ts');

 Message
----------------
 Stream created
----------------

Note that the stream definition sets TIMESTAMP='event_ts', so time-based processing uses the time at which each event occurred, not the time at which it was written to the system. This also enables KSQL to handle late-arriving data automatically.
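
As a quick check before building on the stream, it can be inspected from the KSQL prompt. This is a minimal sketch: DESCRIBE EXTENDED should list the columns and the timestamp field in use, and a LIMIT query shows a few raw events. The output layout differs between KSQL versions, so none is reproduced here, and the limit of 3 is an arbitrary choice.

```sql
-- Show the stream's columns and metadata, including the timestamp field
ksql> DESCRIBE EXTENDED FIREWALL_EVENTS;

-- Print three raw events as they arrive, then stop (3 is an arbitrary choice)
ksql> SELECT * FROM FIREWALL_EVENTS LIMIT 3;
```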

2. Inspect the event data. This query counts the events per destination network port in each one-minute window:

ksql> SELECT DEST_PRT, COUNT(*) FROM FIREWALL_EVENTS WINDOW TUMBLING (SIZE 1 MINUTES) GROUP BY DEST_PRT;
443 | 58271
80  | 7163
22  | 10

3. Write to a Kafka topic when any port other than the standard ones expected for HTTP and HTTPS traffic (80 and 443) sees more traffic within a minute than would be expected (e.g., > 100):

ksql> CREATE TABLE SUSPICIOUS_NET_ACTIVITY AS \
      SELECT DEST_PRT, COUNT(*) AS EVENT_COUNT \
      FROM FIREWALL_EVENTS \
      WINDOW TUMBLING (SIZE 1 MINUTES) \
      WHERE DEST_PRT !=80 \
        AND DEST_PRT !=443 \
      GROUP BY DEST_PRT \
      HAVING COUNT(*)>100;

 Message
---------------------------
 Table created and running
---------------------------

The resulting Kafka topic (SUSPICIOUS_NET_ACTIVITY) can be used to trigger alerts directly, stream into a real-time dashboard view or serve as the basis for further analysis.

Here's an example where 298 connections were made to port 339 within a period of one minute:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), DEST_PRT, EVENT_COUNT FROM SUSPICIOUS_NET_ACTIVITY;
2018-05-11 09:30:00 | 339 | 298
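
One limitation of tumbling windows is that a burst of connections straddling a minute boundary is split between two windows and may stay under the threshold in both. A possible variant, sketched here on the assumption that overlapping windows are acceptable for alerting, uses a hopping window that advances more often than its size. The table name SUSPICIOUS_NET_ACTIVITY_HOPPING and the 10-second advance are illustrative choices, not part of the original recipe.

```sql
-- Hypothetical variant of step 3: one-minute windows that start every 10 seconds,
-- so a burst crossing a minute boundary still falls wholly inside some window.
ksql> CREATE TABLE SUSPICIOUS_NET_ACTIVITY_HOPPING AS \
      SELECT DEST_PRT, COUNT(*) AS EVENT_COUNT \
      FROM FIREWALL_EVENTS \
      WINDOW HOPPING (SIZE 1 MINUTES, ADVANCE BY 10 SECONDS) \
      WHERE DEST_PRT !=80 \
        AND DEST_PRT !=443 \
      GROUP BY DEST_PRT \
      HAVING COUNT(*)>100;
```

Because the windows overlap, the same burst can be reported in several consecutive windows, so anything consuming this topic for alerting would likely need to deduplicate.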

4. Using the same criteria as above (over 100 connections to a non-standard port within a one-minute window), we can create a second output topic that also captures the source IP of the traffic, to help identify the compromised machine(s):

ksql> CREATE TABLE SUSPICIOUS_HOST_ACTIVITY AS \
  SELECT SOURCE_IP, DEST_PRT, COUNT(*) AS EVENT_COUNT \
  FROM FIREWALL_EVENTS \
  WINDOW TUMBLING (SIZE 1 MINUTES) \
  WHERE DEST_PRT !=80 \
    AND DEST_PRT !=443 \
  GROUP BY SOURCE_IP, DEST_PRT \
  HAVING COUNT(*)>100;

 Message
---------------------------
 Table created and running
---------------------------

In this example, three hosts are identified, each making numerous connections to port 339 on target machines:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), SOURCE_IP, DEST_PRT, EVENT_COUNT FROM SUSPICIOUS_HOST_ACTIVITY;
2018-05-11 09:30:00 | 49.2.224.203 | 339 | 298
2018-05-11 09:31:00 | 241.212.158.23 | 339 | 143
2018-05-11 09:32:00 | 8.115.29.34 | 339 | 201

5. Apache Kafka® persists data. Therefore, even events that have already been received can be analyzed.

Taking the example of the suspicious host activity above, we can use predicates in KSQL to examine both past events and new events, as they arrive, that match the specified conditions:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), \
      SOURCE_IP, DEST_IP, SOURCE_PRT, DEST_PRT \
      FROM FIREWALL_EVENTS \
      WHERE DEST_PRT=339 \
        AND SOURCE_IP='49.2.224.203';

2018-05-11 09:30:00 | 49.2.224.203 | 62.215.118.49 | 1255 | 339
2018-05-11 09:30:01 | 49.2.224.203 | 103.34.25.18 | 769 | 339
2018-05-11 09:30:01 | 49.2.224.203 | 228.1.115.206 | 9 | 339
2018-05-11 09:30:01 | 49.2.224.203 | 35.95.206.140 | 471 | 339
2018-05-11 09:30:05 | 49.2.224.203 | 212.251.81.193 | 872 | 339
2018-05-11 09:30:05 | 49.2.224.203 | 27.50.117.85 | 551 | 339
2018-05-11 09:30:06 | 49.2.224.203 | 187.138.232.240 | 1884 | 339
2018-05-11 09:30:08 | 49.2.224.203 | 13.120.9.221 | 1160 | 339
2018-05-11 09:30:09 | 49.2.224.203 | 80.142.1.209 | 893 | 339
2018-05-11 09:30:08 | 49.2.224.203 | 207.34.18.160 | 439 | 339

KSQL is a streaming query language, and thus the above query will run continually, showing all new events that match the condition as they arrive, until the query is cancelled. You can also use the LIMIT clause to only show a specified number of messages.
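
Whether past events appear depends on where the query starts reading in the topic. As a sketch, assuming the KSQL session is using its default of reading only newly arriving messages, setting auto.offset.reset to earliest makes subsequent queries start from the beginning of the topic, and LIMIT ends the query after a fixed number of rows. The limit of 5 is arbitrary.

```sql
-- Read from the start of the topic for queries issued later in this session
ksql> SET 'auto.offset.reset'='earliest';

-- Same predicate as above, but the query ends after five matching events
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), \
      SOURCE_IP, DEST_IP, SOURCE_PRT, DEST_PRT \
      FROM FIREWALL_EVENTS \
      WHERE DEST_PRT=339 \
        AND SOURCE_IP='49.2.224.203' \
      LIMIT 5;
```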

