Как реализовать эффективное преобразование времени в длину в зависимости от скорости в Pyspark?

#pyspark #time-series #etl

#pyspark #временные ряды #etl

Вопрос:

Одна машина предоставляет данные о тысячах датчиков. Машина разматывает металлическую полосу за один раз. В следующий раз металлическая полоса нагревается, а в третий раз металлическая полоса охлаждается. С помощью метки времени, измеренной скорости и триггера (например, печь ввода / вывода) на шаге ETL должна быть сгенерирована переменная диапазона.

  ---------------- ---------- ----------- --------- ----- 
|time            |input_oven|output_oven|temp_oven|speed|
 ---------------- ---------- ----------- --------- ----- 
|2017-01-01-01-20|0         |0          |450      |3    |
|2017-01-01-01-21|0         |0          |450      |3    |
|2017-01-01-01-22|1         |0          |450      |3    |
|2017-01-01-01-23|0         |0          |450      |4    |
|2017-01-01-01-24|0         |0          |451      |4    |
|2017-01-01-01-25|0         |1          |450      |4    |
|2017-01-01-01-26|0         |0          |450      |3    |
 ---------------- ---------- ----------- --------- ----- 
 

Как вы можете видеть, скорость может варьироваться. Я попробовал следующий код, но это слишком неточно, в том числе и потому, что машина может остановиться, например.

 from scipy import integrate
s = lambda s: col_speed*col_time
integrate.quad(s, time_1, time_2)
 

Следовательно, интегрирование должно выполняться с помощью переменной скорости, чтобы можно было сгенерировать новую переменную счетчика. Один файл содержит 30 тысяч записей из 5000 датчиков.

Результатом должна быть таблица, которая сопоставляет все данные датчиков, чтобы я мог видеть: измеритель металлической полосы one измерил температуру печи и скорость охлаждения.

Любая помощь очень приветствуется, и я заранее благодарю вас.

Редактировать

Чтобы дать еще некоторое представление, я добавил следующую картинку.

Временные ряды сигналов нескольких датчиков одной производственной линии. Зеленая линия представляет текущее время. Желтая линия представляет одну и ту же позицию длины при разных отметках времени.

Целью ETL-задания должно быть выравнивание всех сигналов датчиков с учетом положения длины. Поэтому у меня возникла идея использовать следующее уравнение:

 length = speed * time
time = time_delta(output_oven-input_oven)
speed = avg(speed)
 

Что касается приведенных данных примера, уравнение должно быть решено следующим образом для полного фрейма данных

 length = avg(speed) * time_delta(output_oven-input_oven)
length = 4 m/min * 2017-01-01-01-25-2017-01-01-01-22
length = 4 m/min * 3 min = 12 m
 

Теперь я знаю, какая часть моей металлической полосы проходила через печь. Предполагая, что моя металлическая полоса имеет длину 12 метров. Теперь я хочу отставать от всех других сигналов датчиков в зависимости от длины.

Комментарии:

1. Можете ли вы привести отработанный пример? Может быть, для простого случая одного периода времени, который вы рассчитали вручную?

2. Спасибо за ваш комментарий @sramalingam24. Я добавил дополнительную информацию.

3. Вы пытаетесь рассчитать длину листа через печь 1 или все печи здесь?

4. Я хочу рассчитать длину по всем датчикам в каждой печи.

Ответ №1:

Вот моя попытка, близко ли это к тому, что вы хотите?

 from pyspark.sql import functions as f
from pyspark.sql import Row

Columns = Row('time','input_oven','output_oven','temp_oven','speed')
x=[Columns(20,0,0 ,450,3),
Columns(21,0,0 ,450,3),
Columns(22,1,0 ,450,3),
Columns(23,0,0 ,450,4),
Columns(24,0,0 ,451,4),
Columns(25,0,1 ,450,4),
Columns(26,0,0 ,450,3)]

df = spark.createDataFrame(x).withColumn('id', f.lit(1))
df.printSchema()

df1 = df.withColumn('oven', df['input_oven'] df['output_oven'])

from pyspark.sql.window import Window

w = Window.partitionBy(df['id']).orderBy(df['time'])
cum_oven = f.sum(df1['oven']).over(w)
df2 = df1.select(df1['time'],df1['speed'], df1['output_oven'],cum_oven.alias('cum_oven'))
df3 = df2.withColumn('cum_oven', df2['cum_oven']-df2['output_oven']).drop(df2['output_oven'])

ws = Window.partitionBy(df3['cum_oven']).orderBy(df3['time'])
metal_length = (f.max(df3['time']).over(ws)-f.min(df3['time']).over(ws))*df3['speed']

df4 = df3.select(df3['time'], df3['cum_oven'], metal_length.alias('metal_length'))

fdf = df.join(df4, ['time'])
fdf.drop('id').sort('time').show()

 ---- ---------- ----------- --------- ----- -------- ------------ 
|time|input_oven|output_oven|temp_oven|speed|cum_oven|metal_length|
 ---- ---------- ----------- --------- ----- -------- ------------ 
|  20|         0|          0|      450|    3|       0|           0|
|  21|         0|          0|      450|    3|       0|           3|
|  22|         1|          0|      450|    3|       1|           0|
|  23|         0|          0|      450|    4|       1|           4|
|  24|         0|          0|      451|    4|       1|           8|
|  25|         0|          1|      450|    4|       1|          12|
|  26|         0|          0|      450|    3|       2|           0|
 ---- ---------- ----------- --------- ----- -------- ------------ 
 

Конечный интеграл — это просто groupBy, max и сумма?

Комментарии:

1. Спасибо @sramalingam24! Работает нормально!

2. Нет проблем, возможно, вам потребуется пронумеровать печи, начиная с единицы, а не с нуля, чтобы это работало с полным набором данных