Расположение контрольных точек структурированной потоковой передачи в spark 3.2.0 против 2.4.8

#apache-spark #pyspark #spark-structured-streaming #spark-kafka-integration

Вопрос:

У меня есть скрипт pyspark, работающий на сборке spark 2.4.8 hadoop 2.6.

Сценарий-это простая структурированная потоковая передача с кафкой. Прочитайте из темы, извлеките, отфильтруйте, сопоставьте и напишите в другую тему кафки. Сокращение для тестов выглядит так.

 from pyspark.sql.types import StructType,StructField,StringType from pyspark.sql.functions import from_json,col,expr,to_json,struct,trim,explode  spark = SparkSession   .builder   .appName("myapp")   .getOrCreate()  df_schema = StructType([  StructField("field_one", StructType([  StructField("inner_one", StringType(), True),  StructField("inner_two", StringType(), True)  ]), True),  StructField("field_two", StringType(), True) ])  log = spark   .readStream   .format("kafka")   .option("kafka.bootstrap.servers", "MY_KAFKA_BROKERS")   .option("subscribe", "MY_TOPIC")   .load()   .selectExpr("CAST(value AS STRING) as val")   .select(from_json("val",df_schema).alias("all"))   .select(col("all.*"))  extract = log   .filter("field_one.inner_one = 'some_value'")   .select(to_json(struct("field_one","field_two")).alias("value"))   result = extract.writeStream   .format("kafka")   .option("kafka.bootstrap.servers", "MY_KAFKA_BROKERS")   .option("topic", "MY_TOPIC_TWO")   .outputMode("append")   .option("checkpointLocation", "hdfs:///user/me/spark/checkpoints/spark-test")   .trigger(processingTime='5 seconds')   .start()  result.awaitTermination()  

Работает нормально.

Недавно я установил новую установку spark. Sprark 3.2.0 без дистрибутива hadoop Пакеты hadoop 2.6 добавлены в путь к классам spark через SPARK_DIST_CLASSPATH.

Я подключаюсь к тому же hadoop.

Вот в чем проблема. Искра выдает исключение, связанное с местоположением контрольной точки.

Caused by: org.apache.hadoop.HadoopIllegalArgumentException: Uri without authority: hdfs:/user/me/spark/checkpoints/spark-test

Самое странное, что когда я удаляю протокол .option("checkpointLocation", "hdfs:///user/me/spark/checkpoints/spark-test") , чтобы .option("checkpointLocation", "/user/me/spark/checkpoints/spark-test")

Скрипт запускается, и даже он указывает на hdfs в указанном месте.

Мой вопрос в том, есть ли какие-либо изменения в контрольных точках в структурированной потоковой передаче между spark 2.4.x и 3.2.x ? Если да, то как я могу предоставить другую файловую систему для целей проверки ? Или это просто вопрос моей пользовательской установки spark — spark 3 с пакетами hadoop 2.6, предоставляемыми в classpath ?