#apache-spark #jdbc #apache-flink #spark-streaming #flink-streaming
Вопрос:
Я пытаюсь настроить конвейер для обработки целых таблиц SQL одну за другой с первоначальным приемом через JDBC. Мне нужно иметь возможность использовать возможности обработки более высокого уровня, такие как те, которые доступны в Apache Spark или Flink, и я хотел бы использовать любые существующие возможности, а не писать свои собственные, хотя это может быть неизбежно. Мне нужно, чтобы я мог выполнить этот конвейер при ограниченной настройке (потенциально на одном ноутбуке). Пожалуйста, обратите внимание, что я не говорю здесь о захвате или поглощении CDC, я просто хочу выполнить пакетную обработку существующей таблицы таким образом, чтобы не использовать ни одну машину.
В качестве тривиального примера у меня есть таблица в SQL Server объемом 500 ГБ. Я хочу разбить его на более мелкие фрагменты, которые поместились бы в 16 ГБ-32 ГБ доступной памяти в недавно современном ноутбуке, применить функцию преобразования к каждой из строк, а затем переслать их в приемник.
Некоторые из доступных решений, которые кажутся близкими к тому, что мне нужно:
- Чтение с разделением Apache Spark:
spark.read.format("jdbc").
.option("driver", driver)
.option("url", url)
.option("partitionColumn", id)
.option("lowerBound", min)
.option("upperBound", max)
.option("numPartitions", 10)
.option("fetchsize",1000)
.option("dbtable", query)
.option("user", "username")
.option("password", "password")
.load()
Похоже, что я могу еще repartition
больше выровнять наборы данных после первоначального чтения.
Проблема в том, что в локальном режиме выполнения я ожидаю, что вся таблица будет разделена на несколько ядер процессора, которые все будут пытаться загрузить свой соответствующий фрагмент в память, что приведет ко всему бизнесу.
- Есть ли способ ограничить задания на чтение, чтобы выполнялось только столько заданий, сколько может поместиться в памяти? Могу ли я заставить задания выполняться последовательно?
- Могу ли я, возможно, разбить таблицу на гораздо меньшие куски, намного больше, чем ядер, в результате чего будет обработано только небольшое количество за один раз? Разве это не помешало бы всему бесконечному планированию задач и т. Д.?
- Если бы я хотел написать свой собственный источник для потоковой передачи в Spark, облегчило бы это мои проблемы с памятью? Помогает ли мне что-то подобное?
- Вступает ли здесь в игру управление памятью Spark вообще? Зачем нужно загружать весь раздел в память сразу во время чтения?
- Я рассматривал Apache Flink в качестве альтернативы, поскольку потоковая модель, возможно, здесь немного более уместна. Вот что он предлагает с точки зрения JDBC:
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost/log_db")
.setUsername("username")
.setPassword("password")
.setQuery("select id, something from SOMETHING")
.setRowTypeInfo(rowTypeInfo)
.finish()
Однако, похоже, что это также предназначено для пакетной обработки и все еще пытается загрузить все в память.
- Как бы я стал спорить с Flink о передаче микропакетов данных SQL для обработки?
- Могу ли я потенциально написать свой собственный источник потоковой передачи, который обертывает формат ввода JDBC?
- Можно ли с уверенностью предположить, что с Flink не происходит никаких сбоев, если только некоторые состояния/аккумуляторы не станут слишком большими?
Я также видел, что у Кафки есть разъемы JDBC, но, похоже, на самом деле невозможно запустить его локально (т. Е. ту же JVM), как и другие потоковые платформы. Спасибо всем вам за помощь!
Ответ №1:
Это правда, что в Flink форматы ввода предназначены только для пакетной обработки, но это не должно быть проблемой. Flink выполняет пакетную обработку одного события за раз, не загружая все в память. Я думаю, то, что ты хочешь, должно просто сработать.