#apache-spark #jdbc #pyspark #apache-spark-sql #partition
#apache-spark #jdbc #pyspark #apache-spark-sql #раздел
Вопрос:
Я пытаюсь прочитать таблицу Mysql в PySpark, используя чтение JDBC. Сложность здесь в том, что таблица значительно большая и, следовательно, приводит к сбою нашего Spark executor, когда он выполняет несекционированное ванильное чтение таблицы.
Следовательно, целевая функция в основном заключается в том, что мы хотим выполнить секционированное чтение таблицы. Пара вещей, которые мы пытались —
- Мы рассмотрели комбинацию «numPartitions-partitionColumn-lowerBound-upperBound». Для нас это не работает, поскольку наш ключ индексации исходной таблицы представляет собой строку, и это работает только с целыми типами.
- Другой альтернативой, предложенной в документации, является опция predicate . Похоже, это не работает для нас, в том смысле, что количество разделов, похоже, по-прежнему равно 1, а не количеству предикатов, которые мы отправляем.
Фрагмент кода, который мы используем, выглядит следующим образом —
input_df = self._Flow__spark.read
.format("jdbc")
.option("url", url)
.option("user", config.user)
.option("password", config.password)
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("dbtable", "({}) as query ".format(get_route_surge_details_query(start_date, end_date)))
.option("predicates", ["recommendation_date = '2020-11-14'",
"recommendation_date = '2020-11-15'",
"recommendation_date = '2020-11-16'",
"recommendation_date = '2020-11-17'",
])
.load()
Похоже, что он выполняет полное сканирование таблицы (без разделения), полностью игнорируя переданные предикаты. Было бы здорово получить некоторую помощь по этому вопросу.
Комментарии:
1. Я думаю, что предикаты не могут быть вариантом. Вместо этого вы должны использовать функцию jdbc. github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org /…
Ответ №1:
Попробуйте выполнить следующее :
spark_session
.read
.jdbc(url=url,
table= "({}) as query ".format(get_route_surge_details_query(start_date, end_date)),
predicates=["recommendation_date = '2020-11-14'",
"recommendation_date = '2020-11-15'",
"recommendation_date = '2020-11-16'",
"recommendation_date = '2020-11-17'"],
properties={
"user": config.user,
"password": config.password,
"driver": "com.mysql.cj.jdbc.Driver"
}
)
Проверьте разделы с помощью
df.rdd.getNumPartitions() # Should be 4
Я нашел это после изучения документов по адресу https://spark.apache.org/docs/latest/api/python/pyspark.sql.html ?выделите=jdbc#pyspark.sql.DataFrameReader.jdbc