Administration

Counting Messages in a Topic

At some point in your journey with Apache Kafka®, you might ask, “How many messages are in that topic?” You know enough to avoid relying on the latest offset alone, because messages at lower offsets may have since been deleted. For an active topic, the count is constantly changing and may therefore be of little value, but in some cases even a rough estimate is good enough. There are also times when stricter validation is necessary.

For instance, say you have a failover scenario using Confluent Replicator. If messages were produced on the destination cluster during the downtime, it might be important to know how many messages were in the source topic before cutting back over. Any client can be programmed to do this count for you, but KSQL makes it really easy and gives you access to filters if there are specific types of messages that you want to count.
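As a sketch of the client-side approach, the count can be estimated from per-partition offsets, e.g. with the kafka-python library (the helper names and the `customers` topic below are illustrative, not part of this recipe). Note that for a compacted topic this figure is only an upper bound, since compaction deletes records without moving offsets:

```python
# Sketch: estimating the message count of a topic from partition offsets.
# Assumes the kafka-python library; for a compacted topic the result is an
# upper bound, because compaction removes records without changing offsets.

def estimate_message_count(beginning_offsets, end_offsets):
    """Sum (end - beginning) across partitions.

    Both arguments map partition -> offset, in the shape returned by
    KafkaConsumer.beginning_offsets() / end_offsets().
    """
    return sum(end_offsets[p] - beginning_offsets[p] for p in end_offsets)

def count_topic(bootstrap_servers, topic):
    # Requires a running broker; shown for completeness.
    from kafka import KafkaConsumer, TopicPartition
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    partitions = [TopicPartition(topic, p)
                  for p in consumer.partitions_for_topic(topic)]
    return estimate_message_count(consumer.beginning_offsets(partitions),
                                  consumer.end_offsets(partitions))

if __name__ == "__main__":
    # Offsets as they might look for a three-partition topic where some
    # early records have already been deleted:
    begin = {0: 5, 1: 0, 2: 12}
    end = {0: 105, 1: 40, 2: 62}
    print(estimate_message_count(begin, end))  # 100 + 40 + 50 = 190
```

This is exactly the arithmetic that makes the latest offset alone misleading: partition 0 reports offset 105, but only 100 of those messages still exist.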

Directions

Let’s assume you have some customer profile data in a compacted topic. The messages in this topic might look something like this:

19:{"full_name":"Skipper Koppe",
    "birthdate":"1983-02-28",
    "fav_animal":"Squirrel, malabar",
    "fav_colour":"Fuscia",
    "fav_movie":"Return Of The Ghostbusters"}

Now obviously, this is important customer information that we are keeping track of for various applications. Since this is a compacted topic, we expect a single message for each key, with few exceptions depending on how often the partition segments are cleaned up. In the example above, the key is 19 and the value is our JSON string. This is all great, but one of our developers has asked how many customers we have in this topic. The count doesn’t have to be exact, because they are just trying to understand how much memory would be required to store all of the customers in their application.

1. Create a KSQL stream on our topic:

-- Column names must match the JSON field names (birthdate, fav_colour),
-- or those columns will deserialize as NULL.
CREATE STREAM customer_profile (\
                  full_name  VARCHAR,\
                  birthdate  VARCHAR,\
                  fav_animal VARCHAR,\
                  fav_colour VARCHAR,\
                  fav_movie  VARCHAR) \
            WITH (KAFKA_TOPIC = 'customers',\
                  VALUE_FORMAT = 'JSON');

2. Change the auto.offset.reset parameter so KSQL will read from the beginning of the topic:

SET 'auto.offset.reset' = 'earliest';

3. Create a new KSQL stream from our previous stream that includes some derived columns to GROUP BY and SUM:

CREATE STREAM customer_profile_count AS \
      SELECT 'Records' AS field1,  \
                     1 AS sum_this \
      FROM customer_profile;

4. Run a SELECT statement to sum our derived column and count the messages:

SELECT   field1, \
         SUM(sum_this) AS message_count \
FROM     customer_profile_count \
GROUP BY field1;

5. As new messages arrive on the original topic, the count will update and another output row will be emitted. If you leave the SELECT statement running, you will see the value increase over time as new messages are added to the topic in the background.

A more complex use case might require a filter or join that could be applied at various steps. The same concept can be used for a non-compacted topic if the need should arise.
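For instance, a variant of step 3 that counts only the customers whose favourite movie is “Return Of The Ghostbusters” (a purely illustrative filter, with a hypothetical stream name) would just add a WHERE clause:

```sql
CREATE STREAM ghostbusters_fan_count AS \
      SELECT 'Records' AS field1,  \
                     1 AS sum_this \
      FROM customer_profile \
      WHERE fav_movie = 'Return Of The Ghostbusters';
```

The SELECT from step 4 then works unchanged against this new stream.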

