Streaming ETL

Inline Streaming Aggregation

Aggregates (e.g., maximum, minimum, average) over the events in a stream are correct at the moment the current event is processed, but they become stale as soon as the next event arrives. Rather than storing only the current calculation (i.e., the avg()/min()/max() for all events so far), this recipe shows you how to calculate the current aggregation and append it to each event.

By using the predicate WHERE MAX_VALUE = VALUE, for example, you can filter for the event with the current max() of a value.

This recipe can be used whenever an aggregation on a numeric value must be recalculated for each event—typically in cases that involve monitoring numeric trends in incoming events.
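The idea can be sketched in plain Python (not KSQL): keep per-group running state and append the current aggregates to every event as it arrives. The field names below mirror the recipe's columns, but the helper itself is illustrative.

```python
def enrich(events, group_key, measure_key):
    """Yield each event with the running count/avg/min/max for its group appended."""
    state = {}  # group value -> (count, total, minimum, maximum)
    for event in events:
        group = event[group_key]
        value = float(event[measure_key])
        count, total, mn, mx = state.get(group, (0, 0.0, value, value))
        count, total = count + 1, total + value
        mn, mx = min(mn, value), max(mx, value)
        state[group] = (count, total, mn, mx)
        yield {**event,
               "c_count": count,
               "c_avgtone": total / count,
               "c_mintone": mn,
               "c_maxtone": mx}

events = [
    {"eventid": 781057555, "country": "SGP", "avgtone": "-0.3322"},
    {"eventid": 781057850, "country": "USA", "avgtone": "-7.9166"},
    {"eventid": 781058406, "country": "USA", "avgtone": "-11.0429"},
]

enriched = list(enrich(events, "country", "avgtone"))

# Filter for events that set a new per-group minimum
# (the Python analogue of the predicate WHERE AVGTONE = C_MINTONE):
new_minima = [e["eventid"] for e in enriched
              if float(e["avgtone"]) == e["c_mintone"]]
```

Because the aggregates are carried on the event itself, the filter is a stateless comparison of two columns rather than a lookup against a separate store.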

Directions

Each streamed event must have three columns to support the aggregation: a unique identifier, a grouping column, and a numeric measure. Here are three examples of what these columns might look like in real-world data:

Unique Identifier           | Grouping Column              | Numeric Measure
----------------------------|------------------------------|---------------------------
Event_id = 888392912        | COUNTRY = SINGAPORE          | STORY_TONE = -12.281121211
Reading_id = 7781           | TRANSFORMER = TRNF_JJAH_0019 | Meter_Reading = 000066621
Stocktick_id = 00000817261  | STOCK_EXCHANGE = NYSE        | Stock_price = 1013.23

The example below aggregates streaming news stories to detect the most negative and most positive story per country.

In the data shown, the first news article, for Country = SGP, has a slightly negative tone of -0.3322. The second story, for Country = USA, has a distinctly negative tone of -7.9166. The third story, again for Country = USA, is more negative still, with a tone of -11.0429.

{
  "eventid": 781057555,
  "country": "SGP",
  "avgtone": "-0.3322",
  "sourceurl": "https://www.dealstreetasia.com/stories/singtel-bofa-australia-amaysim-105211/"
}

{
  "eventid": 781057850,
  "country": "USA",
  "avgtone": "-7.9166",
  "sourceurl": "https://www.washingtonpost.com/local/local-digest-man-at-baltimore-airport-arrested-after-loaded-handgun-found-in-carry-on-luggage/2018/08/21/77753574-a57a-11e8-8fac-12e98c13528d_story.html"
}

{
  "eventid": 781058406,
  "country": "USA",
  "avgtone": "-11.0429",
  "sourceurl": "https://lebanon-express.com/news/local/lebanon-woman-charged-with-arson/article_915bb184-1167-5890-b159-53f791fad9b0.html"
}

Here are the steps to do inline streaming aggregation on the data:

1. Ensure that the SOURCE_TOPIC contains (EVENTID STRING, ACTOR1COUNTRYCODE STRING, AVGTONE STRING, SOURCEURL STRING).

2. Create a KSQL STREAM on the topic:

CREATE STREAM ST_TOPIC WITH (kafka_topic='SOURCE_TOPIC', value_format='AVRO');

3. Aggregate messages in the ST_TOPIC stream into a table called TB_GROUPBY. This table emits one row for each new message in ST_TOPIC. The topic underlying the table is automatically keyed on column COUNTRY.

ksql> CREATE TABLE TB_GROUPBY AS                               \
SELECT                                                         \
  ACTOR1COUNTRYCODE                            as COUNTRY      \
, cast(count(*) as bigint)                     as C_COUNT      \
, SUM(cast(AVGTONE as DOUBLE)) / COUNT(*)      as C_AVGTONE    \
, max(cast(AVGTONE as DOUBLE))                 as C_MAXTONE    \
, min(cast(AVGTONE as DOUBLE))                 as C_MINTONE    \
, cast(MAX(cast(EVENTID as bigint)) as STRING) as LAST_EVENTID \
    FROM ST_TOPIC                                              \
GROUP BY ACTOR1COUNTRYCODE; 

 Message
---------------------------
 Table created and running
---------------------------

4. Rekey KSQL table TB_GROUPBY from COUNTRY to LAST_EVENTID so that it can be joined on LAST_EVENTID, since KSQL joins must be on the key column. Step one of the rekey pattern creates a stream from the table.

ksql> CREATE STREAM ST_REKEY1 WITH (KAFKA_TOPIC='TB_GROUPBY', VALUE_FORMAT='AVRO');

 Message
----------------
 Stream created
----------------

5. Step two of the rekey pattern is to repartition the data by the new key, LAST_EVENTID.

ksql> CREATE STREAM ST_REKEY2 AS SELECT * FROM ST_REKEY1 PARTITION BY LAST_EVENTID;

Message
----------------------------
 Stream created and running
----------------------------

6. Finally, join the rekeyed stream to the source stream. For a stream-stream join, KSQL emits a row whenever a match occurs within the specified 60-second window.

ksql> CREATE STREAM ST_TOPIC_ENRICHED AS  \
SELECT                                    \
  EVENTID                                 \
, ST_TOPIC.ACTOR1COUNTRYCODE as COUNTRY   \
, SOURCEURL                               \
, AVGTONE                                 \
, C_MINTONE                               \
, C_AVGTONE                               \
, C_MAXTONE                               \
, cast(C_COUNT as INT) as C_COUNT         \
FROM ST_TOPIC                             \
JOIN ST_REKEY2                            \
WITHIN 60 SECONDS                         \
ON (EVENTID=LAST_EVENTID);       


 Message
----------------------------
 Stream created and running
----------------------------
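The join in step 6 can be sketched in plain Python (not KSQL): an enriched row is emitted when an ST_TOPIC event and a rekeyed aggregate row share the same key (EVENTID = LAST_EVENTID) and their timestamps fall within 60 seconds of each other. The record shape and helper name here are illustrative, not KSQL APIs.

```python
WINDOW_MS = 60_000  # the WITHIN 60 SECONDS window, in milliseconds

def join_within(left, right, window_ms=WINDOW_MS):
    """Join two lists of (timestamp_ms, key, payload) records on equal keys
    whose timestamps are at most window_ms apart; merge the payloads."""
    out = []
    for lt, lk, lv in left:
        for rt, rk, rv in right:
            if lk == rk and abs(lt - rt) <= window_ms:
                out.append({**lv, **rv})
    return out

# One source event and the matching rekeyed aggregate row, 200 ms apart:
st_topic  = [(1000, "781058406", {"eventid": "781058406", "avgtone": "-11.0429"})]
st_rekey2 = [(1200, "781058406", {"c_count": 2, "c_mintone": -11.0429})]

rows = join_within(st_topic, st_rekey2)
```

Because the group-by table emits its aggregate row almost immediately after the triggering event, the 60-second window comfortably covers the gap between the two sides.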

7. Select the expanded event from the final topic:

ksql> SELECT eventid,country,avgtone,c_mintone,c_avgtone,c_maxtone,c_count,substring(SOURCEURL,0,20) FROM ST_TOPIC_ENRICHED;
Event_id    CTRY    avgtone    c_mintone   c_avgtone   c_maxtone count  sourceurl
781057555 | SGP | -0.332225 | -0.3322259 | -0.332225 | -0.332225 | 1 | https://www.dealstre..
781057850 | USA | -7.916666 | -7.9166666 | -7.916666 | -7.916666 | 1 | https://www.washingt..
781058406 | USA | -11.04294 | -11.042944 | -9.479805 | -7.916666 | 2 | https://lebanon-expr..

The final topic contains all of the data from the source event, along with additional columns that carry the running aggregations for each event.

8. Events that trigger a new maximum or minimum can be detected with a filter predicate (a WHERE clause). This is useful for implementing real-time threshold rules that monitor trends in streaming events.

ksql> 
SELECT country, eventid as CURRENT_MOST_NEGATIVE_STORY_ID, \
       avgtone, substring(SOURCEURL,0,20)                  \
  FROM ST_TOPIC_ENRICHED                                   \
 WHERE avgtone = c_mintone ;

CTRY | current_most_negative_story_id  |   avgtone | sourceurl
..
USA  |         781058406               | -11.04294 | https://lebanon-expr..
..
