#pyspark #spark-structured-streaming
#пыспарк #спарк-структурированный-потоковый
Вопрос:
В настоящее время у меня есть вариант использования, когда я создаю накопитель для передачи переменных на рабочий узел в pyspark. accuSum=spark.sparkContext.accumulator(0)
Если я использую накопитель с пакетным фреймом данных, как показано ниже
rdd2=spark.sparkContext.parallelize([1,2,3,4,5])
rdd2.foreach(лямбда x:accumCount.add(1)) выведите(accumCount.value)«, я могу заставить тонкости работать. Но я хочу использовать аккумуляторы с потоками kafkainput , чтобы во время потребления кафки я обновлял флаг аккумулятора, и когда флаг достигнет некоторого состояния, я выполню некоторые бизнес-логики
.writeStream.foreach(lambda x:accuSum.add(1)) .outputMode("update") .start() print(accuSum.value) if accuSum gt; 10: # perform some business logic```