метод foreach() с ошибками потоковой передачи Spark

# #pyspark #google-bigquery #spark-streaming #google-cloud-dataproc #spark-streaming-kafka

Вопрос:

Я пытаюсь записывать данные, извлеченные из Кафки, в таблицу Bigquery каждые 120 секунд. Я хотел бы выполнить некоторые дополнительные операции, которые по документации должны быть возможны внутри метода .foreach() или foreachBatch() .

В качестве теста я хотел печатать простое сообщение каждый раз, когда данные извлекаются из кафки и записываются в BigQuery.

 batch_job=df_alarmsFromKafka.writeStream .trigger(processingTime='120 seconds')  .foreachBatch(print("do i get printed every batch?")) .format("bigquery").outputMode("append")  .option("temporaryGcsBucket",path1)  .option("checkpointLocation",path2)  .option("table", table_kafka)  .start() batch_job.awaitTermination()  

Я ожидал бы, что это сообщение будет печататься каждые 120 секунд в выходной ячейке лаборатории jupyter, вместо этого оно печатается только один раз и просто продолжает писать в BigQuery.

Если я попытаюсь использовать .foreach() вместо foreachBatch()

 batch_job=df_alarmsFromKafka.writeStream .trigger(processingTime='120 seconds')  .foreach(print("do i get printed every batch?")) .format("bigquery").outputMode("append")  .option("temporaryGcsBucket",path1)  .option("checkpointLocation",path2)  .option("table", table_kafka)  .start() batch_job.awaitTermination()  

он печатает сообщение один раз и сразу после этого выдает следующую ошибку, которую я не смог отладить/понять:

 /usr/lib/spark/python/pyspark/sql/streaming.py in foreach(self, f)  1335   1336 if not hasattr(f, 'process'): -gt; 1337 raise Exception("Provided object does not have a 'process' method")  1338   1339 if not callable(getattr(f, 'process')):  Exception: Provided object does not have a 'process' method  

Я делаю что-то не так? как я могу просто выполнять некоторые операции каждые 120 секунд, кроме тех, которые выполняются непосредственно над вычисляемым df_alarmsFromKafka фреймом данных ?

Ответ №1:

Дополнительные операции разрешены, но только с выходными данными потокового запроса. Но здесь вы пытаетесь напечатать некоторые строки, которые не связаны с самими выходными данными. Его можно распечатать только один раз.

Например, если вы напишете функцию foreachbatch, как показано ниже:

 def write_to_cassandra(target_df, batch_id): target_df.write   .format("org.apache.spark.sql.cassandra")   .option("keyspace", "tweet_db")   .option("table", "tweet2")   .mode("append")   .save() target_df.show()  

Он будет выводить target_df в каждом пакете, так как функция .show() связана с самими выходными данными.

По вашему второму вопросу:

Функция Foreach ожидает от вас расширения класса ForeachWriter путем реализации методов open, process и close, которых у вас там не было.