123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- # -*- coding: utf-8 -*-
- from mrjob.job import MRJob
class SpendingByCategory(MRJob):
    """MapReduce job that totals the current month's spending per category.

    Each input line is expected to hold three tab-separated fields:
    ``timestamp``, ``category`` and ``amount``.  Lines whose year-month
    matches the current year-month are summed per ``(period, category)``
    key, and every total is handed to ``handle_budget_notifications``.
    """

    def __init__(self, categorizer=None, *args, **kwargs):
        """Initialize the job.

        Args:
            categorizer: Object stored as ``self.categorizer``.  It is
                optional so the job can also be built without it, which is
                what ``SpendingByCategory.run()`` needs.
            *args: Extra positional arguments forwarded to ``MRJob``.
            **kwargs: Extra keyword arguments (e.g. ``args=[...]``)
                forwarded to ``MRJob``.
        """
        # MRJob's own initializer parses the command line and sets up the
        # runner options; skipping it leaves the job unusable.
        super().__init__(*args, **kwargs)
        self.categorizer = categorizer
        ...

    def current_year_month(self):
        """Return the current year and month."""
        ...

    def extract_year_month(self, timestamp):
        """Return the year and month portions of the timestamp."""
        ...

    def handle_budget_notifications(self, key, total):
        """Call notification API if nearing or exceeded budget."""
        ...

    def mapper(self, _, line):
        """Parse each log line, extract and transform relevant lines.

        Only lines from the current year-month are emitted.  The amount is
        converted to a number so the reducer can add the values up.

        Emit key value pairs of the form:

        (2016-01, shopping), 25.0
        (2016-01, shopping), 100.0
        (2016-01, gas), 50.0

        Args:
            _: Input key, unused.
            line: One tab-separated ``timestamp, category, amount`` record.

        Raises:
            ValueError: If the line does not have exactly three fields, or
                the amount of a current-period line is not numeric.
        """
        timestamp, category, amount = line.split('\t')
        period = self.extract_year_month(timestamp)
        if period == self.current_year_month():
            # ``amount`` is still text at this point; emitting it unchanged
            # would make ``sum`` in the reducer fail with a TypeError.
            yield (period, category), float(amount)

    def reducer(self, key, values):
        """Sum values for each key.

        (2016-01, shopping), 125.0
        (2016-01, gas), 50.0

        Args:
            key: ``(period, category)`` pair produced by the mapper.
            values: One-shot iterator over the amounts for ``key``.
        """
        # ``values`` can be consumed only once, so compute the total a
        # single time and reuse it; a second ``sum(values)`` would be 0.
        total = sum(values)
        self.handle_budget_notifications(key, total)
        yield key, total

    def steps(self):
        """Run the map and reduce steps."""
        # Imported here so the file's top-level imports stay untouched.
        from mrjob.step import MRStep

        # MRStep is the supported replacement for the deprecated
        # ``self.mr(...)`` helper.
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer)
        ]
if __name__ == '__main__':
    # Script entry point: hand control to mrjob, which builds the job from
    # the command line and runs it.
    SpendingByCategory.run()
|