# -*- coding: utf-8 -*-
from mrjob.job import MRJob
from mrjob.step import MRStep
class HitCounts(MRJob):
    """MapReduce job that counts hits per (year-month, url) pair.

    Every input log line is mapped to a ``((period, url), 1)`` record and the
    reducer sums the ones emitted for each distinct key.
    """

    def extract_url(self, line):
        """Extract the generated url from the log line.

        Placeholder: the parsing logic is not implemented here, so this
        currently returns ``None`` for every line.
        """
        pass

    def extract_year_month(self, line):
        """Return the year and month portions of the timestamp.

        Placeholder: the parsing logic is not implemented here, so this
        currently returns ``None`` for every line.
        """
        pass

    def mapper(self, _, line):
        """Parse each log line, extract and transform relevant lines.

        The input key is ignored; only the raw line is used.

        Emit key value pairs of the form:

        (2016-01, url0), 1
        (2016-01, url0), 1
        (2016-01, url1), 1
        """
        url = self.extract_url(line)
        period = self.extract_year_month(line)
        yield (period, url), 1

    def reducer(self, key, values):
        """Sum values for each key.

        (2016-01, url0), 2
        (2016-01, url1), 1
        """
        yield key, sum(values)

    def steps(self):
        """Run the map and reduce steps.

        Returns a one-element list describing a single map/reduce step.
        """
        # MRStep replaces the deprecated (and since removed) MRJob.mr()
        # helper, so the job keeps working on current mrjob releases.
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer)
        ]
# Launch the job only when this file is executed as a script, so importing
# the module (e.g. from a test) has no side effects.
if __name__ == '__main__':
    HitCounts.run()
|