Структурированная потоковая передача Apache Spark — не запись в местоположение контрольной точки

#apache-spark #apache-kafka #spark-structured-streaming #spark-checkpoint

#apache-искра #апач-кафка #спарк-структурированный-потоковый #искра-контрольная точка

Вопрос:

у меня есть простой структурированный потоковый код python Apache Spark, который считывает данные из Кафки и записывает сообщения в консоль.

я настроил местоположение контрольной точки, однако код не записывается в контрольную точку.. есть идеи, почему ?

Вот код:

 from pyspark.sql import SparkSession, Window   spark = SparkSession.builder.appName('StructuredStreaming_KafkaProducer').getOrCreate() # os.environ["SPARK_HOME"] = "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2" # os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0'  # kafkaBrokers='localhost:9092' kafkaBrokers='lt;hostgt;:lt;portgt;' topic = "my-topic" # bootstrap.servers=my-cluster-lb-ssl-cert-kafka-bootstrap:9093 security_protocol="SSL" ssl_truststore_location="/Users/karanalang/Documents/Technology/strimzi/gcp_certs_nov28/ca.p12" ssl_truststore_password="lt;pwd_1gt;" ssl_keystore_location="/Users/karanalang/Documents/Technology/strimzi/gcp_certs_nov28/user.p12" ssl_keystore_password="lt;pwd_2gt;" consumerGroupId = "my-group"  spark.sparkContext.setLogLevel("ERROR")  df = spark.read.format('kafka')  .option("kafka.bootstrap.servers",kafkaBrokers)  .option("kafka.security.protocol","SSL")   .option("kafka.ssl.truststore.location",ssl_truststore_location)   .option("kafka.ssl.truststore.password",ssl_truststore_password)   .option("kafka.ssl.keystore.location", ssl_keystore_location)  .option("kafka.ssl.keystore.password", ssl_keystore_password)  .option("subscribe", topic)   .option("kafka.group.id", consumerGroupId)  .option("startingOffsets", "earliest")   .load()  query = df.selectExpr("CAST(value AS STRING)")   .write   .format("console")   .option("numRows",100)  .option("checkpointLocation", "~/PycharmProjects/Kafka/checkpoint/")   .option("outputMode", "complete")  .save("output")