У меня есть вариант использования, когда я должен передать некоторое значение в pyspark и в узле workers, мне нужно получить к ним доступ и выполнить некоторые бизнес-логические действия

#pyspark #spark-structured-streaming

#пыспарк #спарк-структурированный-потоковый

Вопрос:

В настоящее время у меня есть вариант использования, когда я создаю накопитель для передачи переменных на рабочий узел в pyspark. accuSum=spark.sparkContext.accumulator(0) Если я использую накопитель с пакетным фреймом данных, как показано ниже

rdd2=spark.sparkContext.parallelize([1,2,3,4,5]) rdd2.foreach(лямбда x:accumCount.add(1)) выведите(accumCount.value)«, я могу заставить тонкости работать. Но я хочу использовать аккумуляторы с потоками kafkainput , чтобы во время потребления кафки я обновлял флаг аккумулятора, и когда флаг достигнет некоторого состояния, я выполню некоторые бизнес-логики

 .writeStream.foreach(lambda x:accuSum.add(1))  .outputMode("update")  .start() print(accuSum.value) if accuSum gt; 10: # perform some business logic```