# #pyspark #google-bigquery #spark-streaming #google-cloud-dataproc #spark-streaming-kafka
Вопрос:
Я пытаюсь записывать данные, извлеченные из Кафки, в таблицу Bigquery каждые 120 секунд. Я хотел бы выполнить некоторые дополнительные операции, которые по документации должны быть возможны внутри метода .foreach()
или foreachBatch()
.
В качестве теста я хотел печатать простое сообщение каждый раз, когда данные извлекаются из кафки и записываются в BigQuery.
batch_job=df_alarmsFromKafka.writeStream .trigger(processingTime='120 seconds') .foreachBatch(print("do i get printed every batch?")) .format("bigquery").outputMode("append") .option("temporaryGcsBucket",path1) .option("checkpointLocation",path2) .option("table", table_kafka) .start() batch_job.awaitTermination()
Я ожидал бы, что это сообщение будет печататься каждые 120 секунд в выходной ячейке лаборатории jupyter, вместо этого оно печатается только один раз и просто продолжает писать в BigQuery.
Если я попытаюсь использовать .foreach()
вместо foreachBatch()
batch_job=df_alarmsFromKafka.writeStream .trigger(processingTime='120 seconds') .foreach(print("do i get printed every batch?")) .format("bigquery").outputMode("append") .option("temporaryGcsBucket",path1) .option("checkpointLocation",path2) .option("table", table_kafka) .start() batch_job.awaitTermination()
он печатает сообщение один раз и сразу после этого выдает следующую ошибку, которую я не смог отладить/понять:
/usr/lib/spark/python/pyspark/sql/streaming.py in foreach(self, f) 1335 1336 if not hasattr(f, 'process'): -gt; 1337 raise Exception("Provided object does not have a 'process' method") 1338 1339 if not callable(getattr(f, 'process')): Exception: Provided object does not have a 'process' method
Я делаю что-то не так? как я могу просто выполнять некоторые операции каждые 120 секунд, кроме тех, которые выполняются непосредственно над вычисляемым df_alarmsFromKafka
фреймом данных ?
Ответ №1:
Дополнительные операции разрешены, но только с выходными данными потокового запроса. Но здесь вы пытаетесь напечатать некоторые строки, которые не связаны с самими выходными данными. Его можно распечатать только один раз.
Например, если вы напишете функцию foreachbatch, как показано ниже:
def write_to_cassandra(target_df, batch_id): target_df.write .format("org.apache.spark.sql.cassandra") .option("keyspace", "tweet_db") .option("table", "tweet2") .mode("append") .save() target_df.show()
Он будет выводить target_df в каждом пакете, так как функция .show() связана с самими выходными данными.
По вашему второму вопросу:
Функция Foreach ожидает от вас расширения класса ForeachWriter путем реализации методов open, process и close, которых у вас там не было.