Фильтровать rdd в зависимости от значений второго rdd

#python #apache-spark #pyspark #rdd

#python #apache-spark #pyspark #rdd

Вопрос:

У меня есть два rdd, и я хотел бы отфильтровать один по значению другого.

Несколько экземпляров каждого rdd следующие:

 rdd1 = [((address1, date1),1), ((address5, date2),1), ((address1, date2),1), ((address2,date3),1)]
rdd2 = [(address1,1), (address1,1), (address2, 1), (address1, 1)]
 

Желаемый результат будет:

 joined_rdd = [((address1, date1),1),((address1, date2),1),((address2,date3),1)]
 

Итак, в основном я хочу сохранить кортежи в rdd1, если значение адреса в этом кортеже существует в rdd2.

Ответ №1:

Выполните объединение и удалите все из rdd2:

 rdd1 = sc.parallelize([(('address1', 'date1'),1), (('address5', 'date2'),1), (('address1', 'date2'),1), (('address2','date3'),1)])
rdd2 = sc.parallelize([('address1',1), ('address1',1), ('address2', 1), ('address1', 1)])

result_rdd = (rdd1.keyBy(lambda x: x[0][0])
                  .join(rdd2.map(lambda x: x[0])
                            .keyBy(lambda x: x)
                            .distinct())
                  .map(lambda x: x[1][0]))

result_rdd.collect()
[(('address2', 'date3'), 1), (('address1', 'date1'), 1), (('address1', 'date2'), 1)]