#apache-flink #flink-streaming
#apache-flink #flink-потоковая передача
Вопрос:
Я наткнулся на странную ситуацию. Я пытаюсь выполнить это:
DataStream<Integer> numStream = env.fromElements(1,2,3,4,5,6,7,8).name("numSource");
DataStream<Integer> controlStream = env.fromElements(3,23,14,53,65,77,85,102).name("conSource");
JoinedStreams.WithWindow<Integer, Integer, Integer, TimeWindow> joinedStreams =
numStream
.join(controlStream)
.where(numS -> numS, TypeInformation.of(new TypeHint<Integer>() {}))
.equalTo(controlS -> controlS, TypeInformation.of(new TypeHint<Integer>() {}))
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(300)));
joinedStreams.apply(new JoinFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> join(Integer integer, Integer integer2) throws Exception {
return Tuple2.of(integer, integer2);
}
}).print();
env.execute("Sandbox Job");
В ответ я получаю эту ошибку обратно:
16:59:39,305 WARN org.apache.flink.metrics.MetricGroup [] -
The operator name Window(TumblingProcessingTimeWindows(300), ProcessingTimeTrigger, CoGroupWindowFunction)
exceeded the 80 characters length limit and was truncated.
Я тщетно пытался назвать преобразование. Я пытался изменить размер метрик, но нигде не смог найти его, чтобы изменить его.
Любая помощь была бы очень ценной, это выглядит очень простой задачей, и все же я не могу ее запустить.
Я добавляю pom.xml файл свойств и зависимостей приведен ниже. Заранее благодарю вас.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>gr.tuc.psavvaidis</groupId>
<artifactId>pli613_lab</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.0</flink.version>
<kafka.version>2.7.0</kafka.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.12.1</log4j.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>compile</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>compile</scope>-->
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
Комментарии:
1. Вы получаете предупреждение, а не ошибку, о длине имени. Почему вы говорите «я не могу его запустить»?
2. Эй, @kkrugler, когда я хвалю его, он запускается и печатается, как и ожидалось. но когда я присоединяюсь и открываю его, я получаю предупреждение и не получаю никаких результатов. Flink обычно выдает мне ошибки о времени выполнения, не назначенных приемниках, типах, не реализованных интерфейсах, программа все еще может запускаться, но она отменяет задание
3. Когда вы добавляете функцию управления окнами, ничего не выдается до тех пор, пока не сработает триггер окна (300 мс, в вашем случае). Но прежде чем это произойдет, ваши два источника будут завершены, и рабочий процесс остановится.