#java #apache-spark #spark-graphx
Вопрос:
Я пытаюсь использовать набор данных дорожной сети Сан-Франциско, который находится в свободном доступе по этой ссылке:
cs.utah.edu/~lifeifei/SpatialDataset.htm
Я использую только набор данных Edge и хочу разбить карту дорожной сети на кластер из 3 машин (на данный момент). Однако, когда я пытаюсь применить функцию «graph.partitionBy(PartitonStrategy)» к graphRDD, я получаю ошибку «Стратегия разделения не может быть решена или не является полем». Кто-нибудь может подсказать, почему я получаю ошибку? Я помещу код ниже:
SparkConf conf = new SparkConf().setMaster("local").setAppName("graph");
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
ClassTag<Integer> intTag = scala.reflect.ClassTag$.MODULE$.apply(Integer.class);
ClassTag<Double> doubleTag = scala.reflect.ClassTag$.MODULE$.apply(Double.class);
List<Edge<Double>> edges = new ArrayList<>();
String inputFileName = "Dataset/SFEdge.txt";
// Edge datset contains edgeId|SourceId|DestinationId|EdgeLength
// edges.add(new Edge<Double>(1, 2, 3.5));
// edges.add(new Edge<Double>(2, 3, 4.8));
readTextEdgeFile(edges, inputFileName);
JavaRDD<Edge<Double>> edgeRDD = javaSparkContext.parallelize(edges);
Graph<String, Double> graph = Graph.fromEdges(edgeRDD.rdd(), "", StorageLevel.MEMORY_ONLY(),
StorageLevel.MEMORY_ONLY(), stringTag, doubleTag);
graph.edges().toJavaRDD().foreach(x -> System.out
.println("Source: " x.srcId() " , Destination: " x.dstId() ", Distance: " x.attr$mcD$sp()));
//Error is generated here below this comment
graph.partitionBy(PartitionStrategy.RandomVertexCut$.MODULE$);
}
public static boolean readTextEdgeFile(List<Edge<Double>> edgeList, String txtFileName)
throws FileNotFoundException, IOException {
String line = "";
String txtSplitBy = " ";
boolean removedBOM = false;
try (BufferedReader br = new BufferedReader(new FileReader(txtFileName))) {
while ((line = br.readLine()) != null) {
String[] record = line.split(txtSplitBy);
if (record.length == 4) {
if (!removedBOM amp;amp; record[0] != "0") {
record[0] = String.valueOf(0);
removedBOM = true;
}
edgeList.add(new Edge<Double>(Integer.parseInt(record[1]), Integer.parseInt(record[2]),
Double.parseDouble(record[3])));
}
}
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
Ответ №1:
Проблема заключалась в версии зависимости. Я думаю, что более новая версия GraphX не смогла найти стратегию разделения. Поэтому я изменил свои зависимости на более низкую версию, и это сработало. `
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.12</artifactId>
<version>2.4.8</version>
</dependency>
»
Кстати, есть ли какой-нибудь способ узнать, что график был разбит на разделы? Своего рода подтверждение?