#dataframe #pyspark #spark-structured-streaming #spark-streaming-kafka #discretization
Вопрос:
У меня есть брокер Кафки с темой, связанной со структурированной потоковой передачей Spark. Моя тема отправляет данные в мой фрейм потоковых данных, и я хотел бы получить информацию по каждой строке для этой темы (потому что мне нужно сравнить каждую строку с другой базой данных).
Если бы я мог преобразовать свои пакеты в RDD, я мог бы легко получить каждую строку.
Я также видел кое-что о DStreams, но я не знаю, работает ли он с последней версией f spark по-прежнему.
Является ли DStream ответом на мою проблему или есть ли какое-либо другое решение для получения моих данных строка за строкой?
Комментарии:
1. Пожалуйста, предоставьте достаточно кода, чтобы другие могли лучше понять или воспроизвести проблему.
2. Вы можете использовать forEachBatch и foreachPartition для получения RDDS из вашего кадра потоковых данных
Ответ №1:
Прочитайте данные в потоковой передаче spark от кафки и запишите свое собственное сравнение строк в программе для записи потоковой передачи spark . например.
streamingDatasetOfString.writeStream.foreach(
новый ForeachWriter[Строка] {
def open(partitionId: Long, version: Long): Boolean = { // Open connection } def process(record: String): Unit = { // Write string to connection } def close(errorOrNull: Throwable): Unit = { // Close the connection }}).start()
` Это поддерживается в python,scala,java начиная с spark 2.4