KSQL все сообщения выходят из строя в потоке, и в теме есть данные

#apache-kafka #ksqldb

Вопрос:

Я относительно новичок в потоке Кафки и пытаюсь распечатать сообщения в потоке, который я создал. Может кто-нибудь сказать мне, почему все сообщения не удалось?

Когда я использую команду печати, я получаю это

 print 'main' from beginning limit 10;
Key format: ¯_(ツ)_/¯ - no data processed
Value format: KAFKA_STRING
rowtime: 2021/05/29 14:17:57.375 Z, key: <null>, value: A-1,2,5/21/2019 8:29,5/21/2019 9:29,34.808868,-82.269157,34.808868,-82.269157,0,Accident on Tanner Rd at Pennbrooke Ln.,439,Tanner Rd,R,Greenville,Greenville,SC,29607-6027,US,U
S/Eastern,KGMU,5/21/2019 8:53,76,76,52,28.91,10,N,7,0,Fair,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,Day,Day,Day,Day
 

Когда я запускаю описание расширенного потока, который я создал, я получаю следующее:

 
Name                 : ACCIDENTS_ORIGINAL
Type                 : STREAM
Timestamp field      : START_TIME
Key format           : KAFKA 
Value format         : DELIMITED
Kafka topic          : main (partitions: 1, replication: 1)
Statement            : CREATE STREAM ACCIDENTS_ORIGINAL (ID STRING, SEVERITY INTEGER, START_TIME STRING, 
END_TIME STRING, START_LAT DOUBLE, START_LNG DOUBLE, END_LAT DOUBLE, END_LNG DOUBLE, DISTANCE DOUBLE, 
DESCRIPTION STRING, NUMBER DOUBLE, STREET STRING, SIDE STRING, CITY STRING, COUNTY STRING, 
STATE STRING, ZIPCODE STRING, COUNTRY STRING, TIMEZONE STRING, AIRPORT_CODE STRING, 
WEATHER_TIME STRING, TEMPERATURE DOUBLE, WIND_CHILL DOUBLE, HUMIDITY DOUBLE, 
PRESSURE DOUBLE, VISIBILITY DOUBLE, WIND_DIRECTION STRING, WIND_SPEED STRING, 
PRECIPITATION DOUBLE, WEATHER_CONDITION STRING, AMENITY BOOLEAN, BUMP BOOLEAN, 
CROSSING BOOLEAN, GIVE_WAY BOOLEAN, JUNCTION BOOLEAN, NO_EXIT BOOLEAN, RAILWAY BOOLEAN, 
ROUNDABOUT BOOLEAN, STATION BOOLEAN, STOP BOOLEAN, TRAFFIC_CALMING BOOLEAN, 
TRAFFIC_SIGNAL BOOLEAN, TURNING_LOOP BOOLEAN, SUNRISE_SUNSET STRING, 
CIVIL_TWILIGHT STRING, NAUTICAL_TWILIGHT STRING, ASTRONOMICAL_TWILIGHT STRING)
WITH (KAFKA_TOPIC='main', KEY_FORMAT='KAFKA', TIMESTAMP='Start_Time', 
TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss', VALUE_FORMAT='DELIMITED');
 

Кто-нибудь может помочь мне взглянуть и сказать, что я здесь делаю не так?

Небольшое обновление, я также попытался установить формат ключа как нет, но все равно получить все сообщения не удалось.

Комментарии:

1. Возможно, это не работает, потому что в вашем формате метки времени сначала указан год, а в ваших данных-нет

2. Спасибо! Я изменил формат метки времени, и теперь он работает.