gtoonstra edited this page Jun 29, 2015 · 6 revisions

The remap project contains example source code in the examples directory. The examples demonstrate different functions of the platform.

## Wordcount

Word count is the obligatory, minimal function of a map/reduce platform. It reads a couple of files line by line, splits phrases into words and 'emits' a tuple combining each word with the count 1.

The example implementation has some opportunities for performance improvements, for example by using a combiner, but these were left out to demonstrate the bare minimum such a project needs.

When an implementation is started, it loads the main module file (configured in appconfig.json in the same directory). This module is responsible for creating the objects for reading and writing files and for configuring the pipeline. The returned objects are used automatically by the algorithm implementation, and the module is then called for the 'map' or 'reduce' algorithm as required.

The 'remap' module in 'src/core' contains a couple of commonly used readers and writers. You can also create your own readers and writers, either in the module code itself or in a separate module on the side.

Contrary to Hadoop MapReduce, the map function in remap does partitioning inline, so the partitioning algorithm is part of the map code.

For both map and reduce functions, the 'key' must be immutable and hashable. The value can theoretically contain anything; in this wordcount example the elements happen to be strings. The implementations of the partitioners, writers and readers determine what kinds of value objects you move around.
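The hashability requirement is ordinary Python behavior and can be checked directly; a quick illustration (plain Python, not part of remap):

```python
# Illustration: keys must be hashable, exactly as for Python dict keys.
def is_valid_key(obj):
    """Return True if obj can serve as a map/reduce key (i.e. it is hashable)."""
    try:
        hash(obj)
        return True
    except TypeError:
        return False

print(is_valid_key("word"))    # True: strings are immutable and hashable
print(is_valid_key(("a", 1)))  # True: tuples of hashables work too
print(is_valid_key(["a", 1]))  # False: lists are mutable and unhashable
```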

```python
import remap

# --- create file i/o objects to be used ----
def create_mapper_reader( filename ):
    return remap.TextFileReader( filename )

def create_mapper_partitioner( outputdir, partition, mapperid ):
    return remap.TextPartitioner( outputdir, partition, mapperid )

def create_reducer_reader( inputdir ):
    return remap.TextPartFileReader( inputdir )

def create_reducer_writer( outputdir, partition ):
    return remap.TextReduceWriter( outputdir, partition )

# ---- map and reduce implementations ----

# map creates one record of the word and a '1' to count it;
# it also directs the mapped value to a specific partition
def map( key, value ):
    # strip punctuation that would create issues for our file format
    remove = ".,?:;!\""
    trans = str.maketrans(remove, ' ' * len(remove))

    words = value.translate( trans ).split()
    for word in words:
        word = word.lower()
        if word[0] in 'abcde':
            yield 'a2e', word, 1
        elif word[0] in 'fghijklmn':
            yield 'f2n', word, 1
        elif word[0] in 'opqrs':
            yield 'o2s', word, 1
        elif word[0] in 'tuvwxyz':
            yield 't2z', word, 1
        else:
            yield '_default', word, 1

# The reduce operation sums all the values in the sequence and outputs.
def reduce( key, list_of_values ):
    yield (key, str(sum(list_of_values)))
```
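The pair of functions above can be exercised without the remap runtime. Below is a minimal local driver, an illustrative sketch only: `map_words` and `reduce_counts` are simplified stand-ins for the example's `map`/`reduce`, and `run_local` mimics what the partitioner and reducer stage would do.

```python
from collections import defaultdict

# Simplified stand-ins for the example's map/reduce (illustration only).
def map_words(key, value):
    for word in value.lower().split():
        if word[0] in 'abcde':
            yield 'a2e', word, 1
        elif word[0] in 'fghijklmn':
            yield 'f2n', word, 1
        elif word[0] in 'opqrs':
            yield 'o2s', word, 1
        elif word[0] in 'tuvwxyz':
            yield 't2z', word, 1
        else:
            yield '_default', word, 1

def reduce_counts(key, values):
    yield key, str(sum(values))

def run_local(lines):
    # Group emitted values per partition and key, as the partitioner would.
    partitions = defaultdict(lambda: defaultdict(list))
    for line in lines:
        for partition, word, one in map_words(None, line):
            partitions[partition][word].append(one)
    # Run reduce per key in sorted key order, as the reducer stage would.
    return {p: dict(out for k in sorted(groups) for out in reduce_counts(k, groups[k]))
            for p, groups in partitions.items()}

result = run_local(["the cat sat on the mat"])
# e.g. result['t2z'] == {'the': '2'}
```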

Input: Simple text files

```
..... etc...

It is the devout desire of this translator to hasten the day when the
story of Beowulf shall be as familiar to English-speaking peoples as that
of the Iliad. Beowulf is our first great epic. It is an epitomized history
of the life of the Teutonic races. It brings vividly before us our
forefathers of pre-Alfredian eras, in their love of war, of sea, and of
adventure.

..... etc...
```

Output: a fully sorted list of words and their frequencies across all partitions, with one file per partition.

```
bureau,1
burglar's,2
burglars,1
burgundy,5
burial,1
burial-barrow,1
burial-place,1
buried,20
burla,5
burlaba,1
burlada,1
burlado,2
burlados,1
burlar,1
```

## Collation

Collation is the typical 'page indexing' or 'inverted index' problem. For a large number of HTML files, each file (file-id) contains a bag of words, so there is a collection of key,value pairs with the file-id as key and a word in the document as value.

For the inverted index you want to invert this, such that each word maps to a list of file-ids. Collation helps with this: it reads a couple of HTML files, then produces new key,value pairs.

Notice here:

  • It uses a different reader for the mapper function, which processes HTML documents
  • The mapper partitioner uses a combiner, which reduces all occurrences of the same value per key to one
  • The mapper is almost the same as in wordcount, but in this case it emits the input key (the file-id) as the value
  • The reducer uses the same combiner

Code:

```python
import remap

# --- create file i/o objects to be used ----
def create_mapper_reader( filename ):
    return remap.HTMLFileReader( filename )

def create_mapper_partitioner( outputdir, partition, mapperid ):
    return remap.TextPartitioner( outputdir, partition, mapperid, combiner=list_combiner )

.....

def list_combiner( l ):
    return list(set(l))

def map( key, value ):
    # strip punctuation that would create issues for our file format
    remove = ".,?:;!\""
    trans = str.maketrans(remove, ' ' * len(remove))

    words = value.translate( trans ).split()
    for word in words:
        word = word.lower()
        if word[0] in 'abcde':
            yield 'a2e', word, key
        elif word[0] in 'fghijklmn':
            yield 'f2n', word, key
        elif word[0] in 'opqrs':
            yield 'o2s', word, key
        elif word[0] in 'tuvwxyz':
            yield 't2z', word, key
        else:
            yield '_default', word, key

# The reduce operation deduplicates the list of file-ids for each word.
def reduce( key, list_of_values ):
    yield (key, list_combiner(list_of_values))
```
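The collation flow can likewise be sketched locally. This is an illustration only: `invert` and the document list are hypothetical stand-ins, not remap API, but they show how the combiner idea deduplicates file-ids per word.

```python
# Illustration: invert (file-id -> words) into (word -> file-ids),
# using the same list_combiner idea to deduplicate per word.
def list_combiner(l):
    return list(set(l))

def invert(documents):
    index = {}
    for file_id, text in documents:
        for word in text.lower().split():
            index.setdefault(word, []).append(file_id)
    # Deduplicate and sort the file-id list for each word.
    return {word: sorted(list_combiner(ids)) for word, ids in index.items()}

docs = [("a.html", "xml api xml"), ("b.html", "xml tutorial")]
index = invert(docs)
# index["xml"] == ["a.html", "b.html"] -- the duplicate within a.html collapsed
```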

Input: HTML files

```html
<html><head><title>agasgasg</title></head>
<body>
 <div class="section" id="module-xml.etree.ElementTree">
<span id="xml-etree-elementtree-the-elementtree-xml-api"></span><h1>19.7. 
<a class="reference internal" href="#module-xml.etree.ElementTree" title="xml.etree.ElementTree: Implementation of the ElementTree API.">
<tt class="xref py py-mod docutils literal"><span class="pre">xml.etree.ElementTree</span></tt>
</a> — The ElementTree XML API
<a class="headerlink" href="#module-xml.etree.ElementTree" title="Permalink to this headline">
</a></h1>
<div class="versionadded">
<p><span class="versionmodified">New in version 2.5.</span></p>
</div>
</body></html>
```

Output: a fully sorted list (per word 'key') with a list of document ids as the value.

```
fredrik,['/home/gt/remap/data/html/elemtree.html']
from,['/home/gt/remap/data/html/elemtree.html', '/home/gt/remap/data/html/remap-design.html']
full,['/home/gt/remap/data/html/elemtree.html']
```

## Secondary sort

Map/Reduce works very well if you have a single key to sort the values with. Sometimes, however, it's necessary to sort the data by a composite key, which is called a 'secondary sort'. You can define your own comparison functions to achieve this.
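In plain Python a composite sort key is typically built with `operator.itemgetter`, the same mechanism the `customkey` argument of the partitioner relies on. A small standalone illustration (the sample rows here are made up):

```python
from operator import itemgetter

# Hypothetical sample rows: (policy id, state, county, insured value)
rows = [
    ("937659", "FL", "SUWANNEE COUNTY", "9246.6"),
    ("717603", "FL", "CLAY COUNTY", "22512.61"),
    ("352792", "FL", "CLAY COUNTY", "366285.62"),
]
# Composite key: primary sort on county (index 2),
# secondary sort on policy id (index 0).
rows.sort(key=itemgetter(2, 0))
print([r[0] for r in rows])  # CLAY COUNTY rows first, ordered by id
```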

This example reads in a CSV file of values and sorts the output by key. A sorting task typically only requires a 'mapper' stage. The sort key here is the third column.

```python
import remap
from operator import itemgetter

# --- create file i/o objects to be used ----
def create_mapper_reader( filename ):
    return remap.TextFileReader( filename )

def create_mapper_partitioner( outputdir, partition, mapperid ):
    return remap.TextPartitioner( outputdir, partition, mapperid, combiner=None, customkey=itemgetter(3) )

...etc...

# ---- map and reduce implementations ----
def map( key, value ):
    words = value.split(',')
    part = words[2].replace(" ", "_")
    yield part, tuple(words), ""

def reduce( key, value ):
    yield (key, value)
```

Input: A CSV file with insurance information

```
352792,FL,CLAY COUNTY,0,366285.62,0,0,366285.62,507164.19,0,0,0,0,30.08012,-81.718452,Residential,Masonry,1
717603,FL,CLAY COUNTY,0,22512.61,0,0,22512.61,28637.17,0,0,0,0,30.08012,-81.718452,Residential,Wood,1
937659,FL,SUWANNEE COUNTY,0,9246.6,0,9246.6,9246.6,10880.22,0,0,0,0,29.959805,-82.926659,Residential,Wood,3
294022,FL,SUWANNEE COUNTY,0,96164.64,0,0,96164.64,69357.78,0,0,0,0,29.959805,-82.926659,Residential,Wood,3
410500,FL,SUWANNEE COUNTY,0,11095.92,0,0,11095.92,12737.89,0,0,0,0,29.959805,-82.926659,Residential,Wood,3
```

Output: A file of tuples, partitioned by county, sorted by the third column

```
('985128', 'FL', 'CLAY COUNTY', '0', '128341.04', '0', '0', '128341.04', '135476.81', '0', '0', '0', '0', '30.175193', '-81.697738', 'Residential', 'Wood', '4\n'),[""]
('644183', 'FL', 'CLAY COUNTY', '0', '28869.12', '0', '0', '28869.12', '34013.3', '0', '0', '0', '0', '30.160364', '-81.864807', 'Residential', 'Wood', '1\n'),[""]
('752123', 'FL', 'CLAY COUNTY', '0', '13786.29', '0', '0', '13786.29', '18320.32', '0', '0', '0', '0', '29.9826', '-81.7238', 'Residential', 'Wood', '4\n'),[""]
('881403', 'FL', 'CLAY COUNTY', '10935', '10935', '10935', '10935', '10935', '13790.17', '0', '0', '0', '0', '29.7735', '-82.05159', 'Residential', 'Wood', '1\n'),[""]
('248341', 'FL', 'CLAY COUNTY', '117000', '117000', '117000', '117000', '117000', '86913.22', '0', '0', '0', '0', '30.18133', '-81.74089', 'Residential', 'Wood', '1\n'),[""]
('280674', 'FL', 'CLAY COUNTY', '121950', '121950', '121950', '121950', '121950', '189305.42', '0', '0', '0', '0', '30.18241', '-81.74075', 'Residential', 'Wood', '1\n'),[""]
('447682', 'FL', 'CLAY COUNTY', '125233.2', '125233.2', '125233.2', '125233.2', '125233.2', '149904.14', '0', '0', '0', '0', '30.166', '-81.73652', 'Residential', 'Wood', '1\n'),[""]
('448094', 'FL', 'CLAY COUNTY', '1322376.3', '1322376.3', '1322376.3', '1322376.3', '1322376.3', '1438163.57', '0', '0', '0', '0', '30.063936', '-81.707664', 'Residential', 'Masonry', '3\n'),[""]
```

## Graph processing

You can start a vertex job for graph processing. See the 'highest' or 'pagerank' examples for that.
