Data Wrangling

Flattening JSON Kafka Messages

KSQL can flatten the schema of data in a Kafka message. This is useful when a downstream system requires a flat schema rather than a nested one. For example, you might have data on a Kafka topic that looks like this:

{
  "user": {
    "first_name": "Lars",
    "last_name": "Treagus",
    "email": "ltreagus0@timesonline.co.uk"
  },
  "ip_address": "242.115.235.56",
  "logon_date": "2018-02-05T19:45:59Z"
}

You can use KSQL to process every message as it arrives on the source topic and write it to a new Kafka topic with the nesting removed, so that the message looks like this:

{
  "user_first_name": "Lars",
  "user_last_name": "Treagus",
  "user_email": "ltreagus0@timesonline.co.uk",
  "ip_address": "242.115.235.56",
  "logon_date": "2018-02-05T19:45:59Z"
}

You can also write the flattened data as CSV instead of JSON.
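To make the target shape concrete, here is a minimal Python sketch of the same transformation applied to a single message outside Kafka. The `flatten` helper and its underscore separator are illustrative assumptions, not part of KSQL; in KSQL the flattening is done by naming each nested field explicitly in a query.

```python
import json
from typing import Any, Dict


def flatten(record: Dict[str, Any], prefix: str = "", sep: str = "_") -> Dict[str, Any]:
    """Return a copy of `record` with nested objects collapsed into top-level keys.

    Nested keys are joined to their parent key with `sep`, so
    {"user": {"email": ...}} becomes {"user_email": ...}.
    """
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects, carrying the parent key as a prefix.
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


message = json.loads("""
{
  "user": {
    "first_name": "Lars",
    "last_name": "Treagus",
    "email": "ltreagus0@timesonline.co.uk"
  },
  "ip_address": "242.115.235.56",
  "logon_date": "2018-02-05T19:45:59Z"
}
""")

# Prints the flattened record, with keys in the same order as the nested input.
print(json.dumps(flatten(message), indent=2))
```

A generic recursive helper like this flattens any depth automatically; the KSQL approach trades that for an explicit, fixed list of output columns.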

Directions

1. Register the existing user_logons topic for use as a KSQL stream called user_logons. Note the STRUCT data type for the nested field:

CREATE STREAM user_logons 
        (user      STRUCT<  
                          first_name VARCHAR, 
                          last_name  VARCHAR,  
                          email      VARCHAR 
                         >, 
        ip_address VARCHAR, 
        logon_date VARCHAR) 
        WITH (KAFKA_TOPIC ='user_logons', 
              VALUE_FORMAT='JSON');
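The STRUCT column declares the shape of the nested object, much as a nested type does in application code. As a rough analogy only, the Python sketch below decodes one message into dataclasses that mirror the declared columns. The class and function names are hypothetical, and the handling of missing or extra fields is a choice of this sketch, not a description of KSQL's deserializer.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    # Mirrors STRUCT<first_name VARCHAR, last_name VARCHAR, email VARCHAR>
    first_name: Optional[str]
    last_name: Optional[str]
    email: Optional[str]


@dataclass
class UserLogon:
    # Mirrors the three columns declared for the user_logons stream
    user: Optional[User]
    ip_address: Optional[str]
    logon_date: Optional[str]  # declared VARCHAR, so kept as a string


def parse_logon(raw: str) -> UserLogon:
    """Decode one JSON message into the typed shape declared above.

    In this sketch, missing fields become None and undeclared fields are ignored.
    """
    doc = json.loads(raw)
    u = doc.get("user")
    user = None
    if isinstance(u, dict):
        user = User(u.get("first_name"), u.get("last_name"), u.get("email"))
    return UserLogon(user, doc.get("ip_address"), doc.get("logon_date"))


logon = parse_logon(
    '{"user": {"first_name": "Lars", "last_name": "Treagus",'
    ' "email": "ltreagus0@timesonline.co.uk"},'
    ' "ip_address": "242.115.235.56", "logon_date": "2018-02-05T19:45:59Z"}'
)
print(logon.user.first_name)
```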

2. Optionally, inspect the first few messages as they arrive:

SELECT * FROM user_logons LIMIT 5;

3. Write the flattened structure to a new Kafka topic, updated continually as new messages arrive on the source topic. Note the use of the -> operator to access the nested columns:

CREATE STREAM user_logons_all_cols                  
        WITH (KAFKA_TOPIC='user_logons_flat') AS    
        SELECT user->first_name AS USER_FIRST_NAME, 
               user->last_name  AS USER_LAST_NAME,  
               user->email      AS USER_EMAIL,      
                                   ip_address,      
                                   logon_date       
          FROM user_logons;

Note that the target Kafka topic is set explicitly with KAFKA_TOPIC. Without it, KSQL uses the stream name, upper-cased (here USER_LOGONS_ALL_COLS), as the topic name.
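Conceptually, the persistent query applies a fixed projection to every input record: each -> expression walks into the STRUCT, and each AS names an output column. A minimal Python sketch of that per-record projection, assuming messages have already been decoded into dicts; the `COLUMNS`, `lookup` and `project` names are illustrative, not KSQL APIs, and returning None for an absent path is an assumption of the sketch.

```python
from typing import Any, Dict, Optional, Tuple

# Each output column paired with the path to its source field, mirroring the
# SELECT list of user_logons_all_cols. Unaliased columns keep their own name,
# upper-cased as KSQL does for unquoted identifiers.
COLUMNS: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
    ("USER_FIRST_NAME", ("user", "first_name")),
    ("USER_LAST_NAME", ("user", "last_name")),
    ("USER_EMAIL", ("user", "email")),
    ("IP_ADDRESS", ("ip_address",)),
    ("LOGON_DATE", ("logon_date",)),
)


def lookup(record: Dict[str, Any], path: Tuple[str, ...]) -> Optional[Any]:
    """Follow `path` into nested dicts, like user->first_name; None if absent."""
    value: Any = record
    for key in path:
        if not isinstance(value, dict):
            return None
        value = value.get(key)
    return value


def project(record: Dict[str, Any]) -> Dict[str, Any]:
    """Build one flattened output row from one nested input record."""
    return {name: lookup(record, path) for name, path in COLUMNS}


row = project({
    "user": {"first_name": "Lars", "last_name": "Treagus",
             "email": "ltreagus0@timesonline.co.uk"},
    "ip_address": "242.115.235.56",
    "logon_date": "2018-02-05T19:45:59Z",
})
print(row)
```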

The new stream writes to a Kafka topic, which you can see in the output of LIST TOPICS:

ksql> LIST TOPICS;

 Kafka Topic        | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
------------------------------------------------------------------------------------------------
 user_logons        | true       | 1          | 1                  | 1         | 1
 user_logons_flat   | true       | 4          | 1                  | 0         | 0

The contents of the topic can be viewed by any Kafka client, or simply with PRINT from KSQL:

ksql> PRINT 'user_logons_flat';
Format:JSON
{"ROWTIME":1547205974896,"ROWKEY":"null","USER_FIRST_NAME":"Hetti","USER_LAST_NAME":"Debrett","USER_EMAIL":"hdebretthp@ask.com","IP_ADDRESS":"115.102.56.33","LOGON_DATE":"2017-11-17T06:26:31Z"}

Press Ctrl-C to exit the PRINT command.
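PRINT shows each record's ROWTIME (its timestamp) and ROWKEY (its key) alongside the message value, so a downstream consumer reading the value would see only the flattened columns. The Python sketch below separates the two, working only on the printed line copied from the output above; ROWTIME is interpreted as milliseconds since the Unix epoch.

```python
import json
from datetime import datetime, timedelta, timezone

line = ('{"ROWTIME":1547205974896,"ROWKEY":"null","USER_FIRST_NAME":"Hetti",'
        '"USER_LAST_NAME":"Debrett","USER_EMAIL":"hdebretthp@ask.com",'
        '"IP_ADDRESS":"115.102.56.33","LOGON_DATE":"2017-11-17T06:26:31Z"}')

printed = json.loads(line)

# Remove the metadata that PRINT adds; what remains is the message value.
rowtime_ms = printed.pop("ROWTIME")
rowkey = printed.pop("ROWKEY")

# Convert epoch milliseconds to a UTC datetime without float rounding.
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
rowtime = epoch + timedelta(milliseconds=rowtime_ms)

print(rowtime.isoformat())
print(sorted(printed))
```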

To write the flattened data as CSV instead, create a second stream with VALUE_FORMAT='DELIMITED' in the WITH clause:

CREATE STREAM user_logons_all_cols_csv                  
        WITH (KAFKA_TOPIC='user_logons_flat_csv', VALUE_FORMAT='DELIMITED') AS    
        SELECT user->first_name AS USER_FIRST_NAME, 
               user->last_name  AS USER_LAST_NAME,  
               user->email      AS USER_EMAIL,      
                                   ip_address,      
                                   logon_date       
          FROM user_logons;

Then print the first few messages from the new topic:

ksql> PRINT 'user_logons_flat_csv' FROM BEGINNING LIMIT 5;
Format:STRING
4/9/19 3:40:37 PM UTC , NULL , Lian,McTrusty,lmctrusty1@thetimes.co.uk,63.110.21.234,2018-02-20T20:04:12Z
4/9/19 3:40:37 PM UTC , NULL , Carrissa,Halston,chalston5@sitemeter.com,220.61.74.180,2018-03-22T15:20:04Z
4/9/19 3:40:37 PM UTC , NULL , Korry,Really,kreally9@sakura.ne.jp,214.129.206.164,2018-03-27T02:45:42Z
4/9/19 3:40:37 PM UTC , NULL , Toma,Eisikovitsh,teisikovitshd@last.fm,105.163.137.4,2017-12-27T12:51:06Z
4/9/19 3:40:37 PM UTC , NULL , Izabel,Lenney,ilenneyh@addthis.com,189.48.198.142,2017-08-14T17:30:07Z
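Judging from the output above, PRINT renders each record as "timestamp , key , value", and the DELIMITED value itself has no header row, so a consumer has to know the column order from the SELECT list. A minimal Python sketch of reading one printed line back into named fields; the `COLUMNS` list is copied from the query's aliases and column names (upper-cased), and the parsing is an illustration rather than a KSQL tool.

```python
import csv
import io

# Column order taken from the SELECT list of user_logons_all_cols_csv.
COLUMNS = ["USER_FIRST_NAME", "USER_LAST_NAME", "USER_EMAIL", "IP_ADDRESS", "LOGON_DATE"]

printed = ("4/9/19 3:40:37 PM UTC , NULL , "
           "Lian,McTrusty,lmctrusty1@thetimes.co.uk,63.110.21.234,2018-02-20T20:04:12Z")

# Split off only the first two " , " separators so the CSV value stays intact.
timestamp, key, value = printed.split(" , ", 2)

# csv.reader also copes with quoted fields, should a value contain commas.
fields = next(csv.reader(io.StringIO(value)))
row = dict(zip(COLUMNS, fields))
print(row)
```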