#apache-spark #apache-spark-sql #hbase #phoenix
#apache-spark #apache-spark-sql #hbase #apache-phoenix
Вопрос:
У меня есть этот вариант использования для автоматизированного задания SparkSQL, где я хочу сделать это :
-
Прочитайте таблицу (назовем ее table1) из Phoenix с помощью Spark и соберите во фрейме данных (назовем его df1) все найденные отрицательные значения
-
Затем я хочу удалить записи из другой таблицы (table2), где значения из столбца находятся в df1 (думал о выполнении запроса ОБЪЕДИНЕНИЯ, но я хотел знать, возможно ли это с фреймом данных, и есть ли API, использующий фреймы данных HBase и Spark)
-
AFAIK Phoenix напрямую не поддерживает операции УДАЛЕНИЯ через Spark (пожалуйста, поправьте меня, если я ошибаюсь, и если есть способ, я бы с удовольствием послушал об этом), поэтому я больше склоняюсь к использованию HBase Spark API
Вот схема для более наглядного объяснения :
Вот некоторый код.
Соберите отрицательные значения во фрейме данных :
// Collect negative values
val negativeValues = spark
.sqlContext
.phoenixTableAsDataFrame("phoenix.table1", Seq(), conf = hbaseConf)
.select('COLUMN1)
.where('COLUMN2.lt(0))
// Send the query
[...]
Удалите значения из таблицы2, где СТОЛБЕЦ 1 имеет отрицательные значения, поэтому что-то вроде этого в SQL (и если возможно применить IN к DF напрямую) :
DELETE FROM table2 WHERE COLUMN1 IN negativeValues
Мой ожидаемый результат был бы таким :
table1
column1 | column2
|
123456 | 123
234567 | 456
345678 | -789
456789 | 012
567891 | -123
table2
column1 | column2
|
123456 | 321
234567 | 654
345678 | 945 <---- same column1 as table1's, so delete
456789 | 987
567891 | 675 <---- same column1 as table1's, so delete
Итак, в конечном счете, я хотел бы знать, есть ли способ отправить этот запрос на удаление в HBase через Spark без лишней суеты.
Спасибо.
Ответ №1:
вам необходимо создать пользовательский API, если необходимо запустить запрос «УДАЛИТЬ» из spark через Phoenix (sql engine) в Hbase.
Можно использовать следующий подход,
- получите столбец rowkey table2 из исходного фрейма данных, чтобы выполнить удаление (в table2).
- создайте код для работы с каждым разделом исходного фрейма данных и создайте запрос «УДАЛИТЬ». допустим, запрос «УДАЛИТЬ Из table2, ГДЕ column1 = ? «, подготовьте его и выполните в виде пакета с нужным размером пакета, который вы видите. поскольку мы выполняем его параллельно для каждого раздела фрейма данных, номер раздела в исходном фрейме данных определяет параллелизм. таким образом, вы можете попробовать повторно разбить его на разделы нужного размера, чтобы увидеть правильные показатели производительности.
если есть возможность пропустить механизм sql, вы также можете использовать spark-hbase direct API. вот один из таких примеров — https://github.com/tmalaska/SparkOnHBase/blob/master/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala