#apache-spark #pyspark #spark-structured-streaming #spark-kafka-integration
Вопрос:
У меня есть скрипт pyspark, работающий на сборке spark 2.4.8 hadoop 2.6.
Сценарий-это простая структурированная потоковая передача с кафкой. Прочитайте из темы, извлеките, отфильтруйте, сопоставьте и напишите в другую тему кафки. Сокращение для тестов выглядит так.
from pyspark.sql.types import StructType,StructField,StringType from pyspark.sql.functions import from_json,col,expr,to_json,struct,trim,explode spark = SparkSession .builder .appName("myapp") .getOrCreate() df_schema = StructType([ StructField("field_one", StructType([ StructField("inner_one", StringType(), True), StructField("inner_two", StringType(), True) ]), True), StructField("field_two", StringType(), True) ]) log = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "MY_KAFKA_BROKERS") .option("subscribe", "MY_TOPIC") .load() .selectExpr("CAST(value AS STRING) as val") .select(from_json("val",df_schema).alias("all")) .select(col("all.*")) extract = log .filter("field_one.inner_one = 'some_value'") .select(to_json(struct("field_one","field_two")).alias("value")) result = extract.writeStream .format("kafka") .option("kafka.bootstrap.servers", "MY_KAFKA_BROKERS") .option("topic", "MY_TOPIC_TWO") .outputMode("append") .option("checkpointLocation", "hdfs:///user/me/spark/checkpoints/spark-test") .trigger(processingTime='5 seconds') .start() result.awaitTermination()
Работает нормально.
Недавно я установил новую установку spark. Sprark 3.2.0 без дистрибутива hadoop Пакеты hadoop 2.6 добавлены в путь к классам spark через SPARK_DIST_CLASSPATH.
Я подключаюсь к тому же hadoop.
Вот в чем проблема. Искра выдает исключение, связанное с местоположением контрольной точки.
Caused by: org.apache.hadoop.HadoopIllegalArgumentException: Uri without authority: hdfs:/user/me/spark/checkpoints/spark-test
Самое странное, что когда я удаляю протокол .option("checkpointLocation", "hdfs:///user/me/spark/checkpoints/spark-test")
, чтобы .option("checkpointLocation", "/user/me/spark/checkpoints/spark-test")
Скрипт запускается, и даже он указывает на hdfs в указанном месте.
Мой вопрос в том, есть ли какие-либо изменения в контрольных точках в структурированной потоковой передаче между spark 2.4.x и 3.2.x ? Если да, то как я могу предоставить другую файловую систему для целей проверки ? Или это просто вопрос моей пользовательской установки spark — spark 3 с пакетами hadoop 2.6, предоставляемыми в classpath ?