#apache-spark #apache-kafka #spark-structured-streaming #spark-checkpoint
#apache-искра #апач-кафка #спарк-структурированный-потоковый #искра-контрольная точка
Вопрос:
у меня есть простой структурированный потоковый код python Apache Spark, который считывает данные из Кафки и записывает сообщения в консоль.
я настроил местоположение контрольной точки, однако код не записывается в контрольную точку.. есть идеи, почему ?
Вот код:
from pyspark.sql import SparkSession, Window spark = SparkSession.builder.appName('StructuredStreaming_KafkaProducer').getOrCreate() # os.environ["SPARK_HOME"] = "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2" # os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0' # kafkaBrokers='localhost:9092' kafkaBrokers='lt;hostgt;:lt;portgt;' topic = "my-topic" # bootstrap.servers=my-cluster-lb-ssl-cert-kafka-bootstrap:9093 security_protocol="SSL" ssl_truststore_location="/Users/karanalang/Documents/Technology/strimzi/gcp_certs_nov28/ca.p12" ssl_truststore_password="lt;pwd_1gt;" ssl_keystore_location="/Users/karanalang/Documents/Technology/strimzi/gcp_certs_nov28/user.p12" ssl_keystore_password="lt;pwd_2gt;" consumerGroupId = "my-group" spark.sparkContext.setLogLevel("ERROR") df = spark.read.format('kafka') .option("kafka.bootstrap.servers",kafkaBrokers) .option("kafka.security.protocol","SSL") .option("kafka.ssl.truststore.location",ssl_truststore_location) .option("kafka.ssl.truststore.password",ssl_truststore_password) .option("kafka.ssl.keystore.location", ssl_keystore_location) .option("kafka.ssl.keystore.password", ssl_keystore_password) .option("subscribe", topic) .option("kafka.group.id", consumerGroupId) .option("startingOffsets", "earliest") .load() query = df.selectExpr("CAST(value AS STRING)") .write .format("console") .option("numRows",100) .option("checkpointLocation", "~/PycharmProjects/Kafka/checkpoint/") .option("outputMode", "complete") .save("output")