Как мне превратить RDD в словарь в pyspark?

#apache-spark #pyspark #key-value

#apache-spark #pyspark #ключ-значение

Вопрос:

Итак, у меня есть RDD, который мне нужно превратить в словарь. Однако я получаю несколько ошибок, и я застрял.

Первое, что я делаю, это загружаю свой csv-файл:

 dataset = spark.read.csv('/user/myuser/testing_directory/output_csv', inferSchema = True, header = True)
  

Затем я собираю данные в RDD:

 pre_experian_rdd = dataset.collect()
  

Итак, мои данные выглядят так:

 Row(name='BETTING Golf Course', address='1234 main st', city_name='GARDEN HOUSE', state='OH', zipcode=45209)
  

Мне нужно сохранить ту же структуру с ключом: значение для всей строки, потому что мне нужно выполнить вызов api. Таким образом, это должно быть: `{имя: значение, адрес: значение, имя_города: значение, состояние: значение, почтовый индекс: значение}

Но когда я выполняю collectAsMap(), я получаю следующую ошибку:

 dictionary update sequence element #0 has length 5; 2 is required
  

Мне нужны заголовки, чтобы представлять ключ: значение

Может кто-нибудь дать некоторое представление о том, что я делаю неправильно, пожалуйста?

Вот фрагмент моего кода:

dataset = spark.SparkContext.textFile(‘/user/myuser/testing_directory/output_csv’)

сообщение об ошибке pre_experian_rdd = dataset.collectAsMap():

 An error was encountered:
dictionary update sequence element #0 has length 36; 2 is required
Traceback (most recent call last):
  File "/app/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/pyspark/rdd.py", line 1587, in collectAsMap
    return dict(self.collect())
ValueError: dictionary update sequence element #0 has length 36; 2 is required
  

Когда я загружаю его как CSV, мне нужно выполнить несколько преобразований:

dataset = spark.read.csv(‘/user/myuser/testing_directory/output_csv’, inferSchema = True, заголовок = True)

 ex_rdd = dataset.collect()
  

Итак, мой rdd выглядит примерно так:

 [Row(name='BEAR LAKE GOLF COURSE amp; RESORT', address='PO BOX 331', city_name='GARDEN CITY', state='UT', zipcode=84028), Row(name='CHRISTENSEN amp; PETERSON, INC.', address='39 N MAIN ST', city_name='RICHFIELD', state='UT', zipcode=84701), Row(name='ALEXANDERS PRECISION MACHINING', address='15731 CHEMICAL LANE', city_name='HUNTINGTON BEACH', state='CA', zipcode=92649), Row(name='JOSEPH amp; JANET COLOMBO', address='1003 W COLLEGE', city_name='BOZEMAN', state='MT', zipcode=59715)
  

Если я это сделаю ex_rdd.collectAsMap() , я получу следующую ошибку

 dictionary update sequence element #0 has length 5; 2 is required
  

Чтобы обойти это, я должен сделать следующее:

 df_dict = [row.asDict() for row in dataset.collect()]
[{'name': 'BEAR  RESORT', 'address': 'POP 331', 'city_name': 'GARDEN LAKE', 'state': 'UT', 'zipcode': 12345}, {'name': 'CHRISTENSEN INC.', 'address': '12345 MAIN AVE', 'city_name': 'FAIRFIELD', 'state': 'UT', 'zipcode': 12345}, {'name': 'PRECISE MARCHING', 'address': '1234 TESTING LANE', 'city_name': 'HUNTINGTON BEACH', 'state': 'CA', 'zipcode': 92649}]
  

Проблема в том, что это все еще список, и мне нужен словарь.

Комментарии:

1. Когда вы выполняете сбор, вы получите список объектов строк, а не rdd. вы можете попробовать dataset.rdd.collectAsMap() для вашего сценария

2. @HArdRese7 когда я выполняю collectAsMap(), именно тогда «элемент последовательности обновления словаря # 0 имеет длину 5; требуется 2»

3. Предоставьте свой код, чтобы мы могли видеть инструкцию метода collectAsMap, которую вы используете. посмотрите на методы flatMap и foldBykey rdd и посмотрите, работает ли это. Пожалуйста, предоставьте примеры данных и кода, чтобы ошибка могла быть воспроизведена для поиска решения.

4. @HArdRese7 Я обновил проблему с помощью своего кода. Я надеюсь, что это прояснит проблему.