#apache-spark #pyspark #apache-spark-sql
#apache-spark #pyspark #apache-spark-sql
Вопрос:
У меня есть фрейм данных spark с двумя столбцами (time_stamp и message), как показано ниже:
Пример spark dataframe
message time_stamp
irrelevant_text Startstring [ID: 1AB] 2015-01-23 08:23:16
some irrelevant text 2015-01-23 08:24:20
irrelevant_text mandatorystring ID [1AB] 2015-01-23 08:25:32
some irrelevant text 2015-01-23 08:27:18
contributor XYZ_ABCD 2015-01-23 08:27:54
some irrelevant text 2015-01-23 08:28:36
irrelevant_text endstring [ID: 1AB] 2015-01-23 08:30:47
some irrelevant text 2015-01-23 08:24:20
irrelevant_text Startstring [ID: 2BC] 2015-01-23 10:05:16
some irrelevant text 2015-01-23 10:24:20
contributor LMN_EFG_X 2015-01-23 10:27:21
some irrelevant text 2015-01-23 10:28:34
irrelevant_text endstring [ID: 2BC] 2015-01-23 10:30:47
some irrelevant text 2015-01-23 10:50:20
irrelevant_text Startstring [ID: 3DE] 2015-01-23 12:21:16
some irrelevant text 2015-01-23 12:24:20
irrelevant_text mandatorystring ID [3DE] 2015-01-23 12:37:32
some irrelevant text 2015-01-23 12:45:18
contributor PQRS_STU_wtx 2015-01-23 12:47:05
some irrelevant text 2015-01-23 12:48:33
irrelevant_text endstring [ID: 3DE] 2015-01-23 12:59:47
Я хочу извлечь вкладчика, появившегося между Startstring и endstring, если между Startstring и endstring существует mandatorystring, и отбросить вкладчиков, если mandatorystring не существует между Startstring и endstring . За одну дату может быть несколько таких экземпляров.
Ожидаемый результат:
time_stamp contributor
2015-01-23 08:27:54 XYZ_ABCD
2015-01-23 12:47:05 PQRS_STU_wtx
Для чтения текстового файла я использовал следующую команду.
df = spark.read.format("com.databricks.spark.csv").option("inferSchema", "false").schema(schema).option("delimiter", "t").load('{}'.format(fileName))
Комментарии:
1. Являются ли эти строки отдельными строками фрейма данных?
2. да, это отдельные строки фрейма данных spark с заголовками столбцов как «message» и «time_stamp»
3. Есть ли другой способ, которым вы могли бы читать / группировать данные заранее? Как вы контролируете, чтобы начальная и конечная строки попадали в один и тот же исполнитель?
4. Эта программа будет написана в Azure databricks, и я думаю, что Azure databricks обрабатывает это в конце. Итак, нам не нужно об этом думать. Этот фрейм данных создается из текстового файла с разделителями табуляции. Для чтения текстового файла я использовал следующую команду. «df = spark.read.format(«com.databricks.spark.csv»).option(«inferSchema», «false»).schema(схема).option(«разделитель», «t»).load(‘{}’.format(имя файла))»
5. Я фактически преобразовал весь фрейм данных в одну строку и применил регулярное выражение для извлечения. Он работает нормально, но иногда сбор данных занимает много времени, если размер набора данных очень большой. У меня есть набор данных объемом около 1 ТБ. Шаблон, который я использовал: «Startstring(?:(?!Startstring).)*?mandatorystring(?:(?!Startstring).)*?,[‘s]*IDs*:s*([^’,]*).*?endstring»
Ответ №1:
Отфильтруйте группы допустимых сообщений (те, которые содержат «обязательные») и получите сообщения, содержащие «участник», из допустимых групп сообщений.
from pyspark.sql import functions as F, Window
df2 = df.withColumn(
'begin',
F.last(
F.when(F.col('message').rlike('Startstring'), F.col('time_stamp')), True
).over(Window.orderBy('time_stamp'))
).withColumn(
'end',
F.first(
F.when(F.col('message').rlike('Endstring'), F.col('time_stamp')), True
).over(Window.orderBy('time_stamp').rowsBetween(0, Window.unboundedFollowing))
).withColumn(
'mandatory',
F.sum(
F.col('message').rlike('mandatory').cast('int')
).over(Window.partitionBy('begin', 'end'))
).filter(
"mandatory >= 1 and message rlike 'contributor'"
).select(
'time_stamp',
F.regexp_extract('message', 'contributor (S )', 1).alias('contributor')
)
df2.show()
------------------- ------------
| time_stamp| contributor|
------------------- ------------
|2015-01-23 08:27:54| XYZ_ABCD|
|2015-01-23 12:47:05|PQRS_STU_wtx|
------------------- ------------
Комментарии:
1. ПРИМЕЧАНИЕ: Это хорошее решение для небольшого объема данных, но это решение не может быть запущено на databricks, если вы используете его для больших данных. Он использует функцию Windows, которая ограничивает использование рабочего узла до 1, даже если у вас много рабочих узлов. Мне пришлось разработать собственное решение с использованием SQL для использования параллелизма в databricks.
Ответ №2:
Используйте window
функции.
Попробуйте приведенный ниже код.
Импортируйте необходимые библиотеки.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
Загрузка данных во фрейм данных.
df = spark.read.format("csv").option("header","true").load("/tmp/data/sample.csv")
df
.withColumn("subMessage",
F.when(F.col("message").contains("Startstring"),F.lit("start"))
.when(F.col("message").contains("mandatorystring"),F.lit("mandatory"))
.when(F.col("message").contains("contributor"),F.regexp_replace(F.col("message"),"contributor ",""))
.when(F.col("message").contains("endstring"),F.lit("end"))
)
.filter(F.col("subMessage").isNotNull())
.withColumn("iscontributor",((F.lead(F.col("subMessage"),1).over(Window.orderBy(F.lit(1))) == "end") amp; (F.lag(F.col("subMessage"),1).over(Window.orderBy(F.lit(1))) == "mandatory")))
.filter(F.col("iscontributor") == True)
.show()
Окончательный вывод.
-------------------- ------------------- ------------ -------------
| message| time_stamp| subMessage|iscontributor|
-------------------- ------------------- ------------ -------------
|contributor XYZ_ABCD|2015-01-23 08:27:54| XYZ_ABCD| true|
|contributor PQRS_...|2015-01-23 12:47:05|PQRS_STU_wtx| true|
-------------------- ------------------- ------------ -------------