使用 Python 编写 MapReduce

Map 与 Reduce 为两个独立函数,为了加快各节点的处理速度,使用并行的计算方式,map 运算的结果再由 reduce 继续进行合并。例如,要统计图书馆有多少本书籍,首先一人一排进行统计(map),其次将每个人的统计结果进行汇总(reduce),最终得出总数。Hadoop 除了提供原生态的Java来编写 MapReduce 任务,还提供了其他语言操作的 API—Hadoop Streaming,它通过使用标准的输入与输出来实现 map 与 reduce 之前传递数据,映射到 Python 中便是 sys.stdin 输入数据、sys.stdout 输出数据。其他业务逻辑也直接在 Python 中编写。

下面实现一个统计文本文件(/home/test/hadoop/input.txt)中所有单词出现的词频功能,分别使用原生 Python 与框架方式来编写 mapreduce。文本文件内容如下:

hadoop/input.txt
foo foo quux labs foo bar quux abc bar see you by test welcome test
abc labs foo me python hadoop ab ac bc bec python

用原生Python编写MapReduce详解

(1) 编写 Map 代码

见下面的 mapper.py 代码,它会从标准输入(stdin)读取数据,默认以空格分割单词,然后按行输出单词及其词频到标准输出(stdout),不过整个 Map 处理过程并不会统计每个单词出现的总次数,而是直接输出 “word 1”,以便作为 Reduce 的输入进行统计,要求 mapper.py 具备可执行权限,执行 chmod +x /home/test/hadoop/mapper.py 。

hadoop/mapper.py
#!/usr/bin/env python
import sys
#输入为标准输入stdin;
for line in sys.stdin:
    #删除开头和结尾的空格;
    line = line.strip()
    #以默认空格分隔行单词到words列表;
    words = line.split()
    for word in words:
        #输出所有单词,格式为“单词,1”以便作为Reduce的输入;
        print '%s\t%s' % (word, 1)

(2) 编写 Reduce 代码

见下面的 reducer.py 代码,它会从标准输入(stdin)读取 mapper.py 的结果,然后统计每个单词出现的总次数并输出到标准输出(stdout),要求 reducer.py 同样具备可执行权限,执行 chmod +x /home/test/hadoop/reducer.py 。

hadoop/reducer.py
#!/usr/bin/env python
from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None
# 获取标准输入,即mapper.py的输出;
for line in sys.stdin:
    #删除开头和结尾的空格;
    line = line.strip()
    # 解析mapper.py输出作为程序的输入,以tab作为分隔符;
    word, count = line.split('\t', 1)
    # 转换count从字符型成整型;
    try:
        count = int(count)
    except ValueError:
        # count非数字时,忽略此行;
        continue
    # 要求mapper.py的输出做排序(sort)操作,以便对连续的word做判断;
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # 输出当前word统计结果到标准输出
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word
# 输出最后一个word统计
if current_word == word:
    print('%s\t%s' % (current_word, current_count))

(3) 测试代码

我们可以在 Hadoop 平台运行之前在本地进行测试,校验 mapper.py 与 reducer.py 运行的结果是否正确,测试结果如图12-4所示。

image 2023 12 09 15 03 49 019
Figure 1. 图12-4 mapper执行结果(部分截图)

测试 reducer.py 时需要对 mapper.py 的输出做排序(sort)操作,当然, Hadoop 环境会自动实现排序,如图12-5所示。

image 2023 12 09 15 04 33 832
Figure 2. 图12-5 reducer执行结果

(4) 在 Hadoop 平台运行代码

首先在 HDFS 上创建文本文件存储目录,本示例中为 /user/root/word,运行命令:

# /usr/local/hadoop-1.2.1/bin/hadoop dfs -mkdir /user/root/word

上传文件至 HDFS,本示例中为 /home/test/hadoop/input.txt,如果有多个文件,可采用以下方法进行操作,因为 Hadoop 分析目标默认针对目录,目录下的文件都在运算范围中。

# /usr/local/hadoop-1.2.1/bin/hadoop fs -put /home/test/hadoop/input.txt /user/
root/word/
# /usr/local/hadoop-1.2.1/bin/hadoop dfs -ls /user/root/word/
Found 1 items
-rw-r--r--   2 root supergroup   1182014-02-10 09:49 /user/root/word/input.txt

下一步便是关键的执行 MapReduce 任务了,输出结果文件指定 /output/word,执行以下命令:

#  /usr/local/hadoop-1.2.1/bin/hadoop  jar  /usr/local/hadoop-1.2.1/contrib/        streaming/hadoop-streaming-1.2.1.jar -file ./mapper.py -mapper ./mapper.py -file
./reducer.py -reducer ./reducer.py -input /user/root/word -output /output/word

图12-6为返回的执行结果,可以看到 map 及 reduce 计算的百分比进度。

image 2023 12 09 15 07 29 506
Figure 3. 图12-6 执行MapReduce任务结果

访问 http://192.168.1.20:50030/jobtracker.jsp ,点击生成的Jobid,查看 mapreduce job 信息,如图12-7所示。

image 2023 12 09 15 08 14 168
Figure 4. 图12-7 Web查看mapreduce job信息(部分截图)

查看生成的分析结果文件清单,其中 /output/word/part-00000 为分析结果文件,如图12-8所示。

image 2023 12 09 15 08 52 145
Figure 5. 图12-8 任务输出文件清单

最后查看结果数据,图12-9显示了单词个数统计的结果,整个分析过程结束。

image 2023 12 09 15 09 23 056
Figure 6. 图12-9 查看结果文件part-00000内容

HDFS 常用操作命令有:

  1. 创建目录,示例:bin/hadoop dfs -mkdir /data/root/test。

  2. 列出目录清单,示例:bin/hadoop dfs -ls /data/root。

  3. 删除文件或目录,示例:bin/hadoop fs -rmr /data/root/test。

  4. 上传文件,示例:bin/hadoop fs -put /home/test/hadoop/*.txt /data/root/test。

  5. 查看文件内容,示例:bin/hadoop dfs -cat /output/word/part-00000。

用Mrjob框架编写MapReduce详解

Mrjob( http://pythonhosted.org/mrjob/index.html )是一个编写 MapReduce 任务的开源 Python 框架,它实际上对 Hadoop Streaming 的命令行进行了封装,因此接触不到 Hadoop 的数据流命令行,使我们可以更轻松、快速编写 MapReduce 任务。Mrjob 具有如下特点。

  1. 代码简洁,map 及 reduce 函数通过一个 Python 文件就可以搞定;

  2. 支持多步骤的 MapReduce 任务工作流;

  3. 支持多种运行方式,包括内嵌方式、本地环境、Hadoop、远程亚马逊;

  4. 支持亚马逊网络数据分析服务 Elastic MapReduce (EMR);

  5. 调试方便,无须任何环境支持。

安装 Mrjob 要求环境为 Python 2.5 及以上版本,源码下载地址: https://github.com/yelp/mrjob

# pip install mrjob     #PyPI安装方式
# python setup.py install     #源码安装方式

回到实现一个统计文本文件(/home/test/hadoop/input.txt)中所有单词出现的词频功能,Mrjob 通过 mapper() 与 reducer() 方法实现了 MR 操作,实现代码如下:

from mrjob.job import MRJob
class MRWordCounter(MRJob):
    def mapper(self, key, line):
        for word in line.split():
            yield word, 1
    def reducer(self, word, occurrences):
        yield word, sum(occurrences)

if __name__ == '__main__':
    MRWordCounter.run()

可以看出代码行数只是原生 Python 的 1/3,逻辑也比较清晰,代码中包含了mapper、reducer 函数。mapper 函数接收每一行的输入数据,处理后返回一对 key:value,初始化 value 为数据 1;reducer 接收 mapper 输出的 key-value 对进行整合,把相同 key 的 value 作累加(sum)操作后输出。Mrjob 利用 Python 的 yield 机制将函数变成一个Generators(生成器),通过不断调用 next() 去实现 key-value 的初始化或运算操作。前面介绍 Mrjob 支持四种运行方式,包括内嵌(-r inline)、本地(-r local)、Hadoop(-r hadoop)、Amazon EMR(-r emr),下面主要介绍前三者的运行方式。

(1) 内嵌(-r inline)方式

特点是调试方便,启动单一进程模拟任务执行状态及结果,默认(-r inline)可以省略,输出文件使用 “> output-file” 或 “-o output-file”。下面两条命令是等价的:

# python word_count.py -r inline input.txt >output.txt
# python word_count.py input.txt -o output.txt

输出文件 output.txt 内容见图12-10。

image 2023 12 09 15 17 18 640
Figure 7. 图12-10 查看输出output.txt文件内容

(2) 本地(-r local)方式

用于本地模拟 Hadoop 调试,与内嵌(inline)方式的区别是启动了多进程执行每一个任务,如:

# python word_count.py -r local input.txt >output.txt

执行的结果与 inline 一样,只是运行过程存在差异。

(3) Hadoop(-r hadoop)方式

用于 Hadoop 环境,支持 Hadoop 运行调度控制参数,如:

  • 指定 Hadoop 任务调度优先级(VERY_HIGH|HIGH),如,--jobconf mapreduce.job. priority=VERY_HIGH。

  • Map 及 Reduce 任务个数限制,如,--jobconf mapred.map.tasks=10—​jobconf mapred.reduce.tasks=5。

注意,执行之前需要指定 Hadoop 环境变量,执行结果见图12-11。

image 2023 12 09 15 20 14 904
Figure 8. 图12-11 任务执行结果(部分截图)

访问 http://192.168.1.20:50030/jobtracker.jsp ,显示的最后一行便是任务执行的信息,从中可以看到任务的优先级、map 及 reduce 的总数,如图12-12所示。

image 2023 12 09 15 20 58 574
Figure 9. 图12-12 已完成任务清单(部分截图)

查看 Hadoop 分析结果文件,内容见图12-13。

image 2023 12 09 15 21 42 320
Figure 10. 图12-13 查看任务结果文件内容

Mrjob 框架的介绍告一段落,下一节重点以实际案例进行说明。