Spark-Кассандра , Как получить данные на основе запроса

#apache-spark #pyspark #cassandra #spark-cassandra-connector

Вопрос:

У меня есть таблица Кассандры, которая довольно огромна, и прямо сейчас у меня есть соединение spark-Cassandra со следующим кодом.

 import pandas as pd
import numpy as np
from pyspark import *
import os
from pyspark.sql import SQLContext


os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages  com.datastax.spark:spark-cassandra-connector_2.12:3.0.1 --conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell'
conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1").set("spark.cassandra.connection.port", "9042").setAppName("Sentinel").setMaster("spark://Local:7077")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

table_df = sqlContext.read
        .format("org.apache.spark.sql.cassandra")
        .options(table='movies', keyspace='movie_lens')
        .load()
        
 

Первичным ключом является Movie_id, который является целым числом.
функция .load() загружает всю таблицу в память, этого я хочу избежать. Один из способов, который у меня есть, — это использовать фильтр

 table_df = sqlContext.read
        .format("org.apache.spark.sql.cassandra")
        .options(table='movies', keyspace='movie_lens')
        .load()
        .filter("movie_id = 37032")
 

Но действительно ли фильтр предотвращает загрузку всей таблицы в память? или он сначала загружается, а затем фильтруется.
Кроме того, мне нужно запросить много идентификаторов. допустим, мне нужно 1000 идентификаторов, и каждый день идентификаторы продолжают меняться . Тогда как это сделать?

Ответ №1:

Да, Spark Cassandra Connector выполнит так называемый «вывод предикатов», если вы выполняете запрос по ключу раздела, и загрузит данные только из определенного запроса ( .load функция просто загрузит метаданные, фактическая загрузка данных произойдет в первый раз, когда вам действительно нужны данные для выполнения действия). Существуют хорошо документированные правила о том, когда в соединителе Spark Cassandra происходит нажатие предиката. Вы также можете проверить это , запустив table_df.explain() и поискав PushedFilters часть фильтров, отмеченных звездочкой * .

Если вам нужно найти несколько идентификаторов, вы можете использовать .isin фильтр, но это действительно не рекомендуется для Cassandra. Лучше создать фрейм данных с идентификаторами и выполнить так называемое прямое соединение с фреймом данных Cassandra (он доступен начиная с SCC 2.5 для фреймов данных или ранее для RDDs). У меня есть длинный пост в блоге о соединении с данными в Кассандре

Комментарии:

1. Привет, большое спасибо за быстрый ответ. У меня просто есть следующий вопрос. Не могли бы вы просто дать мне знать, в чем разница между ними . фильтр() и .где() ?? Из того, что я видел, они вроде как делают то же самое.

2. никаких различий между ними, одни и те же функции — по историческим причинам