mint_mapreduce.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. # -*- coding: utf-8 -*-
  2. from mrjob.job import MRJob
  3. class SpendingByCategory(MRJob):
  4. def __init__(self, categorizer):
  5. self.categorizer = categorizer
  6. ...
  7. def current_year_month(self):
  8. """Return the current year and month."""
  9. ...
  10. def extract_year_month(self, timestamp):
  11. """Return the year and month portions of the timestamp."""
  12. ...
  13. def handle_budget_notifications(self, key, total):
  14. """Call notification API if nearing or exceeded budget."""
  15. ...
  16. def mapper(self, _, line):
  17. """Parse each log line, extract and transform relevant lines.
  18. Emit key value pairs of the form:
  19. (2016-01, shopping), 25
  20. (2016-01, shopping), 100
  21. (2016-01, gas), 50
  22. """
  23. timestamp, category, amount = line.split('\t')
  24. period = self. extract_year_month(timestamp)
  25. if period == self.current_year_month():
  26. yield (period, category), amount
  27. def reducer(self, key, values):
  28. """Sum values for each key.
  29. (2016-01, shopping), 125
  30. (2016-01, gas), 50
  31. """
  32. total = sum(values)
  33. self.handle_budget_notifications(key, total)
  34. yield key, sum(values)
  35. def steps(self):
  36. """Run the map and reduce steps."""
  37. return [
  38. self.mr(mapper=self.mapper,
  39. reducer=self.reducer)
  40. ]
  41. if __name__ == '__main__':
  42. SpendingByCategory.run()