Apache spark streaming — кэш набора данных для объединения

#streaming #apache-spark

#потоковая передача #apache-spark

Вопрос:

Я рассматриваю возможность использования Apache Spark streaming для некоторой работы в режиме реального времени, но я не уверен, как кэшировать набор данных для использования при объединении / поиске.

Основными входными данными будут записи json, поступающие из Kafka, которые содержат идентификатор, я хочу преобразовать этот идентификатор в имя, используя набор данных для поиска. Набор данных поиска находится в Mongo Db, но я хочу иметь возможность кэшировать его внутри процесса spark, поскольку набор данных меняется очень редко (раз в пару часов), поэтому я не хочу нажимать на mongo для каждой входной записи или перезагружать все записи в каждом пакете spark, но мне нужно периодически обновлять данные, хранящиеся в spark (например, каждые 2 часа).

Каков наилучший способ сделать это?

Спасибо.

Ответ №1:

Я долго и упорно думал об этом сам. В частности, я задавался вопросом, возможно ли на самом деле реализовать базу данных DB в своего рода Spark.

Ну, ответ вроде как да. Сначала вам нужна программа, которая сначала кэширует основной набор данных в память, затем каждые пару часов выполняет оптимизированное объединение с tiny для обновления основного набора данных. Теперь, по-видимому, у Spark будет метод, который выполняет объединение с помощью tiny (возможно, он уже выпущен в 1.0.0 — мой стек застрял на 0.9.0, пока не выйдет CDH 5.1.0).

В любом случае, вы можете вручную реализовать объединение с помощью tiny, взяв периодический двухчасовой набор данных и превратив его в HashMap, а затем передав его как широковещательную переменную. Это означает, что хэш-карта будет скопирована, но только один раз для каждого узла (сравните это с простой ссылкой на карту — она будет скопирована один раз для каждой задачи — гораздо большая стоимость). Затем вы берете свой основной набор данных и добавляете новые записи, используя широковещательную карту. Затем вы можете периодически (каждую ночь) сохранять в hdfs или что-то в этом роде.

Итак, вот несколько неаккуратных псевдокодов для пояснения:

 var mainDataSet: RDD[KeyType, DataType] = sc.textFile("/path/to/main/dataset")
  .map(parseJsonAndGetTheKey).cache()

everyTwoHoursDo {
  val newData: Map[KeyType, DataType] = sc.textFile("/path/to/last/two/hours")
    .map(parseJsonAndGetTheKey).toarray().toMap

  broadcast(newData)

  val mainDataSetNew = 
    mainDataSet.map((key, oldValue) => (key, 
      newData.get(key).map(newDataValue => 
        update(oldValue, newDataValue))
      .getOrElse(oldValue)))
    .cache()

  mainDataSetNew.someAction() // to force execution

  mainDataSet.unpersist()
  mainDataSet = mainDataSetNew
}
  

Я также подумал, что вы могли бы быть очень умными и использовать пользовательский разделитель с вашим собственным пользовательским индексом, а затем использовать пользовательский способ обновления разделов, чтобы каждый раздел сам содержал вложенную карту. Затем вы можете пропустить обновление разделов, которые, как вы знаете, не будут содержать ключей, встречающихся в новых данных, а также оптимизировать процесс обновления.

Лично я считаю, что это действительно классная идея, и самое приятное, что ваш набор данных уже готов в памяти для некоторого анализа / машинного обучения. Недостатком является то, что вы немного изобретаете колесо. Возможно, было бы лучше рассмотреть возможность использования Cassandra, поскольку Datastax сотрудничает с Databricks (людьми, которые создают Spark) и может в конечном итоге поддерживать что-то вроде этого из коробки.

Дальнейшее чтение:

http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

http://www.datastax.com/2014/06/datastax-unveils-dse-45-the-future-of-the-distributed-database-management-system

Комментарии:

1. Как насчет использования накопительной хэш-карты, получение локального значения накопителя может быть полезным, особенно если похожие данные попадают в одни и те же разделы

2. Спасибо — я попробую.

Ответ №2:

Вот довольно простой рабочий процесс:

Для каждого пакета данных:

  1. Преобразуйте пакет данных JSON в DataFrame (b_df).
  2. Прочитайте набор данных поиска из MongoDB как фрейм данных (m_df). Затем кэш, m_df.cache()
  3. Объедините данные с помощью b_df.join(m_df, «join_field»)
  4. Выполните требуемую агрегацию, а затем выполните запись в источник данных.