Не удается обработать сообщение kafka json с помощью библиотеки Flink siddhi

#java #json #apache-kafka #apache-flink #siddhi

Вопрос:

Я пытаюсь создать простое приложение, в котором приложение будет потреблять сообщение Кафки, выполнять некоторые преобразования cql и публиковать в Кафке, и ниже приведен код:

JAVA: 1.8 Флинк: 1.13 Scala: 2.11 флинк-сиддхи: 2.11-0.2.2-СНИМОК

Я использую библиотеку: https://github.com/haoch/flink-siddhi

введите json в Kafka:

 {
   "awsS3":{
      "ResourceType":"aws.S3",
      "Details":{
         "Name":"crossplane-test",
         "CreationDate":"2020-08-17T11:28:05 00:00"
      },
      "AccessBlock":{
         "PublicAccessBlockConfiguration":{
            "BlockPublicAcls":true,
            "IgnorePublicAcls":true,
            "BlockPublicPolicy":true,
            "RestrictPublicBuckets":true
         }
      },
      "Location":{
         "LocationConstraint":"us-west-2"
      }
   }
}
 

основной класс:

 public class S3SidhiApp {
    public static void main(String[] args) {
        internalStreamSiddhiApp.start();
        //kafkaStreamApp.start();
    }
}
 

Класс приложений:

 package flinksidhi.app;

import com.google.gson.JsonObject;
import flinksidhi.event.s3.source.S3EventSource;

import io.siddhi.core.SiddhiManager;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.siddhi.SiddhiCEP;
import org.json.JSONObject;


import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import static flinksidhi.app.connector.Consumers.createInputMessageConsumer;
import static flinksidhi.app.connector.Producer.*;

public class internalStreamSiddhiApp {

    private static final String inputTopic = "EVENT_STREAM_INPUT";
    private static final String outputTopic = "EVENT_STREAM_OUTPUT";
    private static final String consumerGroup = "EVENT_STREAM1";
    private static final String kafkaAddress = "localhost:9092";
    private static final String zkAddress = "localhost:2181";

    private static final String S3_CQL1 = "from inputStream select * insert into temp";
    private static final String S3_CQL = "from inputStream select json:toObject(awsS3) as obj insert into temp;"  
            "from temp select json:getString(obj,'$.awsS3.ResourceType') as affected_resource_type,"  
            "json:getString(obj,'$.awsS3.Details.Name') as affected_resource_name,"  
            "json:getString(obj,'$.awsS3.Encryption.ServerSideEncryptionConfiguration') as encryption,"  
            "json:getString(obj,'$.awsS3.Encryption.ServerSideEncryptionConfiguration.Rules[0].ApplyServerSideEncryptionByDefault.SSEAlgorithm') as algorithm insert into temp2; "  
            "from temp2 select  affected_resource_name,affected_resource_type, "  
            "ifThenElse(encryption == ' ','Fail','Pass') as state,"  
            "ifThenElse(encryption != ' ' and algorithm == 'aws:kms','None','Critical') as severity insert into outputStream";


    public static void start(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //DataStream<String> inputS = env.addSource(new S3EventSource());

        //Flink kafka stream consumer
        FlinkKafkaConsumer<String> flinkKafkaConsumer =
                createInputMessageConsumer(inputTopic, kafkaAddress,zkAddress, consumerGroup);

        //Add Data stream source -- flink consumer
        DataStream<String> inputS = env.addSource(flinkKafkaConsumer);
        SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

        cep.registerExtension("json:toObject", io.siddhi.extension.execution.json.function.ToJSONObjectFunctionExtension.class);
        cep.registerExtension( "json:getString", io.siddhi.extension.execution.json.function.GetStringJSONFunctionExtension.class);
        cep.registerStream("inputStream", inputS, "awsS3");


        inputS.print();

        System.out.println(cep.getDataStreamSchemas());
        //json needs extension jars to present during runtime.
        DataStream<Map<String,Object>> output = cep
                .from("inputStream")
                .cql(S3_CQL1)
                .returnAsMap("temp");


        //Flink kafka stream Producer
        FlinkKafkaProducer<Map<String, Object>> flinkKafkaProducer =
                createMapProducer(env,outputTopic, kafkaAddress);

        //Add Data stream sink -- flink producer
        output.addSink(flinkKafkaProducer);
        output.print();


        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

 

Consumer class:

 package flinksidhi.app.connector;


import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.json.JSONObject;

import java.util.Properties;
public class Consumers {
    public static FlinkKafkaConsumer<String> createInputMessageConsumer(String topic, String kafkaAddress, String zookeeprAddr, String kafkaGroup ) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddress);
        properties.setProperty("zookeeper.connect", zookeeprAddr);
        properties.setProperty("group.id",kafkaGroup);
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(
                topic,new SimpleStringSchema(),properties);
        return consumer;
    }
}
 

Класс производителя:

 package flinksidhi.app.connector;

import flinksidhi.app.util.ConvertJavaMapToJson;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.json.JSONObject;

import java.util.Map;

public class Producer {

    public static FlinkKafkaProducer<Tuple2> createStringProducer(StreamExecutionEnvironment env, String topic, String kafkaAddress) {

        return new FlinkKafkaProducer<Tuple2>(kafkaAddress, topic, new AverageSerializer());
    }

    public static FlinkKafkaProducer<Map<String,Object>> createMapProducer(StreamExecutionEnvironment env, String topic, String kafkaAddress) {

        return new FlinkKafkaProducer<Map<String,Object>>(kafkaAddress, topic, new SerializationSchema<Map<String, Object>>() {
            @Override
            public void open(InitializationContext context) throws Exception {

            }

            @Override
            public byte[] serialize(Map<String, Object> stringObjectMap) {
                String json = ConvertJavaMapToJson.convert(stringObjectMap);
                return json.getBytes();
            }
        });
    }
}
 

Я перепробовал много вещей, но код, в котором вызывается CQL, никогда не вызывается и даже не выдает никаких ошибок, не зная, в чем дело.

То же самое, если я создаю внутренний источник потока и использую тот же входной json для возврата в виде строки, это работает.

Ответ №1:

Первоначальное предположение: если вы используете четное время, уверены ли вы, что правильно определили водяные знаки? Как указано в документах:

(…) входящий элемент первоначально помещается в буфер, где элементы сортируются в порядке возрастания на основе их метки времени, и когда поступает водяной знак, все элементы в этом буфере с метками времени, меньшими, чем у водяного знака, обрабатываются (…)

Если это не поможет, я бы предложил разложить/упростить работу до минимума, например, просто оператор источника и некоторые наивные элементы печати/регистрации. И если это сработает, начните добавлять обратно операторов по одному. Вы также можете начать с максимального упрощения шаблона CEP.

Комментарии:

1. да , я пробовал очень простые задания , и это отлично работает с потребителем и производителем Кафки, даже этот пример сработал, если я только остановлю поток данных из кафки и использую тот, что вверху, который является внутренним источником данных . Но в тот момент , когда я переключаюсь на кафку в качестве источника входных данных, CEP перестает обрабатываться.

2. Я не уверен , как мы определяем, что он использует время события и водяной знак, я новичок в этом …

3. В классе приложений выше , если мы прокомментируем поток данных кафки, раскомментируем и используем его ниже в качестве источника входных данных, он отлично работает: Поток данных<Строка> Входы = env.addSource(новый S3EventSource());

4. Основываясь на водяном знаке, который, хотя и может быть проблемой, я добавил ниже для потребителя Кафки: flinkKafkaConsumer.assignTimestampsAndWatermarks( Стратегия водяных знаков .запрещеннаяустойчивость(Длительность.секунд(20))); но все равно это не сработало …

Ответ №2:

Прежде всего , большое спасибо @Петр Новойский , просто из-за вашего маленького указателя, который, сколько бы я ни размышлял о времени события, мне не приходил в голову. Так что да, при отладке двух случаев:

  1. С помощью внутреннего источника данных , где он успешно обрабатывался, при отладке потока я определил , что он обрабатывает водяной знак после обработки данных, но меня не зацепило, что он каким-то образом неявно управляет временем события данных.
  2. С кафкой в качестве источника данных , пока я отлаживал, я мог очень четко видеть , что он не обрабатывает никаких водяных знаков в потоке, но мне не приходило в голову, что это происходит из-за того, что время события и водяной знак не обрабатываются должным образом.

Просто добавьте одну строку кода в код приложения, который я понял из приведенного ниже фрагмента кода Flink:

 @deprecated In Flink 1.12 the default stream time characteristic has been changed to {@link
     *     TimeCharacteristic#EventTime}, thus you don't need to call this method for enabling
     *     event-time support anymore. Explicitly using processing-time windows and timers works in
     *     event-time mode. If you need to disable watermarks, please use {@link
     *     ExecutionConfig#setAutoWatermarkInterval(long)}. If you are using {@link
     *     TimeCharacteristic#IngestionTime}, please manually set an appropriate {@link
     *     WatermarkStrategy}. If you are using generic "time window" operations (for example {@link
     *     org.apache.flink.streaming.api.datastream.KeyedStream#timeWindow(org.apache.flink.streaming.api.windowing.time.Time)}
     *     that change behaviour based on the time characteristic, please use equivalent operations
     *     that explicitly specify processing time or event time.
     */
 

Я узнал, что по умолчанию flink учитывает время события, и для этого водяной знак должен быть обработан должным образом, чего я не сделал, поэтому я добавил ссылку ниже для настройки временных характеристик среды выполнения flink:

env.setStreamTimeCharacteristic(характеристика времени.Время обработки);

и бац … он начал работать , хотя это устарело и нуждается в какой-то другой конфигурации, но большое спасибо , это был отличный указатель, он мне очень помог, и я решил проблему..

Еще раз спасибо @Петр Новойский