#apache-spark #pyspark #cassandra #spark-cassandra-connector
Вопрос:
У меня есть таблица Кассандры, которая довольно огромна, и прямо сейчас у меня есть соединение spark-Cassandra со следующим кодом.
import pandas as pd
import numpy as np
from pyspark import *
import os
from pyspark.sql import SQLContext
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.1 --conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell'
conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1").set("spark.cassandra.connection.port", "9042").setAppName("Sentinel").setMaster("spark://Local:7077")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
table_df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(table='movies', keyspace='movie_lens')
.load()
Первичным ключом является Movie_id, который является целым числом.
функция .load() загружает всю таблицу в память, этого я хочу избежать. Один из способов, который у меня есть, — это использовать фильтр
table_df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(table='movies', keyspace='movie_lens')
.load()
.filter("movie_id = 37032")
Но действительно ли фильтр предотвращает загрузку всей таблицы в память? или он сначала загружается, а затем фильтруется.
Кроме того, мне нужно запросить много идентификаторов. допустим, мне нужно 1000 идентификаторов, и каждый день идентификаторы продолжают меняться . Тогда как это сделать?
Ответ №1:
Да, Spark Cassandra Connector выполнит так называемый «вывод предикатов», если вы выполняете запрос по ключу раздела, и загрузит данные только из определенного запроса ( .load
функция просто загрузит метаданные, фактическая загрузка данных произойдет в первый раз, когда вам действительно нужны данные для выполнения действия). Существуют хорошо документированные правила о том, когда в соединителе Spark Cassandra происходит нажатие предиката. Вы также можете проверить это , запустив table_df.explain()
и поискав PushedFilters
часть фильтров, отмеченных звездочкой *
.
Если вам нужно найти несколько идентификаторов, вы можете использовать .isin
фильтр, но это действительно не рекомендуется для Cassandra. Лучше создать фрейм данных с идентификаторами и выполнить так называемое прямое соединение с фреймом данных Cassandra (он доступен начиная с SCC 2.5 для фреймов данных или ранее для RDDs). У меня есть длинный пост в блоге о соединении с данными в Кассандре
Комментарии:
1. Привет, большое спасибо за быстрый ответ. У меня просто есть следующий вопрос. Не могли бы вы просто дать мне знать, в чем разница между ними . фильтр() и .где() ?? Из того, что я видел, они вроде как делают то же самое.
2. никаких различий между ними, одни и те же функции — по историческим причинам