dimroc / etl-language-comparison

Count the number of times certain words were said in a particular neighborhood. Performed as a basic MapReduce job against 25M tweets. Implemented in different programming languages as an educational exercise.
http://blog.dimroc.com/2015/11/14/etl-language-showdown-pt3/

Alternative single-threaded python implementation (using library) #15

Closed maxgrenderjones closed 9 years ago

maxgrenderjones commented 9 years ago

For fun I implemented the task using streamutils, a Python library I've been working on that makes text processing very quick to write (but it's single-threaded and uses generators, so not particularly fast to run). It makes for pretty short code though - read it as if it were bash commands chained together:

import streamutils as su
import re

bag=su.find('tmp/tweets/tweets_*') | su.read() | su.search(r'.*?\t(.*?)\t.*?\t.*knicks.*', group=1, flags=re.IGNORECASE, match=True) | su.bag()
bag.most_common() | su.smap(lambda x: '%s\t%s\n' % (x[0], x[1])) | su.write('tmp/python_streamoutput')
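
For anyone who doesn't want to pull streamutils in, this is roughly what the pipeline above boils down to with just the stdlib (a sketch of what it computes, not how the library runs it):

import glob
import re
from collections import Counter

PATTERN = re.compile(r'.*?\t(.*?)\t.*?\t.*knicks.*', re.IGNORECASE)

counts = Counter()
for path in glob.glob('tmp/tweets/tweets_*'):
    with open(path) as f:
        for line in f:
            m = PATTERN.match(line)
            if m:
                counts[m.group(1)] += 1  # group 1 is the neighborhood field

with open('tmp/python_streamoutput', 'w') as out:
    for neighborhood, n in counts.most_common():
        out.write('%s\t%s\n' % (neighborhood, n))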

Alternative version, parallelized using multiprocessing:

from multiprocessing import Pool, cpu_count
from collections import Counter
import streamutils as su
import re

def process(f):
    return su.read(fname=f) | su.search(r'.*?\t(.*?)\t.*?\t.*knicks.*', group=1, flags=re.IGNORECASE, match=True) | su.bag()

if __name__=='__main__':
    bag=Pool(cpu_count()).map(process, su.find('tmp/tweets/tweets_*')) | su.sreduce(lambda x, y: x+y, Counter())
    bag.most_common() | su.smap(lambda x: '%s\t%s\n' % (x[0], x[1])) | su.write('tmp/python_parallelstreamoutput')
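
The per-file bags act like collections.Counter objects - they get summed into one and most_common() is called on the result - so the sreduce step is just Counter addition:

from collections import Counter

merged = Counter({'harlem': 2}) + Counter({'soho': 3, 'harlem': 1})  # toy counts just to show the merge
print(merged)  # counts are summed per key: harlem -> 3, soho -> 3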

Not sure if it's worthy of a pull request though - it's more just to show that Python can be terse too :)

dimroc commented 9 years ago

Cool. How long did it take on your computer?

I've got quite a few PRs to go through so I'm all about lightening the load right now :sweat_smile:.

maxgrenderjones commented 9 years ago

Turns out the biggest issue was the regex - I was trying to do more than simply match a static string. Modified to the below, it takes 13s on a MacBook Pro.

from multiprocessing import Pool, cpu_count
from collections import Counter
import streamutils as su
import re

KNICKS=re.compile('knicks')

def process(f):
    return su.read(fname=f) | su.split(sep='\t') | su.sfilter(lambda x: KNICKS.match(x[3])) | su.smap(lambda x: x[1]) | su.bag()

if __name__=='__main__':
    bag=Pool(cpu_count()).map(process, su.find('tmp/tweets/tweets_*')) | su.sreduce(lambda x, y: x+y, Counter())
    bag.most_common() | su.smap(lambda x: '%s\t%s\n' % (x[0], x[1])) | su.write('tmp/python_parallelstreamoutput')
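
If anyone is curious where the time goes, a quick (and unscientific) check on a single line shows the gap between the wildcard regex and the split-plus-static-match approach - the sample line below is made up, only the tab layout matters:

import re
import timeit

line = '123\tharlem\t40.7,-73.9\tknicks lost again\n'  # fabricated line, same tab layout as the tweet files
FULL = re.compile(r'.*?\t(.*?)\t.*?\t.*knicks.*', re.IGNORECASE)
KNICKS = re.compile('knicks')

# wildcard regex over the whole line vs split plus an anchored match on one field
print(timeit.timeit(lambda: FULL.match(line), number=100000))
print(timeit.timeit(lambda: KNICKS.match(line.split('\t')[3]), number=100000))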