#apache-flink #flink-streaming #flink-cep
#apache-flink #flink-потоковая передача #flink-cep
Вопрос:
Я пытаюсь применить список шаблонов к Patternstream в CEP я попробовал что-то прикрепить код ниже я новичок в Flink, я не уверен, является ли это глупым подходом или правильным подходом, я не уверен, могу ли я использовать datastream внутри ProcessFunction в KeyedBroadCastFunction или нет
public static final MapStateDescriptor<String, String> patternDescriptor = new MapStateDescriptor<String,
String>("CEPPatternList", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
StreamExecutionEnvironment env = env.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<JSONObject> source =
env.fromCollection(Arrays.asList(event, event2, event3));//KafkaSource
DataStream<Tuple2<String, JSONObject>> eventDataStream =
source.map(new JSONObjectToTuple2());
DataStream<Tuple2<String, JSONObject>> eventStream = eventDataStream.keyBy(0);
DataStream<Tuple2<String, String>> patternStream =
env.fromElements(pattern).flatMap(new FlatMapFunction<String, Tuple2<String,
String>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
out.collect(new Tuple2<>("PATTERN_1", value));
}
});
BroadcastStream<Tuple2<String, String>> broadcastStream =
patternStream.broadcast(patternDescriptor);
DataStream<Tuple2<String, JSONObject>> output =
eventStream.connect(broadcastStream).process(new KeyedBroadcastProcessFunction<String, Tuple2<String,
JSONObject>, Tuple2<String, String>, Tuple2<String, JSONObject>>() {
@Override
public void processElement(Tuple2<String, JSONObject> value, ReadOnlyContext ctx,
Collector<Tuple2<String, JSONObject>> out) throws Exception {
for (Map.Entry<String, String> patterns :
ctx.getBroadcastState(patternRuleDescriptor).immutableEntries()) {
String patternValue = patterns.getValue();
DataStream<Tuple2<String, JSONObject>> eventStream =
env.fromElements(value);
PatternStream<Tuple2<String, JSONObject>> patternStream =
cepPatternMatching.compile(patternValue,
eventStream);
OutputTag<Tuple2<String, JSONObject>> timedout = new OutputTag<Tuple2<String, JSONObject>>(
"timedout") {
};
SingleOutputStreamOperator<Tuple2<String, JSONObject>> result = patternStream.flatSelect(
timedout,
new EventTimeOut(),
new PatternFlatSelect()
);
result.flatMap(new FlatMapFunction<Tuple2<String, JSONObject>, Object>() {
@Override
public void flatMap(Tuple2<String, JSONObject> value, Collector<Object> out) throws Exception {
out.collect(value);
}
});
ctx.output(unMatched,value);
}
}
}
@Override
public void processBroadcastElement(Tuple2<String, String> value, Context ctx,
Collector<Tuple2<String, JSONObject>> out) throws Exception {
System.out.println("Pattern Name: " value.f0);
System.out.println("Pattern Condition: " value.f1);
ctx.getBroadcastState(patternDescriptor).put(value.f0, value.f1);
}
});
Я не уверен, что это правильный подход или нет, я получаю сообщение об ошибке ниже
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the StreamExecutionEnvironment is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
at org.apache.flink.streaming.api.datastream.BroadcastConnectedStream.clean(BroadcastConnectedStream.java:249)
at org.apache.flink.streaming.api.datastream.BroadcastConnectedStream.process(BroadcastConnectedStream.java:162)
at org.apache.flink.streaming.api.datastream.BroadcastConnectedStream.process(BroadcastConnectedStream.java:139)
at CEPPatternMatchingApp.main(CEPPatternMatchingApp.java:122)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.environment.LocalStreamEnvironment
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.environment.LocalStreamEnvironment
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
... 7 more
Я не выполняю сериализацию где-либо в своем коде, не уверен, почему возникает эта ошибка, я пытался даже реализовать сериализуемый интерфейс, но все равно получаю ошибку
Может кто-нибудь, пожалуйста, помогите мне, есть ли способ транслировать шаблоны CEP и выполнять итерации один за другим для применения к потоку данных
Комментарии:
1. Не могли бы вы поделиться полной трассировкой стека? В общем, Flink использует сериализацию для передачи пользовательского кода в taskmanagers, поэтому ни один из ваших пользовательских кодов не должен содержать ссылку на внешний класс.
2. @ArvidHeise Я прикрепляю ошибку stacktrace, о которой идет речь
3. О боже. Я только что увидел, что вы используете
DataStream
в пользовательском коде. Это вообще не сработает. Я также не уверен, можно ли вообще использовать CEP с шаблоном во время выполнения. Я подозреваю, что вам нужно вернутьсяDataStream
и реализовать его самостоятельно в ProcessFunction . Обратите внимание, что если вы знаете шаблоны в начале приложения, вы можете использовать CEP.