1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- # -*- coding: utf-8 -*-
- from mrjob.job import MRJob
- class SalesRanker(MRJob):
- def within_past_week(self, timestamp):
- """Return True if timestamp is within past week, False otherwise."""
- ...
- def mapper(self, _, line):
- """Parse each log line, extract and transform relevant lines.
- Emit key value pairs of the form:
- (foo, p1), 2
- (bar, p1), 2
- (bar, p1), 1
- (foo, p2), 3
- (bar, p3), 10
- (foo, p4), 1
- """
- timestamp, product_id, category, quantity = line.split('\t')
- if self.within_past_week(timestamp):
- yield (category, product_id), quantity
- def reducer(self, key, values):
- """Sum values for each key.
- (foo, p1), 2
- (bar, p1), 3
- (foo, p2), 3
- (bar, p3), 10
- (foo, p4), 1
- """
- yield key, sum(values)
- def mapper_sort(self, key, value):
- """Construct key to ensure proper sorting.
- Transform key and value to the form:
- (foo, 2), p1
- (bar, 3), p1
- (foo, 3), p2
- (bar, 10), p3
- (foo, 1), p4
- The shuffle/sort step of MapReduce will then do a
- distributed sort on the keys, resulting in:
- (category1, 1), product4
- (category1, 2), product1
- (category1, 3), product2
- (category2, 3), product1
- (category2, 7), product3
- """
- category, product_id = key
- quantity = value
- yield (category, quantity), product_id
- def reducer_identity(self, key, value):
- yield key, value
- def steps(self):
- """Run the map and reduce steps."""
- return [
- self.mr(mapper=self.mapper,
- reducer=self.reducer),
- self.mr(mapper=self.mapper_sort,
- reducer=self.reducer_identity),
- ]
- if __name__ == '__main__':
- SalesRanker.run()
|