sales_rank_mapreduce.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. # -*- coding: utf-8 -*-
  2. from mrjob.job import MRJob
  class SalesRanker(MRJob):
      """Rank products by quantity sold within each category over the past week.

      The job runs two MapReduce steps (see ``steps``):

      1. ``mapper`` / ``reducer`` total the quantity sold for each
         (category, product_id) pair.
      2. ``mapper_sort`` / ``reducer_identity`` re-key each total as
         (category, total) so the shuffle/sort groups products by category
         and orders them by total.
      """

      def within_past_week(self, timestamp):
          """Return True if timestamp is within past week, False otherwise.

          TODO: not implemented. As written the body is ``...``, so this
          returns None (falsy) and ``mapper`` emits nothing until it is
          filled in. The timestamp format is not visible here -- confirm it
          against whatever produces the log lines.
          """
          ...

      def mapper(self, _, line):
          """Parse one tab-separated log line and emit it if it is recent.

          Each line must hold exactly four tab-separated fields:
          timestamp, product_id, category, quantity. Lines whose timestamp
          passes ``within_past_week`` are emitted as
          ``(category, product_id), quantity``, for example::

              (foo, p1), 2
              (bar, p1), 2
              (bar, p1), 1
              (foo, p2), 3
              (bar, p3), 10
              (foo, p4), 1

          Raises:
              ValueError: if the line does not split into exactly four
                  fields, or if the quantity field is not an integer.
          """
          timestamp, product_id, category, quantity = line.split('\t')
          if self.within_past_week(timestamp):
              # The split yields strings; convert here so that ``reducer``
              # sums numbers instead of failing on ``0 + 'str'``.
              yield (category, product_id), int(quantity)

      def reducer(self, key, values):
          """Sum the quantities sold for each (category, product_id) key.

          For the ``mapper`` example this emits::

              (foo, p1), 2
              (bar, p1), 3
              (foo, p2), 3
              (bar, p3), 10
              (foo, p4), 1
          """
          yield key, sum(values)

      def mapper_sort(self, key, value):
          """Re-key each total so the shuffle/sort orders products by total.

          Transforms the ``reducer`` output into
          ``(category, quantity), product_id``::

              (foo, 2), p1
              (bar, 3), p1
              (foo, 3), p2
              (bar, 10), p3
              (foo, 1), p4

          The shuffle/sort step then sorts on the (category, quantity) keys,
          grouping products by category with the smallest totals first::

              (bar, 3), p1
              (bar, 10), p3
              (foo, 1), p4
              (foo, 2), p1
              (foo, 3), p2

          NOTE(review): the sorted example assumes numeric ordering of the
          quantity. If keys are sorted as their serialized text (the usual
          behavior with mrjob's default JSON internal protocol), then
          ``["bar", 10]`` sorts before ``["bar", 3]``. Confirm the
          protocol/comparator if exact numeric order matters.
          """
          category, product_id = key
          quantity = value
          yield (category, quantity), product_id

      def reducer_identity(self, key, value):
          """Emit every product_id under its (category, quantity) key.

          The reducer is called with an iterable of all values sharing
          ``key``. The parameter keeps its original name ``value`` for
          compatibility, but it is that iterable. Yielding it directly would
          emit the iterable object rather than the product ids, so each
          product_id is emitted on its own.
          """
          for product_id in value:
              yield key, product_id

      def steps(self):
          """Return the two-step job: aggregate totals, then sort by total."""
          # MRJob.mr() is deprecated in favor of MRStep (and dropped in newer
          # mrjob releases). The import is local so this method stands alone.
          from mrjob.step import MRStep
          return [
              MRStep(mapper=self.mapper,
                     reducer=self.reducer),
              MRStep(mapper=self.mapper_sort,
                     reducer=self.reducer_identity),
          ]
  if __name__ == '__main__':
      # Script entry point: hand control to the job's classmethod runner.
      SalesRanker.run()