Предикат в Pyspark JDBC не выполняет секционированное чтение

#apache-spark #jdbc #pyspark #apache-spark-sql #partition

#apache-spark #jdbc #pyspark #apache-spark-sql #раздел

Вопрос:

Я пытаюсь прочитать таблицу Mysql в PySpark, используя чтение JDBC. Сложность здесь в том, что таблица значительно большая и, следовательно, приводит к сбою нашего Spark executor, когда он выполняет несекционированное ванильное чтение таблицы.

Следовательно, целевая функция в основном заключается в том, что мы хотим выполнить секционированное чтение таблицы. Пара вещей, которые мы пытались —

  1. Мы рассмотрели комбинацию «numPartitions-partitionColumn-lowerBound-upperBound». Для нас это не работает, поскольку наш ключ индексации исходной таблицы представляет собой строку, и это работает только с целыми типами.
  2. Другой альтернативой, предложенной в документации, является опция predicate . Похоже, это не работает для нас, в том смысле, что количество разделов, похоже, по-прежнему равно 1, а не количеству предикатов, которые мы отправляем.

Фрагмент кода, который мы используем, выглядит следующим образом —

 input_df = self._Flow__spark.read 
            .format("jdbc") 
            .option("url", url) 
            .option("user", config.user) 
            .option("password", config.password) 
            .option("driver", "com.mysql.cj.jdbc.Driver") 
            .option("dbtable", "({}) as query ".format(get_route_surge_details_query(start_date, end_date))) 
            .option("predicates", ["recommendation_date = '2020-11-14'",
                                   "recommendation_date = '2020-11-15'",
                                   "recommendation_date = '2020-11-16'",
                                   "recommendation_date = '2020-11-17'",
                                   ]) 
            .load()
 

Похоже, что он выполняет полное сканирование таблицы (без разделения), полностью игнорируя переданные предикаты. Было бы здорово получить некоторую помощь по этому вопросу.

Комментарии:

1. Я думаю, что предикаты не могут быть вариантом. Вместо этого вы должны использовать функцию jdbc. github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org /…

Ответ №1:

Попробуйте выполнить следующее :

 spark_session
  .read
  .jdbc(url=url,
        table= "({}) as query ".format(get_route_surge_details_query(start_date, end_date)),
        predicates=["recommendation_date = '2020-11-14'",
                    "recommendation_date = '2020-11-15'",
                    "recommendation_date = '2020-11-16'",
                    "recommendation_date = '2020-11-17'"],
        properties={
          "user": config.user,
          "password": config.password,
          "driver": "com.mysql.cj.jdbc.Driver"
        }
)
 

Проверьте разделы с помощью

 df.rdd.getNumPartitions() # Should be 4
 

Я нашел это после изучения документов по адресу https://spark.apache.org/docs/latest/api/python/pyspark.sql.html ?выделите=jdbc#pyspark.sql.DataFrameReader.jdbc