более эффективный способ вычисления фрейма данных spark

#python #apache-spark #pyspark

#python #apache-spark #pyspark

Вопрос:

У меня есть фрейм данных продаж, похожий на:

 id   | date              | amount  
-----|-------------------|-------
1    |2016-03-04 12:03:00|10.40
1    |2016-03-04 12:05:10|5.0
1    |2016-03-04 12:15:50|11.30
1    |2016-03-04 12:16:00|9.40
1    |2016-03-04 12:30:00|10.0
1    |2016-03-04 12:40:00|5.40
 

И я пытаюсь сгруппировать по времени с временным интервалом в 10 минут, просуммировать сумму и создать фрейм данных, подобный:

 date             | amount  
-----------------|-------
2016-03-04 12:00 |0.0
2016-03-04 12:10 |15.40
2016-03-04 12:20 |20.70
2016-03-04 12:30 |10.0
2016-03-04 12:40 |5.40
 

Я пытался зациклить переменную datetime, фильтровать фрейм данных, группировать и суммировать, чем добавлять в список и создавать фрейм данных со списком.

 bar_list = []
while date_loop < final_date:
    start_time = date_loop - datetime.timedelta(minutes=10)
    end_time = date_loop - datetime.timedelta(seconds=1)
    df_range = (df_sale
               .filter((df_sale.date >= start_time) amp; (df_sale.date <= end_time))
               .groupby()
               .sum('amount'))
  bar_list.append((date_loop,df_range.head()['sum(amount)']))
  date_loop  = datetime.timedelta(minutes=10)

fields = ['date','amount']
df = sqlContext.createDataFrame(bar_list,fields).na.fill(0)
 

В файле с 214626 строками этот код может занять до 20 минут, чтобы рассчитать продажи за 2 месяца за 10 минут.

Есть ли более эффективный способ сделать это?, Я понимаю, что я могу поделиться переменной между рабочими, может поделиться списком? Добавляется ли к списку мое бутылочное горлышко?

Спасибо.

Ответ №1:

Это может быть немного грязно, если вы хотите обработать как строку, вы можете попробовать это:

 def getDTClosestMin(s:String):String = {
 s.substring(0,4) "-" s.substring(5,7) "-" s.substring(8,10) " "  
 s.substring(11,13) ":"  
 ((((s.substring(14,16)).toInt)*0.1).ceil)*10).round.toString.padTo(2,"0").mkString }


timeAmtRDD.map(x=> x._1 "," x._2 "," x._3)
  .map(x=>x.split(","))
  .map(x=> (getDTClosestMin(x(1)), x(2).toFloat))
  .reduceByKey(_ _)
  .sortByKey().toDF("date", "amount").show()


Output:
 ---------------- ------ 
|            date|amount|
 ---------------- ------ 
|2016-03-04 12:10|  15.4|
|2016-03-04 12:20|  20.7|
|2016-03-04 12:30|  10.0|
|2016-03-04 12:40|   5.4|
 ---------------- ------ 
 

Обновите, сколько времени это заняло .. 😉