Извлечение текста между двумя строками, если между этими двумя строками также присутствует третья строка — Pyspark

#apache-spark #pyspark #apache-spark-sql

#apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть фрейм данных spark с двумя столбцами (time_stamp и message), как показано ниже:

Пример spark dataframe

       message                                             time_stamp
irrelevant_text Startstring [ID: 1AB]                   2015-01-23 08:23:16
some irrelevant text                                    2015-01-23 08:24:20
irrelevant_text mandatorystring ID [1AB]                2015-01-23 08:25:32
some irrelevant text                                    2015-01-23 08:27:18
contributor XYZ_ABCD                                    2015-01-23 08:27:54
some irrelevant text                                    2015-01-23 08:28:36
irrelevant_text endstring [ID: 1AB]                     2015-01-23 08:30:47
some irrelevant text                                    2015-01-23 08:24:20
irrelevant_text Startstring [ID: 2BC]                   2015-01-23 10:05:16
some irrelevant text                                    2015-01-23 10:24:20
contributor LMN_EFG_X                                   2015-01-23 10:27:21
some irrelevant text                                    2015-01-23 10:28:34
irrelevant_text endstring [ID: 2BC]                     2015-01-23 10:30:47
some irrelevant text                                    2015-01-23 10:50:20
irrelevant_text Startstring [ID: 3DE]                   2015-01-23 12:21:16
some irrelevant text                                    2015-01-23 12:24:20
irrelevant_text mandatorystring ID [3DE]                2015-01-23 12:37:32
some irrelevant text                                    2015-01-23 12:45:18
contributor PQRS_STU_wtx                                2015-01-23 12:47:05
some irrelevant text                                    2015-01-23 12:48:33
irrelevant_text endstring [ID: 3DE]                     2015-01-23 12:59:47

 

Я хочу извлечь вкладчика, появившегося между Startstring и endstring, если между Startstring и endstring существует mandatorystring, и отбросить вкладчиков, если mandatorystring не существует между Startstring и endstring . За одну дату может быть несколько таких экземпляров.

Ожидаемый результат:

 time_stamp                 contributor
2015-01-23 08:27:54         XYZ_ABCD                                    
2015-01-23 12:47:05       PQRS_STU_wtx                                

 

Для чтения текстового файла я использовал следующую команду.

 df = spark.read.format("com.databricks.spark.csv").option("inferSchema", "false").schema(schema).option("delimiter", "t").load('{}'.format(fileName))
 

Комментарии:

1. Являются ли эти строки отдельными строками фрейма данных?

2. да, это отдельные строки фрейма данных spark с заголовками столбцов как «message» и «time_stamp»

3. Есть ли другой способ, которым вы могли бы читать / группировать данные заранее? Как вы контролируете, чтобы начальная и конечная строки попадали в один и тот же исполнитель?

4. Эта программа будет написана в Azure databricks, и я думаю, что Azure databricks обрабатывает это в конце. Итак, нам не нужно об этом думать. Этот фрейм данных создается из текстового файла с разделителями табуляции. Для чтения текстового файла я использовал следующую команду. «df = spark.read.format(«com.databricks.spark.csv»).option(«inferSchema», «false»).schema(схема).option(«разделитель», «t»).load(‘{}’.format(имя файла))»

5. Я фактически преобразовал весь фрейм данных в одну строку и применил регулярное выражение для извлечения. Он работает нормально, но иногда сбор данных занимает много времени, если размер набора данных очень большой. У меня есть набор данных объемом около 1 ТБ. Шаблон, который я использовал: «Startstring(?:(?!Startstring).)*?mandatorystring(?:(?!Startstring).)*?,[‘s]*IDs*:s*([^’,]*).*?endstring»

Ответ №1:

Отфильтруйте группы допустимых сообщений (те, которые содержат «обязательные») и получите сообщения, содержащие «участник», из допустимых групп сообщений.

 from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'begin',
    F.last(
        F.when(F.col('message').rlike('Startstring'), F.col('time_stamp')), True
    ).over(Window.orderBy('time_stamp'))
).withColumn(
    'end',
    F.first(
        F.when(F.col('message').rlike('Endstring'), F.col('time_stamp')), True
    ).over(Window.orderBy('time_stamp').rowsBetween(0, Window.unboundedFollowing))
).withColumn(
    'mandatory',
    F.sum(
        F.col('message').rlike('mandatory').cast('int')
    ).over(Window.partitionBy('begin', 'end'))
).filter(
    "mandatory >= 1 and message rlike 'contributor'"
).select(
    'time_stamp',
    F.regexp_extract('message', 'contributor (S )', 1).alias('contributor')
)

df2.show()
 ------------------- ------------ 
|         time_stamp| contributor|
 ------------------- ------------ 
|2015-01-23 08:27:54|    XYZ_ABCD|
|2015-01-23 12:47:05|PQRS_STU_wtx|
 ------------------- ------------ 
 

Комментарии:

1. ПРИМЕЧАНИЕ: Это хорошее решение для небольшого объема данных, но это решение не может быть запущено на databricks, если вы используете его для больших данных. Он использует функцию Windows, которая ограничивает использование рабочего узла до 1, даже если у вас много рабочих узлов. Мне пришлось разработать собственное решение с использованием SQL для использования параллелизма в databricks.

Ответ №2:

Используйте window функции.

Попробуйте приведенный ниже код.

Импортируйте необходимые библиотеки.

 from pyspark.sql import functions as F
from pyspark.sql.window import Window
 

Загрузка данных во фрейм данных.

 df = spark.read.format("csv").option("header","true").load("/tmp/data/sample.csv")
 
 df 
.withColumn("subMessage", 
    F.when(F.col("message").contains("Startstring"),F.lit("start")) 
    .when(F.col("message").contains("mandatorystring"),F.lit("mandatory")) 
    .when(F.col("message").contains("contributor"),F.regexp_replace(F.col("message"),"contributor ","")) 
    .when(F.col("message").contains("endstring"),F.lit("end"))
) 
.filter(F.col("subMessage").isNotNull()) 
.withColumn("iscontributor",((F.lead(F.col("subMessage"),1).over(Window.orderBy(F.lit(1))) == "end") amp; (F.lag(F.col("subMessage"),1).over(Window.orderBy(F.lit(1))) == "mandatory"))) 
.filter(F.col("iscontributor") == True) 
.show()
 

Окончательный вывод.

  -------------------- ------------------- ------------ ------------- 
|             message|         time_stamp|  subMessage|iscontributor|
 -------------------- ------------------- ------------ ------------- 
|contributor XYZ_ABCD|2015-01-23 08:27:54|    XYZ_ABCD|         true|
|contributor PQRS_...|2015-01-23 12:47:05|PQRS_STU_wtx|         true|
 -------------------- ------------------- ------------ -------------