#apache-spark #pyspark #key-value
#apache-spark #pyspark #ключ-значение
Вопрос:
Итак, у меня есть RDD, который мне нужно превратить в словарь. Однако я получаю несколько ошибок, и я застрял.
Первое, что я делаю, это загружаю свой csv-файл:
dataset = spark.read.csv('/user/myuser/testing_directory/output_csv', inferSchema = True, header = True)
Затем я собираю данные в RDD:
pre_experian_rdd = dataset.collect()
Итак, мои данные выглядят так:
Row(name='BETTING Golf Course', address='1234 main st', city_name='GARDEN HOUSE', state='OH', zipcode=45209)
Мне нужно сохранить ту же структуру с ключом: значение для всей строки, потому что мне нужно выполнить вызов api. Таким образом, это должно быть: `{имя: значение, адрес: значение, имя_города: значение, состояние: значение, почтовый индекс: значение}
Но когда я выполняю collectAsMap(), я получаю следующую ошибку:
dictionary update sequence element #0 has length 5; 2 is required
Мне нужны заголовки, чтобы представлять ключ: значение
Может кто-нибудь дать некоторое представление о том, что я делаю неправильно, пожалуйста?
Вот фрагмент моего кода:
dataset = spark.SparkContext.textFile(‘/user/myuser/testing_directory/output_csv’)
сообщение об ошибке pre_experian_rdd = dataset.collectAsMap():
An error was encountered:
dictionary update sequence element #0 has length 36; 2 is required
Traceback (most recent call last):
File "/app/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/pyspark/rdd.py", line 1587, in collectAsMap
return dict(self.collect())
ValueError: dictionary update sequence element #0 has length 36; 2 is required
Когда я загружаю его как CSV, мне нужно выполнить несколько преобразований:
dataset = spark.read.csv(‘/user/myuser/testing_directory/output_csv’, inferSchema = True, заголовок = True)
ex_rdd = dataset.collect()
Итак, мой rdd выглядит примерно так:
[Row(name='BEAR LAKE GOLF COURSE amp; RESORT', address='PO BOX 331', city_name='GARDEN CITY', state='UT', zipcode=84028), Row(name='CHRISTENSEN amp; PETERSON, INC.', address='39 N MAIN ST', city_name='RICHFIELD', state='UT', zipcode=84701), Row(name='ALEXANDERS PRECISION MACHINING', address='15731 CHEMICAL LANE', city_name='HUNTINGTON BEACH', state='CA', zipcode=92649), Row(name='JOSEPH amp; JANET COLOMBO', address='1003 W COLLEGE', city_name='BOZEMAN', state='MT', zipcode=59715)
Если я это сделаю ex_rdd.collectAsMap()
, я получу следующую ошибку
dictionary update sequence element #0 has length 5; 2 is required
Чтобы обойти это, я должен сделать следующее:
df_dict = [row.asDict() for row in dataset.collect()]
[{'name': 'BEAR RESORT', 'address': 'POP 331', 'city_name': 'GARDEN LAKE', 'state': 'UT', 'zipcode': 12345}, {'name': 'CHRISTENSEN INC.', 'address': '12345 MAIN AVE', 'city_name': 'FAIRFIELD', 'state': 'UT', 'zipcode': 12345}, {'name': 'PRECISE MARCHING', 'address': '1234 TESTING LANE', 'city_name': 'HUNTINGTON BEACH', 'state': 'CA', 'zipcode': 92649}]
Проблема в том, что это все еще список, и мне нужен словарь.
Комментарии:
1. Когда вы выполняете сбор, вы получите список объектов строк, а не rdd. вы можете попробовать dataset.rdd.collectAsMap() для вашего сценария
2. @HArdRese7 когда я выполняю collectAsMap(), именно тогда «элемент последовательности обновления словаря # 0 имеет длину 5; требуется 2»
3. Предоставьте свой код, чтобы мы могли видеть инструкцию метода collectAsMap, которую вы используете. посмотрите на методы flatMap и foldBykey rdd и посмотрите, работает ли это. Пожалуйста, предоставьте примеры данных и кода, чтобы ошибка могла быть воспроизведена для поиска решения.
4. @HArdRese7 Я обновил проблему с помощью своего кода. Я надеюсь, что это прояснит проблему.