#python #apache-spark #pyspark #apache-spark-sql #spark-streaming
#python #apache-spark #pyspark #apache-spark-sql #потоковая передача spark
Вопрос:
У меня есть потоковая передача этих данных из моего hdfs:
nyu,-0.0,1.36,0.64,1.3616,15131c3f-7ad9-489d-bbc4-0f99305b7db0
madrid,-0.494,1.506,0.0,-1.3616,15131c3f-7ad9-489d-bbc4-0f99305b7db0
cynthia,-0.085,1.4300000000000002,0.485,1.6916,15131c3f-7ad9-489d-bbc4-0f99305b7db0
rachel,-0.085,1.4300000000000002,0.485,1.6916,15131c3f-7ad9-489d-bbc4-0f99305b7db0
rachel,-0.0,1.322,0.6779999999999999,1.8496000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
haha,-0.0,0.921,1.079,1.5928,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.16499999999999998,1.419,0.417,1.6442999999999999,15131c3f-7ad9-489d-bbc4-0f99305b7db0
madrid,-0.076,1.608,0.317,1.334,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.142,1.363,0.497,1.8187,15131c3f-7ad9-489d-bbc4-0f99305b7db0
american,-0.028,1.888,0.084,0.8658,15131c3f-7ad9-489d-bbc4-0f99305b7db0
middleburi,-0.148,1.6880000000000002,0.164,0.5698000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
metro,-0.096,1.654,0.249,1.3209,15131c3f-7ad9-489d-bbc4-0f99305b7db0
simon,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
korea,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
anthoni,-0.047,1.855,0.097,0.9211,15131c3f-7ad9-489d-bbc4-0f99305b7db0
anderson,-0.047,1.855,0.097,0.9211,15131c3f-7ad9-489d-bbc4-0f99305b7db0
madrid,-0.047,1.855,0.097,0.9211,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.047,1.7349999999999999,0.217,1.6118999999999999,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.20700000000000002,1.6949999999999998,0.097,0.3662000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
america,-0.047,1.338,0.614,1.679,15131c3f-7ad9-489d-bbc4-0f99305b7db0
Я хочу суммировать оценки с одинаковым словом (первый столбец) и номером документа (последний столбец).
До сих пор у меня есть следующий код:
from pyspark.streaming import StreamingContext
import time
import pprint
from pyspark.sql import functions as F
ssc = StreamingContext(sc, 60)
lines = ssc.textFileStream("hdfs://thesis:9000/user/kush/data/")
data = lines.map(lambda x: x.split(','))
// trying to do the task here
m_Data = data.reduceByKey(lambda x,y: (x[1] y[1], x[2] y[2],x[3] y[3],x[4] y[4]))
m_Data.pprint()
ssc.start()
time.sleep(5)
Как это возможно в потоковой передаче pyspark?
Комментарии:
1. Вы можете просто сгруппировать по столбцам и агрегировать по количеству баллов. Если это не то, что вы имели в виду, я бы посоветовал вам добавить ожидаемый результат для образца данных.
2. Я не думаю, что смогу groupby для live streaming..do я делаю это в foreachRDD?
3. Потоковая передача — это, по сути, микропакетирование (насколько я знаю). Пожалуйста, добавьте свой код к вопросу, и давайте попробуем развить это.
4. я обновил вопрос. спасибо, что посмотрели заранее.
Ответ №1:
Чтобы использовать сокращение по ключу, вам нужно действительно помочь spark определить ключ. Я создал rdd ключ / значение под названием pair.
Ключ определяется словом и номером документа. Значение — это структура данных, соответствующая оценке. Оценка также была преобразована в float (или что еще вы хотите в соответствии с вашим набором данных) для вычислений.
data = lines.map(lambda x: x.split(','))
pair = data.map(lambda x: ( (x[0],x[5]), (float(x[1]),float(x[2]),float(x[3]),float(x[4])) ))
aggregation = pair.reduceByKey(lambda x,y: ( x[0] y[0], x[1] y[1], x[2] y[2], x[3] y[3] ))
aggregation.pprint(20)
Пример вывода:
(('haha', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.0, 0.921, 1.079, 1.5928))
(('american', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.028, 1.888, 0.084, 0.8658))
(('madrid', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.617, 4.968999999999999, 0.41400000000000003, 0.8935000000000002))
(('middleburi', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.148, 1.6880000000000002, 0.164, 0.5698000000000001))
(('cynthia', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.085, 1.4300000000000002, 0.485, 1.6916))
(('metro', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.096, 1.654, 0.249, 1.3209))
(('korea', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.797, 0.155, 1.2171))
(('anthoni', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.855, 0.097, 0.9211))
(('anderson', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.855, 0.097, 0.9211))
(('spain', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.6549999999999999, 9.806, 1.538, 7.8753))
(('nyu', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.0, 1.36, 0.64, 1.3616))
(('rachel', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.085, 2.7520000000000002, 1.1629999999999998, 3.5412))
(('simon', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.797, 0.155, 1.2171))
(('america', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.338, 0.614, 1.679))
Отдельно отметим, что сценарии, в которых ключ k (word / document) приходит одновременно t, а затем вторая запись через 30 минут, не будут охвачены этим.