Directions
This example uses a topic where the messages have two timestamps:
- The internal message timestamp
- An application-generated timestamp column named loadDate
The message payload looks like this:
{ "loadDate": 1525221132954, "code": "Foo42", "user": "Rick" }
1. By default, KSQL will use the timestamp of the Kafka message itself. You can inspect the timestamp value that KSQL is using through the ROWTIME system column:
ksql> SELECT ROWTIME, loadDate, code, user FROM event_data;
1525339102591 | 1525221132954 | Foo42 | Rick
2. Timestamps are stored as epoch values. To make them human-readable, use the provided TIMESTAMPTOSTRING function:
ksql> SELECT ROWTIME, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), \
loadDate, TIMESTAMPTOSTRING(loadDate, 'yyyy-MM-dd HH:mm:ss'), \
code, user \
FROM event_data;
1525339102591 | 2018-05-03 10:18:22 | 1525221132954 | 2018-05-02 01:32:12 | Foo42 | Rick
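The conversion that TIMESTAMPTOSTRING performs can be sketched in Python (the function name here is illustrative, not part of KSQL). One caveat: KSQL formats timestamps in the server's default time zone, so the values shown in this example (a UTC+1 zone) are one hour ahead of the UTC renderings below.

```python
from datetime import datetime, timezone

def timestamp_to_string(epoch_ms: int, fmt: str = "%Y-%m-%d %H:%M:%S") -> str:
    """Rough Python equivalent of KSQL's TIMESTAMPTOSTRING, rendered in UTC.

    KSQL formats in the server's default time zone, so its output may
    differ from this by the local UTC offset.
    """
    # Integer division avoids float rounding; sub-second precision is
    # dropped, which the example format does not display anyway.
    return datetime.fromtimestamp(epoch_ms // 1000, tz=timezone.utc).strftime(fmt)

print(timestamp_to_string(1525339102591))  # Kafka message timestamp -> 2018-05-03 09:18:22 (UTC)
print(timestamp_to_string(1525221132954))  # loadDate payload field  -> 2018-05-02 00:32:12 (UTC)
```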
3. In the above message, the Kafka message's timestamp is 2018-05-03 10:18:22, and the loadDate field in the message payload is 2018-05-02 01:32:12.
If you perform a time-based aggregation on the data, you'll see that KSQL puts the above event in the time window associated with the message's timestamp (2018-05-03 10:18:22):
ksql> CREATE TABLE events_per_hour AS \
SELECT user, code, COUNT(*) AS event_count \
FROM event_data WINDOW TUMBLING (SIZE 1 HOUR) \
GROUP BY user, code;

 Message
---------------------------
 Table created and running
---------------------------

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), user, code, event_count \
FROM events_per_hour;
2018-05-03 10:00:00 | Rick | Foo42 | 1
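Conceptually, assigning an event to a tumbling window just rounds its timestamp down to a multiple of the window size. A minimal Python sketch of that arithmetic (illustrative only, not KSQL's implementation):

```python
# Derive a 1-hour tumbling window start from an epoch-millisecond timestamp
# by rounding down to the window size.
HOUR_MS = 60 * 60 * 1000

def window_start(epoch_ms: int, size_ms: int = HOUR_MS) -> int:
    return epoch_ms - (epoch_ms % size_ms)

# The example's Kafka message timestamp lands in the window shown above
# (09:00:00 UTC, displayed as 10:00:00 in the example's UTC+1 zone).
print(window_start(1525339102591))  # -> 1525338000000
```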
This result could be good enough, but for this example, you want to view the data in a time dimension provided within the message itself.
4. In this example, the data is processed using the loadDate field from the Kafka message. To accomplish this, you pass loadDate as the TIMESTAMP parameter when you register the KSQL stream (this is also valid for KSQL tables):
ksql> CREATE STREAM event_data_by_loadDate (loadDate LONG, code VARCHAR, user VARCHAR) \
WITH (KAFKA_TOPIC='event_data', \
VALUE_FORMAT='JSON', \
TIMESTAMP='loadDate');

 Message
----------------
 Stream created
----------------
5. Examine the ROWTIME field, and you'll see that it matches loadDate:
ksql> SELECT ROWTIME, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), \
loadDate, TIMESTAMPTOSTRING(loadDate, 'yyyy-MM-dd HH:mm:ss'), \
code, user \
FROM event_data_by_loadDate;
1525221132954 | 2018-05-02 01:32:12 | 1525221132954 | 2018-05-02 01:32:12 | Foo42 | Rick
6. You can also validate whether any columns are being used to override the timestamp of messages with DESCRIBE EXTENDED:
ksql> DESCRIBE EXTENDED event_data_by_loadDate;

Type            : STREAM
Key field       :
Timestamp field : LOADDATE
[...]
7. Going back to the time-based aggregation that was shown above, you can see that the event is assigned to the time window appropriate to the loadDate (which is 2018-05-02 01:32:12):
ksql> CREATE TABLE events_per_hour_by_loadDate AS \
SELECT user, code, COUNT(*) AS event_count \
FROM event_data_by_loadDate WINDOW TUMBLING (SIZE 1 HOUR) \
GROUP BY user, code;

 Message
---------------------------
 Table created and running
---------------------------

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), user, code, event_count \
FROM events_per_hour_by_loadDate;
2018-05-02 01:00:00 | Rick | Foo42 | 1
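A quick Python sketch (illustrative window arithmetic, not KSQL's code) shows why the same event now lands in a different hour bucket: rounding each of the two timestamps down to the hour yields two different window starts.

```python
# The same event falls into a different 1-hour tumbling window once
# loadDate, rather than the Kafka message timestamp, drives the clock.
HOUR_MS = 60 * 60 * 1000

rowtime = 1525339102591    # Kafka message timestamp
load_date = 1525221132954  # loadDate payload field

w_rowtime = rowtime - rowtime % HOUR_MS      # window used by events_per_hour
w_loaddate = load_date - load_date % HOUR_MS  # window used by events_per_hour_by_loadDate

print(w_rowtime != w_loaddate)  # -> True: two different hour buckets
```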
Remember, both results are based on data from the same source Kafka topic; they simply handle the timestamp in different ways. The first uses the timestamp provided within the Kafka message itself; the second uses an arbitrary field from within the message's payload.