Databricks spark лучшая практика для записи структурированного потока на множество приемников?

#apache-spark #pyspark #apache-kafka #databricks #spark-structured-streaming

#apache-spark #pyspark #apache-kafka #databricks #spark-structured-streaming

Вопрос:

Я использую databricks spark 3.x, и я читаю очень большое количество потоков (более 100), и у каждого потока есть свой собственный контракт, и его нужно записать в свою собственную таблицу delta / parquet / sql / whatever. Хотя потоков много, активность на поток невелика — некоторые потоки могут видеть только сотни записей в день. Я действительно хочу потоковую передачу, потому что я стремлюсь к подходу с довольно низкой задержкой.

Вот о чем я говорю (код сокращен для простоты; я правильно использую контрольные точки, режимы вывода и т. Д.). Предположим schemas , что переменная содержит схему для каждой темы. Я пробовал этот подход, когда я создаю тонну отдельных потоков, но это требует много вычислений, и большая их часть тратится впустую:

 def batchprocessor(topic, schema):
  def F(df, batchId):
    sql = f'''
  MERGE INTO SOME TABLE
  USING SOME MERGE TABLE ON SOME CONDITION
  WHEN MATCHED
  UPDATE SET *
  WHEN NOT MATCHED
  INSERT *
  '''
    df.createOrReplaceTempView(f"SOME MERGE TABLE")
    df._jdf.sparkSession().sql(sql)
  return F
for topic in topics:
  query = (spark
    .readStream
    .format("delta")
    .load(f"/my-stream-one-table-per-topic/{topic}")
    .withColumn('json', from_json(col('value'),schemas[topic]))
    .select(col('json.*'))
    .writeStream
    .format("delta")
    .foreachBatch(batchProcessor(topic, schema))
    .start())
  

Я также пытался создать только один поток, который выполнял тонну фильтрации, но производительность была довольно низкой даже в тестовой среде, где я отправлял одно сообщение в одну тему:

 def batchprocessor(df, batchId):
  df.cache()
  for topic in topics:
    filteredDf = (df.filter(f"topic == '{topic}'")
      .withColumn('json', from_json(col('value'),schemas[topic]))
      .select(col('json.*')))
    sql = f'''
  MERGE INTO SOME TABLE
  USING SOME MERGE TABLE ON SOME CONDITION
  WHEN MATCHED
  UPDATE SET *
  WHEN NOT MATCHED
  INSERT *
  '''
    filteredDf.createOrReplaceTempView(f"SOME MERGE TABLE")
    filteredDf._jdf.sparkSession().sql(sql)
  df.unpersist()

query = (spark
.readStream
.format("delta")
.load(f"/my-stream-all-topics-in-one-but-partitioned")
.writeStream
.format("delta")
.foreachBatch(batchProcessor)
.start())
  

Есть ли какой-нибудь хороший способ существенно демультиплексировать такой поток? Он уже разделен, поэтому я предполагаю, что планировщик запросов не выполняет слишком много избыточной работы, но, похоже, тем не менее, существует огромное количество накладных расходов.

Комментарии:

1. какова точка соприкосновения с Kafka? Я вижу, вы читаете и записываете в delta lake…

2. Я выполняю двухэтапный процесс, в котором я выполняю поток из kafka (119 тем), записываю в delta lake, затем выполняю поток из delta lake (бронзовая / серебряная / золотая архитектура). Я пробовал записывать все темы в одну разделенную таблицу и записывать каждую тему в отдельную таблицу, потому что они по-разному влияют на эту фазу канала.

Ответ №1:

Я провел несколько тестов, и вариант 2 более эффективен. Я пока не совсем понимаю, почему.

В конечном счете, производительность все еще не была той, чего я хотел — каждая тема выполняется по порядку, независимо от размера, поэтому одна запись в каждой теме приведет к тому, что планировщик FIFO поставит в очередь множество очень неэффективных мелких операций. Я решил это с помощью распараллеливания:

 import threading
def writeTable(table, df, poolId, sc):
  sc.setLocalProperty("spark.scheduler.pool", poolId)
  df.write.mode('append').format('delta').saveAsTable(table)
  sc.setLocalProperty("spark.scheduler.pool", None)
def processBatch(df, batchId):
  df.cache()
  dfsToWrite = {}
  for row in df.select('table').distinct().collect():
    table = row.table
    filteredDf = df.filter(f"table = '{table}'")
    dfsToWrite[table] = filteredDf
  threads = []
  for table,df in dfsToWrite.items():
    threads.append(threading.Thread(target=writeTable,args=(table, df,table,spark.sparkContext)))
  for t in threads:
    t.start()
  for t in threads:
    t.join()
  df.unpersist()
  

Комментарии:

1. Какое окончательное решение вы выбрали? Я считаю, что второе решение может быть быстрее, потому что вы не читаете данные из Kafka несколько раз. У меня идентичная проблема, и я действительно разочарован производительностью. Кажется, что наличие большого количества медленных потоков — это большие накладные расходы для Spark.

2. Я думаю, что причиной разницы было количество накладных расходов, которые драйвер выполняет для каждого потокового запроса — наличие только одного потокового запроса сводит это к минимуму. Однако я не был полностью удовлетворен своим вторым подходом, потому что каждая тема полностью занимает планировщик FIFO до ее завершения, поэтому кошмарный сценарий, в котором каждая тема содержит ровно одно сообщение, приводит к линейной серии очень неэффективных записей. В конечном итоге я получил необходимую производительность, используя потоки python и пулы spark, чтобы получить истинное распараллеливание в foreachbatch. Я обновил свой ответ примером кода

3. Спасибо за ответ, когда я писал свой первый комментарий, я чувствовал: xkcd.com/979 Во всяком случае, я заметил, что в вашем решении вы больше не используете «ОБЪЕДИНИТЬ В ТАБЛИЦУ …» — это также отрицательно сказалось на производительности для вас? Недостатком является то, что вы можете получить дубликаты. Сколько разных пулов вы используете? Это 1 на тему? Я попытался настроить spark, манипулируя значением SPARK_WORKER_CORES, чтобы получить больше параллелизма на ядро, и использовал потоковую обработку в Scala, но я не впечатлен задержкой и тем, сколько процессора это потребляет.

4. Это решение довольно сильно ударяет по процессору, но в моем случае (более 130 тем, только 4-5 см. high use) задержка снизилась с ~ 7 минут до ~ 20 секунд. Я использую один пул для каждой темы. Извините, я отказался от merge into table по несвязанным причинам — я просто искал каждую оптимизацию, которую мог найти, и определил, что ветвь «if matched» никогда не попадала.