Как подсчитать один и тот же элемент с несколькими параметрами в mrjob в python?

#python #mapreduce #bigdata #mrjob #word-frequency

#питон #mapreduce #bigdata #мистер джоб #частота слов

Вопрос:

Я пытаюсь написать функцию сокращения карты на python. У меня есть файл, содержащий информацию о продукте, и я хочу подсчитать количество продуктов, относящихся к одной категории и имеющих одну и ту же версию. подобный этому: lt;category, {count, version} gt;

Информация о моем файле выглядит следующим образом:

 product_name rate category id version  a "3.0" cat1 1 1  b "2.0" cat1 2 1  c "4.0" cat1 3 4  d "1.0" cat2 3 2  . . . . .  . . . . .  . . . . .  

например :

 lt;cat1, {2, 1} gt;  

Я написал этот код, но в функции объединения я не знаю, как я могу их посчитать.

 from mrjob.job import MRJob from mrjob.step import MRStep  class MRFrequencyCount(MRJob):   def steps(self):  return [  MRStep(  mapper=self.mapper_extract_words,  combiner=self.combine_word_counts,  )  ]   def mapper_extract(self, _, line):  (product_name, rate, category, id, version) = line.split('*')  yield category, (1, version)   def combine_counts(self, category, countAndVersion):  yield category, sum(countAndVersion)  if __name__ == '__main__':  MRFrequencyCount.run()  

Ответ №1:

Проблема в том, какой ключ вы создаете. Поскольку вы, по сути, группируетесь по категориям и версиям, вы должны отправить это в качестве составного ключа combiner функции. Затем reducer можно разбить составной ключ и выдать данные в нужном формате.

 from mrjob.job import MRJob from mrjob.step import MRStep  class MRFrequencyCount(MRJob):   def steps(self):  return [  MRStep(  mapper=self.mapper_extract,  combiner=self.combine_counts,  reducer=self.reduce_counts  )  ]   def mapper_extract(self, _, line):  (product_name, rate, category, id, version) = line.split('*')  yield (category, version), 1   def combine_counts(self, cat_version, count):  yield cat_version, sum(count)   def reduce_counts(self, cat_version, counts):  category, version = cat_version  final = sum(counts)  yield category, (final, version)  if __name__ == '__main__':  MRFrequencyCount.run()  

 a*3.0*cat1*1*1 b*2.0*cat1*2*1 c*4.0*cat1*3*4 d*1.0*cat2*3*2  

 "cat2" [1, "2"] "cat1" [1, "4"] "cat1" [2, "1"]