Event Time Processing

Sensor Data Enrichment

In manufacturing, it’s common to monitor each production line independently. Within the context of the line, a given sensor reading refers to the item passing the station at the time the reading is taken. But this model makes it difficult to join a sensor reading to the item number it belongs to. KSQL can enrich the data and bring clarity to the chaos.

Directions

Imagine you are monitoring a production line in a manufacturing plant (for example, in a car assembly line). You get a notification when a new piece of work (e.g., a chassis) passes the start of the line via a message like this:

{
  "informationPath":"simulation/informationString",
  "WorkpieceID":"0020181",
  "timestamp":1541623171000
}

There are a number of sensors along the production line that take readings as the workpiece (the partially assembled car) passes them. Each emits readings that are tied not to the WorkpieceID (the actual car chassis) but only to the timestamp at which the reading was made. These have the following structure:

{
  "sensorPath":"simulation/machine/plc/sensor-1",
  "value":7.0,
  "timestamp":1541623171000
}

Neither of these streams of data contains a key, so in order to join them we have to introduce one. At this stage, it’s helpful to think about what happens outside the context of a single production line. Any given car company has multiple lines, so let’s introduce an artificial extra column, LINE_ID, and say that this data is coming from line one. This makes sense because you only want to look at sensor data in the context of the indicator data from the same line.

1. Spin up a test environment using docker-compose.yml and create two topics: indicator and sensor.
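
One way to create the topics, assuming the docker-compose.yml stack includes a ZooKeeper container named zookeeper on port 2181 (which the kafka:29092 broker address used below suggests for a Confluent example stack), is with the kafka-topics tool:

docker run --rm --network cos_default confluentinc/cp-kafka:5.0.0 \
  kafka-topics --create --zookeeper zookeeper:2181 \
  --topic indicator --partitions 1 --replication-factor 1

docker run --rm --network cos_default confluentinc/cp-kafka:5.0.0 \
  kafka-topics --create --zookeeper zookeeper:2181 \
  --topic sensor --partitions 1 --replication-factor 1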

2. Now let’s send some sample data to Apache Kafka®.

The first two chassis enter the production line:

docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat \
  kafkacat -b kafka:29092 -t indicator -P <<EOF
{"informationPath":"simulation/informationString","WorkpieceID":"0020181","timestamp":1541623171000}
{"informationPath":"simulation/informationString","WorkpieceID":"0020182","timestamp":1541623771000}
EOF

And at the same time, the sensor network generates some readings:

docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat \
  kafkacat -b kafka:29092 -t sensor -P <<EOF
{"sensorPath":"simulation/machine/plc/sensor-1","value":7.0,"timestamp":1541623171000}
{"sensorPath":"simulation/machine/plc/sensor-2","value":2.0,"timestamp":1541623171000}
{"sensorPath":"simulation/machine/plc/sensor-1","value":6.0,"timestamp":1541623231000}
{"sensorPath":"simulation/machine/plc/sensor-2","value":1.0,"timestamp":1541623231000}
{"sensorPath":"simulation/machine/plc/sensor-1","value":10.0,"timestamp":1541623771000}
{"sensorPath":"simulation/machine/plc/sensor-2","value":12.0,"timestamp":1541623771000}
EOF
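
If you want to check that the messages landed, you can consume each topic back with kafkacat (-C consumes, -e exits once the end of the partition is reached):

docker run --rm --network cos_default confluentinc/cp-kafkacat \
  kafkacat -b kafka:29092 -t sensor -C -e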

3. Now start KSQL and register the streams.

4. Start the KSQL CLI:

docker run --network cos_default --interactive --tty --rm \
  confluentinc/cp-ksql-cli:5.0.0 \
  http://ksql-server:8088

5. Register the two topics as streams:

ksql> CREATE STREAM SENSOR (SENSORPATH VARCHAR, VALUE DOUBLE, TIMESTAMP BIGINT) WITH (VALUE_FORMAT='JSON',KAFKA_TOPIC='sensor',TIMESTAMP='timestamp'); 
Message 
---------------- 
Stream created 
---------------- 
ksql> CREATE STREAM INDICATOR (INFORMATIONPATH VARCHAR, WORKPIECEID VARCHAR, TIMESTAMP BIGINT) WITH (VALUE_FORMAT='JSON',KAFKA_TOPIC='indicator',TIMESTAMP='timestamp'); 
Message 
---------------- 
Stream created 
----------------
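
Because the sample records were produced before the streams were registered, it’s worth telling KSQL to read topics from the beginning; this session setting also applies to the derived streams created in the next step, which would otherwise start from the latest offset and skip the sample data. A quick check that the stream is readable:

ksql> SET 'auto.offset.reset'='earliest';
ksql> SELECT SENSORPATH, VALUE, TIMESTAMP FROM SENSOR LIMIT 2;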

6. Create derived streams to add a key:

ksql> CREATE STREAM SENSOR_KEYED AS SELECT sensorPath, value, 'line1' AS LINE_ID FROM sensor PARTITION BY LINE_ID; 
Message 
---------------------------- 
Stream created and running 
----------------------------
ksql> CREATE STREAM INDICATOR_KEYED AS SELECT informationPath, WorkpieceID, 'line1' as LINE_ID FROM indicator PARTITION BY LINE_ID; 
Message 
---------------------------- 
Stream created and running 
----------------------------
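
To confirm the repartitioning, you can check that LINE_ID is now the message key by selecting the ROWKEY system column (a sanity check, not a required step):

ksql> SELECT ROWKEY, SENSORPATH, VALUE FROM SENSOR_KEYED LIMIT 2;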

7. Create a table with the current WorkpieceID.

8. Having added LINE_ID as a key to the indicator data, we can now register it as a KSQL table. For a table, KSQL returns the current state per key rather than every event. We’re using this approach to determine the WorkpieceID to associate with each sensor reading, based on the timestamp.

ksql> CREATE TABLE CURRENT_WORKITEM (LINE_ID VARCHAR, informationPath varchar, WorkpieceID varchar) with (value_format='json',kafka_topic='INDICATOR_KEYED',KEY='LINE_ID');
Message 
--------------- 
Table created 
---------------
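
At this point, querying the table should show the latest workpiece per line, one row per key, updating as new indicator events arrive (press Ctrl-C to stop the continuous query):

ksql> SELECT LINE_ID, WORKPIECEID FROM CURRENT_WORKITEM;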

9. Join the table to the sensor data stream for the win!

ksql> CREATE STREAM SENSOR_ENRICHED AS SELECT S.SENSORPATH, TIMESTAMPTOSTRING(S.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS SENSOR_TIMESTAMP, S.VALUE, I.WORKPIECEID FROM SENSOR_KEYED S LEFT JOIN CURRENT_WORKITEM I ON S.LINE_ID=I.LINE_ID;
Message 
---------------------------- 
Stream created and running 
----------------------------
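
To see the enrichment in action, query the new stream; each sensor reading should now carry the WorkpieceID that was current on the line at the reading’s event time:

ksql> SELECT SENSORPATH, SENSOR_TIMESTAMP, VALUE, WORKPIECEID FROM SENSOR_ENRICHED;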

Since this is KSQL, the SENSOR_ENRICHED stream (and underlying topic of the same name) will be continually populated, driven by events arriving on the sensor topic and reflecting new items entering the production line based on events sent to the indicator topic.

