Создание большого словаря в pyspark

#python #apache-spark

#python #apache-spark

Вопрос:

Я пытаюсь решить следующую проблему с помощью pyspark. У меня есть файл в hdfs в формате, который представляет собой дамп таблицы поиска.

 key1, value1
key2, value2
...
  

Я хочу загрузить это в словарь python в pyspark и использовать его для какой-то другой цели. Итак, я попытался сделать:

 table = {}
def populateDict(line):
    (k,v) = line.split(",", 1)
    table[k] = v

kvfile = sc.textFile("pathtofile")
kvfile.foreach(populateDict)
  

Я обнаружил, что переменная таблицы не изменена. Итак, есть ли способ создать большую хеш-таблицу inmemory в spark?

Ответ №1:

foreach это распределенное вычисление, поэтому вы не можете ожидать, что оно изменит структуру данных, видимую только в драйвере. То, чего ты хочешь, это.

 kv.map(line => { line.split(" ") match { 
    case Array(k,v) => (k,v)
    case _ => ("","")
}.collectAsMap()
  

Это в scala, но вы поняли идею, важна функция, collectAsMap() которая возвращает карту драйверу.

Если ваши данные очень большие, вы можете использовать PairRDD в качестве карты. Первая карта для пар

     kv.map(line => { line.split(" ") match { 
        case Array(k,v) => (k,v)
        case _ => ("","")
    }
  

затем вы можете получить доступ с помощью rdd.lookup("key") , которая возвращает последовательность значений, связанных с ключом, хотя это определенно будет не так эффективно, как другие распределенные хранилища KV, поскольку spark на самом деле не создан для этого.

Комментарии:

1. Классное спасибо. Означает ли это, что карта должна помещаться в память драйвера? Или он все еще распространяется?

2. @Kamal да, он должен поместиться в mem. Вы могли бы использовать pair rdd в качестве таблицы поиска. Также подумал о решении с помощью accumulable, скоро опубликую

3. Хорошо. Я искал распределенную карту в spark. Похоже, это невозможно!

4. Спасибо! Я попробую

5. разве вам не хватает }?

Ответ №2:

Для повышения эффективности смотрите: sortByKey() и lookup()

поиск (ключ):

Верните список значений в RDD для ключа key. Эта операция выполняется эффективно, если у RDD есть известный разделитель, путем поиска только в разделе, которому соответствует ключ.

RDD будет повторно разделен с помощью sortByKey() (см.: OrderedRDD) и будет выполняться эффективный поиск во время lookup() вызовов. В коде, что-то вроде,

 kvfile = sc.textFile("pathtofile")
sorted_kv = kvfile.flatMap(lambda x: x.split("," , 1)).sortByKey()

sorted_kv.lookup('key1').take(10)
  

справится с задачей как RDD, так и эффективно.