#java #apache-kafka #apache-beam #beam-sql
#java #apache-kafka #apache-beam #beam-sql
Вопрос:
Во-первых, у нас есть исходный код kafka в формате JSON:
{"event_time": "2020-08-23 18:36:10", "word": "apple", "cnt": 1}
{"event_time": "2020-08-23 18:36:20", "word": "banana", "cnt": 1}
{"event_time": "2020-08-23 18:37:30", "word": "apple", "cnt": 2}
{"event_time": "2020-08-23 18:37:40", "word": "apple", "cnt": 1}
... ...
То, что я пытаюсь сделать, это агрегировать сумму количества каждого слова по каждой минуте:
--------- ---------- ---------------------
| word | SUM(cnt) | window_start |
--------- ---------- ---------------------
| apple | 1 | 2020-08-23 18:36:00 |
--------- ---------- ---------------------
| banana | 1 | 2020-08-23 18:36:00 |
--------- ---------- ---------------------
| apple | 3 | 2020-08-23 18:37:00 |
--------- ---------- ---------------------
Таким образом, в этом случае идеально подошел бы следующий оператор Beam SQL:
SELECT word, SUM(cnt), TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start
FROM t_count_stats
GROUP BY word, TUMBLE(event_time, INTERVAL '1' MINUTE)
И ниже приведен мой текущий рабочий код, использующий Java SDK Beam для выполнения этого потокового SQL-запроса:
import avro.shaded.com.google.common.collect.ImmutableMap;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
public class KafkaBeamSqlTest {
private static DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
// create pipeline
PipelineOptions kafkaOption = PipelineOptionsFactory.fromArgs(args)
.withoutStrictParsing()
.as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(kafkaOption);
// create kafka IO
KafkaIO.Read<String, String> kafkaRead =
KafkaIO.<String, String>read()
.withBootstrapServers("127.0.0.1:9092")
.withTopic("beamKafkaTest")
.withConsumerConfigUpdates(ImmutableMap.of("group.id", "client-1"))
.withReadCommitted()
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.commitOffsetsInFinalize();
// read from kafka
PCollection<KV<String, String>> messages = pipeline.apply(kafkaRead.withoutMetadata());
// build input schema
Schema inputSchema = Schema.builder()
.addStringField("word")
.addDateTimeField("event_time")
.addInt32Field("cnt")
.build();
// convert kafka message to Row
PCollection<Row> rows = messages.apply(ParDo.of(new DoFn<KV<String, String>, Row>(){
@ProcessElement
public void processElement(ProcessContext c) {
String jsonData = c.element().getValue();
// parse json
JSONObject jsonObject = JSON.parseObject(jsonData);
// build row
List<Object> list = new ArrayList<>();
list.add(jsonObject.get("word"));
list.add(dtf.parseDateTime((String) jsonObject.get("event_time")));
list.add(jsonObject.get("cnt"));
Row row = Row.withSchema(inputSchema)
.addValues(list)
.build();
System.out.println(row);
// emit row
c.output(row);
}
}))
.setRowSchema(inputSchema);
// sql query
PCollection<Row> result = PCollectionTuple.of(new TupleTag<>("t_count_stats"), rows)
.apply(
SqlTransform.query(
"SELECT word, SUM(cnt), TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_startn"
"FROM t_count_statsn"
"GROUP BY word, TUMBLE(event_time, INTERVAL '1' MINUTE)"
)
);
// sink results back to another kafka topic
result.apply(MapElements.via(new SimpleFunction<Row, KV<String, String>>() {
@Override
public KV<String, String> apply(Row input) {
System.out.println("result: " input.getValues());
return KV.of(input.getValue("word"), "result=" input.getValues());
}
}))
.apply(KafkaIO.<String, String>write()
.withBootstrapServers("127.0.0.1:9092")
.withTopic("beamPrint")
.withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class));
// run
pipeline.run();
}
}
Моя проблема в том, что когда я запускаю этот код и передаю некоторые сообщения в Kafka, не возникает исключения, и он получил некоторые сообщения от Kafka, но я не вижу, чтобы это запускало процесс агрегирования окон. И ожидаемого результата не получается (как в таблице, которую я показывал ранее).
Итак, поддерживает ли Beam SQL в настоящее время синтаксис window в неограниченном источнике ввода Kafka? Если это так, что не так с моим текущим кодом? Как я могу отладить и исправить это? И есть ли какой-либо пример кода, который интегрирует Beam SQL с KafkaIO?
Пожалуйста, помогите мне! Большое спасибо!!
Ответ №1:
Похоже, на этот вопрос был дан ответ на <a rel="noreferrer noopener nofollow" href="https://lists.apache.org/thread.html/rea75c0eb665f90b8483e64bee96740ebb01942c606f065066c2ecc56@» rel=»nofollow noreferrer»>https://lists.apache.org/thread.html/rea75c0eb665f90b8483e64bee96740ebb01942c606f065066c2ecc56@