1. Below is a sample of the raw sensor data being produced to a Kafka topic, where the first field of each delimited record is the epoch timestamp of the sensor reading.
1546033893602,0.013611,0.001587,-0.003052,0,-1,-1,21
1546033893735,-0.004700,-0.001587,-0.010132,0,0,0,21
1546033893864,-0.003723,-0.001099,-0.001587,0,0,0,21
1546033893995,-0.001526,-0.000610,0.002075,0,0,-1,21
1546033894122,-0.001526,-0.003296,0.006470,0,0,0,21
1546033894255,0.000427,-0.005981,-0.016479,-1,0,0,21
2. Create a stream with the sensor time as the message timestamp:
CREATE STREAM sensor_stream ( \
    ts BIGINT, \
    accel_x DOUBLE, \
    accel_y DOUBLE, \
    accel_z DOUBLE, \
    gyro_x INTEGER, \
    gyro_y INTEGER, \
    gyro_z INTEGER, \
    temp_c INTEGER \
  ) WITH (KAFKA_TOPIC='sensor_delimited', \
          VALUE_FORMAT='DELIMITED', \
          TIMESTAMP='ts');
Notice that the timestamp (ts) is declared as BIGINT and is referenced via TIMESTAMP='ts' in the WITH clause.
From here, we can run a quick validation query in the Confluent Control Center UI and see that ROWTIME and ts do in fact hold the same values:
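The exact validation statement isn't shown above, but a minimal sketch would simply select both timestamp columns side by side (assuming that, with TIMESTAMP='ts' set on the stream, ROWTIME mirrors ts):

```sql
-- Sketch: compare the record timestamp (ROWTIME) against the raw ts field.
SELECT ROWTIME, ts FROM sensor_stream LIMIT 3;
```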
3. We now have raw data and a stream we can query based on the sensor reading timestamp, so let’s use the other side of the stream–table duality to look at the data further. Here, the sensor includes motion data, so we can analyze the maximum x-axis acceleration over the past hour.
ksql> CREATE TABLE sensor_table_accel_x_hourly AS \
      SELECT TIMESTAMPTOSTRING(windowstart(), 'yyyy-MM-dd HH:mm:ss') AS window_start_ts, \
             TIMESTAMPTOSTRING(windowend(), 'yyyy-MM-dd HH:mm:ss') AS window_end_ts, \
             max(accel_x) AS max_x_accel \
      FROM sensor_stream \
      WINDOW SESSION (60 MINUTES) \
      GROUP BY accel_x;
From there, we can see what the maximum acceleration in g-forces was in the past hour:
ksql> SELECT window_start_ts, window_end_ts, max_x_accel \
      FROM SENSOR_TABLE_ACCEL_X_HOURLY \
      LIMIT 1;
2018-12-28 22:37:34 | 2018-12-28 22:41:06 | -0.004395