Data Wrangling

Using Event Time from IoT Sensor Readings

Sensor data typically includes a timestamp of when the value was read. You can use KSQL to create a stream that uses this timestamp from the data as the message timestamp. This is an alternative to the default ROWTIME value, which comes from the Apache Kafka® message's own timestamp: either set by the producing application or the time at which the message was written to the broker. Using the timestamp from the sensor reading makes windowing and aggregate operations more intuitive, because those operations are then based on when the data was sampled rather than when it reached Kafka.
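The following minimal Python sketch shows why this matters. It assumes one-hour tumbling windows aligned to the epoch. A reading sampled just before the hour, but stamped by Kafka just after it, lands in a different window depending on which timestamp is used. The function name and timestamps are hypothetical.

```python
HOUR_MS = 60 * 60 * 1000  # one hour in milliseconds


def hourly_window_start(ts_ms: int) -> int:
    """Start (epoch ms) of the one-hour tumbling window containing ts_ms."""
    return ts_ms - ts_ms % HOUR_MS


# Hypothetical reading: sampled 5 s before the top of the hour, but its
# Kafka message timestamp is 60 s after it (e.g. the device buffered it).
sensor_ts = 1 * HOUR_MS - 5_000
kafka_ts = 1 * HOUR_MS + 60_000

by_sensor_time = hourly_window_start(sensor_ts)  # the hour it was sampled in
by_kafka_time = hourly_window_start(kafka_ts)    # the following hour
print(by_sensor_time, by_kafka_time)             # 0 3600000
```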


1. The raw sensor data is produced as delimited text to a Kafka topic (sensor_delimited in this example), where the first element of each record is the epoch timestamp of the sensor reading.
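The following is a minimal Python sketch of reading one such record. It rests on three assumptions:
- the records use KSQL's default comma delimiter;
- the timestamp is in epoch milliseconds, the unit KSQL expects for a TIMESTAMP column;
- the columns follow the same order as the sensor_stream definition in the next step.

The sample line is made up for illustration.

```python
# Assumed column order, matching the sensor_stream definition in this recipe.
COLUMNS = [
    ("ts", int),  # epoch timestamp of the reading, in milliseconds (assumed)
    ("accel_x", float), ("accel_y", float), ("accel_z", float),
    ("gyro_x", int), ("gyro_y", int), ("gyro_z", int),
    ("temp_c", int),
]


def parse_reading(line: str) -> dict:
    """Split one comma-delimited record and cast each field to its type."""
    values = line.strip().split(",")
    if len(values) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} fields, got {len(values)}")
    return {name: cast(value) for (name, cast), value in zip(COLUMNS, values)}


# A made-up record in the assumed layout, not real sensor output.
reading = parse_reading("1546035000000,0.01,-0.02,0.98,3,-1,0,24\n")
print(reading["ts"], reading["accel_x"], reading["temp_c"])  # 1546035000000 0.01 24
```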


2. Create a stream with the sensor time as the TIMESTAMP:

CREATE STREAM sensor_stream ( \
    ts BIGINT, \
    accel_x DOUBLE, \
    accel_y DOUBLE, \
    accel_z DOUBLE,  \
    gyro_x INTEGER, \
    gyro_y INTEGER, \
    gyro_z INTEGER, \
    temp_c INTEGER \
    ) WITH (KAFKA_TOPIC='sensor_delimited', VALUE_FORMAT='DELIMITED', TIMESTAMP='ts');

Notice that the timestamp column (ts) is declared as BIGINT and is named in the WITH clause as TIMESTAMP='ts'.
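The following is a simplified model, not KSQL's actual implementation, of how the timestamp is chosen:
- with no TIMESTAMP property, ROWTIME is the Kafka message timestamp;
- with TIMESTAMP='ts', ROWTIME is the value of the ts column.

This is why a query comparing ROWTIME and TS returns identical values. The function name and sample values are hypothetical.

```python
from typing import Optional


def effective_rowtime(row: dict, kafka_timestamp_ms: int,
                      timestamp_column: Optional[str] = None) -> int:
    """Hypothetical model of how ROWTIME is chosen for a stream.

    Without a TIMESTAMP property, ROWTIME is the Kafka message timestamp;
    with one, it is the (epoch-millisecond) value of that column.
    """
    if timestamp_column is None:
        return kafka_timestamp_ms
    return int(row[timestamp_column])


row = {"ts": 1546036654000, "accel_x": 0.01}
kafka_ts = 1546036660000  # made-up: message written 6 s after the reading

print(effective_rowtime(row, kafka_ts))        # 1546036660000 (default)
print(effective_rowtime(row, kafka_ts, "ts"))  # 1546036654000 (TIMESTAMP='ts')
```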

From here, we can run a quick validation query in the Confluent Control Center UI and see that ROWTIME and TS do in fact hold the same values:

[Screenshot: Confluent Control Center UI]

3. We now have raw data and a stream that we can query based on the sensor reading timestamp, so let's use the other side of the stream-table duality to look at the data further. The sensor includes motion data, so we can analyze the maximum x-axis acceleration in each one-hour window.

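KSQL aggregations need a GROUP BY key, and this stream has no sensor ID column. One workaround is to derive a stream with a constant key first (sensor_stream_keyed and sensor_id are illustrative names) and then aggregate it in one-hour tumbling windows:

ksql> CREATE STREAM sensor_stream_keyed AS \
   SELECT 'sensor-1' AS sensor_id, accel_x FROM sensor_stream;

ksql> CREATE TABLE sensor_table_accel_x_hourly AS \
   SELECT sensor_id, \
          TIMESTAMPTOSTRING(WINDOWSTART(), 'yyyy-MM-dd HH:mm:ss') AS window_start_ts, \
          TIMESTAMPTOSTRING(WINDOWEND(), 'yyyy-MM-dd HH:mm:ss') AS window_end_ts, \
          MAX(accel_x) AS max_x_accel \
     FROM sensor_stream_keyed \
   WINDOW TUMBLING (SIZE 1 HOUR) \
 GROUP BY sensor_id;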
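The following small Python simulation makes the windowed result concrete by computing an hourly maximum over event time. It rests on these assumptions:
- windows are one-hour tumbling windows aligned to the epoch, as Kafka Streams windows are;
- window bounds are formatted in UTC, whereas KSQL's TIMESTAMPTOSTRING output may use a different time zone depending on version and arguments.

The readings are invented.

```python
from collections import defaultdict
from datetime import datetime, timezone

HOUR_MS = 60 * 60 * 1000  # tumbling window size: one hour in milliseconds


def fmt(ts_ms: int) -> str:
    """Render epoch milliseconds as 'yyyy-MM-dd HH:mm:ss' in UTC (assumed)."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S")


def hourly_max_accel_x(readings):
    """Return (window_start, window_end, max_accel_x) rows, one per
    epoch-aligned one-hour tumbling window, keyed on each reading's ts."""
    maxima = defaultdict(lambda: float("-inf"))
    for r in readings:
        start = r["ts"] - r["ts"] % HOUR_MS  # window containing the event time
        maxima[start] = max(maxima[start], r["accel_x"])
    return [(fmt(s), fmt(s + HOUR_MS), maxima[s]) for s in sorted(maxima)]


# Invented readings (ts in epoch ms, accel_x in g).
readings = [
    {"ts": 1546035000000, "accel_x": 0.01},   # 2018-12-28 22:10:00 UTC
    {"ts": 1546036200000, "accel_x": 0.03},   # 2018-12-28 22:30:00 UTC
    {"ts": 1546038000000, "accel_x": -0.02},  # 2018-12-28 23:00:00 UTC
]

rows = hourly_max_accel_x(readings)
for row in rows:
    print(" | ".join(str(col) for col in row))
# 2018-12-28 22:00:00 | 2018-12-28 23:00:00 | 0.03
# 2018-12-28 23:00:00 | 2018-12-29 00:00:00 | -0.02
```

A reading stamped exactly on the hour opens the next window, because tumbling windows include their start and exclude their end.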

From there, we can see what the maximum acceleration in g-forces was in the past hour:

ksql> SELECT window_start_ts, window_end_ts, max_x_accel FROM sensor_table_accel_x_hourly;
2018-12-28 22:37:34 | 2018-12-28 22:41:06 | -0.004395