#python #apache-spark
#python #apache-spark
Вопрос:
Я пытаюсь решить следующую проблему с помощью pyspark. У меня есть файл в hdfs в формате, который представляет собой дамп таблицы поиска.
key1, value1
key2, value2
...
Я хочу загрузить это в словарь python в pyspark и использовать его для какой-то другой цели. Итак, я попытался сделать:
table = {}
def populateDict(line):
(k,v) = line.split(",", 1)
table[k] = v
kvfile = sc.textFile("pathtofile")
kvfile.foreach(populateDict)
Я обнаружил, что переменная таблицы не изменена. Итак, есть ли способ создать большую хеш-таблицу inmemory в spark?
Ответ №1:
foreach
это распределенное вычисление, поэтому вы не можете ожидать, что оно изменит структуру данных, видимую только в драйвере. То, чего ты хочешь, это.
kv.map(line => { line.split(" ") match {
case Array(k,v) => (k,v)
case _ => ("","")
}.collectAsMap()
Это в scala, но вы поняли идею, важна функция, collectAsMap()
которая возвращает карту драйверу.
Если ваши данные очень большие, вы можете использовать PairRDD в качестве карты. Первая карта для пар
kv.map(line => { line.split(" ") match {
case Array(k,v) => (k,v)
case _ => ("","")
}
затем вы можете получить доступ с помощью rdd.lookup("key")
, которая возвращает последовательность значений, связанных с ключом, хотя это определенно будет не так эффективно, как другие распределенные хранилища KV, поскольку spark на самом деле не создан для этого.
Комментарии:
1. Классное спасибо. Означает ли это, что карта должна помещаться в память драйвера? Или он все еще распространяется?
2. @Kamal да, он должен поместиться в mem. Вы могли бы использовать pair rdd в качестве таблицы поиска. Также подумал о решении с помощью accumulable, скоро опубликую
3. Хорошо. Я искал распределенную карту в spark. Похоже, это невозможно!
4. Спасибо! Я попробую
5. разве вам не хватает }?
Ответ №2:
Для повышения эффективности смотрите: sortByKey() и lookup()
поиск (ключ):
Верните список значений в RDD для ключа key. Эта операция выполняется эффективно, если у RDD есть известный разделитель, путем поиска только в разделе, которому соответствует ключ.
RDD будет повторно разделен с помощью sortByKey() (см.: OrderedRDD) и будет выполняться эффективный поиск во время lookup()
вызовов. В коде, что-то вроде,
kvfile = sc.textFile("pathtofile")
sorted_kv = kvfile.flatMap(lambda x: x.split("," , 1)).sortByKey()
sorted_kv.lookup('key1').take(10)
справится с задачей как RDD, так и эффективно.