Использование apache Spark для объединения в списки кортежей

#scala #apache-spark

#scala #apache-spark

Вопрос:

Я пытаюсь объединить с RDD :

 val u1 = sc.parallelize(List ( ("a" , (1,2)) , ("b" , (1,2))))
val u2 = sc.parallelize(List ( ("a" , ("3")) , ("b" , (2))))
 

Я получаю сообщение об ошибке :

  scala> u1 union u2
<console>:17: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, Any)]
 required: org.apache.spark.rdd.RDD[(String, (Int, Int))]
Note: (String, Any) >: (String, (Int, Int)), but class RDD is invariant in type
T.
You may wish to define T as -T instead. (SLS 4.5)
              u1 union u2
                       ^
 

Строковый тип в каждом из приведенных выше кортежей является ключом.

Возможно ли объединить эти два типа?

Как только u1 и u2 объединены, я намерен использовать groupBy для группировки каждого элемента в соответствии с его ключом.

Комментарии:

1. Вы действительно имеете в виду ("a" , ("3")) или так и должно быть ("a" , (3)) ? Кроме того, вам действительно нужны кортежи в качестве значений? Учитывая, что Tuple1 и Tuple2 являются разными типами, группа будет иметь супертип Any

2. @maasg «вам действительно нужны кортежи в качестве значений?» Не обязательно, вы предлагаете хранить значения в объекте типа Any?

Ответ №1:

Проблема, с которой вы сталкиваетесь, на самом деле объясняется компилятором: вы пытаетесь объединить значения типа (Int,Int) со значениями типа Any . В Any этом утверждении используется суперкласс String and Int : sc.parallelize(List ( ("a" , ("3")) , ("b" , (2)))) . Это может быть ошибкой или может быть преднамеренным.

В любом случае, я бы попытался привести значения к общему типу перед объединением. Учитывая, что Tuple1, Tuple2 — это разные типы, я бы рассмотрел какой-нибудь другой контейнер, который легче преобразовать.

Предполагая, что "3" приведенное выше на самом деле является 3 ( Int ):

 val au1 = sc.parallelize(List ( ("a" , Array(1,2)) , ("b" , Array(1,2))))
val au2 = sc.parallelize(List ( ("a" , Array(3)) , ("b" , Array(2))))
au1 union au2
org.apache.spark.rdd.RDD[(String, Array[Int])] = UnionRDD[10] at union at <console>:17
res: Array[(String, Array[Int])] = Array((a,Array(1, 2)), (b,Array(1, 2)), (a,Array(3)), (b,Array(2)))
 

Как только u1 и u2 объединены, я намерен использовать groupBy для группировки каждого элемента
в соответствии с его ключом.

Если вы намерены сгруппировать оба rdd по ключу, вы можете рассмотреть возможность использования join вместо union . Это сразу выполняет работу

 au1 join au2
res: Array[(String, (Array[Int], Array[Int]))] = Array((a,(Array(1, 2),Array(3))), (b,(Array(1, 2),Array(2))))
 

Если приведенное "3" выше на самом деле является a "3" ( String ): я бы подумал о том, чтобы сначала сопоставить значения с общим типом. Либо все строки, либо все целые числа. Это упростит обработку данных, чем Any использование типа as. Ваша жизнь станет проще.

Ответ №2:

Если вы хотите использовать RDD (ключ, значение) с любым значением (я вижу, вы пытаетесь и RDD с помощью and (Int, Int), и Int и строка), вы можете определить тип вашего RDD при создании:

 val u1:org.apache.spark.rdd.RDD[(String, Any)] = sc.parallelize(List ( ("a" , (1,2)) , ("b" , (1,2))))
val u2org.apache.spark.rdd.RDD[(String, Any)] = sc.parallelize(List ( ("a" , ("3")) , ("b" , (2))))
 

Тогда объединение будет работать, потому что это объединение между одними и теми же типами.

Надеюсь, это поможет