Directions
1. Create a Kafka topic to receive messages with DELIMITED data:
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic LegacyData
2. Create the KSQL stream for DELIMITED data:
ksql> CREATE STREAM source_delimited (column1 STRING, column2 STRING) WITH (KAFKA_TOPIC='LegacyData', VALUE_FORMAT='DELIMITED');
3. Create the KSQL stream for the converted Avro data, which automatically converts the delimited source data to Avro:
ksql> CREATE STREAM target_avro WITH (KAFKA_TOPIC='LegacyDataAvro', VALUE_FORMAT='AVRO') AS SELECT * FROM source_delimited;
4. Run a KSQL SELECT query to test the conversion. The query waits until one row arrives, so leave it running:
ksql> SELECT * FROM target_avro LIMIT 1;
5. From a separate terminal, send a test message with DELIMITED data to the source topic, then validate that the target topic receives the data in Avro format (the query from step 4 should return the row):
$ echo "hello,world" | kafka-console-producer --broker-list localhost:9092 --topic LegacyData
6. Validate that the target stream was created with the value format set to AVRO (DESCRIBE EXTENDED lists the value format in addition to the schema):
ksql> DESCRIBE EXTENDED target_avro;
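
To exercise the conversion with more than one row, the test message from step 5 can be generated in a loop. The following is a minimal sketch, not part of the original recipe: make_records and send_records are hypothetical helper names, and the broker address and topic are the ones used in the steps above. The commented consumer check assumes a Schema Registry at http://localhost:8081, which may differ in your setup.

```shell
#!/bin/sh
# Sketch only: make_records and send_records are hypothetical helper names.

# Print N two-column DELIMITED rows that match source_delimited (column1, column2).
make_records() {
  count="$1"
  i=1
  while [ "$i" -le "$count" ]; do
    printf 'key%d,value%d\n' "$i" "$i"
    i=$((i + 1))
  done
}

# Pipe N rows into the source topic, using the broker address from step 5.
send_records() {
  make_records "$1" | kafka-console-producer --broker-list localhost:9092 --topic LegacyData
}

# Usage (requires the running broker from the steps above):
#   send_records 3
# Optional check of the Avro output (Schema Registry URL is an assumption):
#   kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic LegacyDataAvro \
#     --from-beginning --max-messages 3 --property schema.registry.url=http://localhost:8081
```

Keeping row generation separate from sending lets you inspect the rows on the terminal before anything is written to LegacyData.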