diff --git a/docs/15.大数据与MapReduce.md b/docs/15.大数据与MapReduce.md index cad066ba..08afb24e 100644 --- a/docs/15.大数据与MapReduce.md +++ b/docs/15.大数据与MapReduce.md @@ -10,14 +10,14 @@ ## 大数据 场景 ``` -假如你为一家网络购物商店工作,很多拥护访问该网站,其中有些人会购买商品,有些人则随意浏览后就离开。 +假如你为一家网络购物商店工作,很多用户访问该网站,其中有些人会购买商品,有些人则随意浏览后就离开。 对于你来说,可能很想识别那些有购物意愿的用户。 那么问题就来了,数据集可能会非常大,在单机上训练要运行好几天。 -接下来:我们讲讲 Hadoop 如何来解决这样的问题 +接下来:我们讲讲 MapReduce 如何来解决这样的问题 ``` -## MapReduce +## MapReduce ### Hadoop 概述 @@ -79,21 +79,21 @@ cat input/15.BigData_MapReduce/inputFile.txt | python src/python/15.BigData_MapR #### Mahout in Action -1. 简单贝叶斯: -2. k-近邻算法: +1. 简单贝叶斯:它属于为数不多的可以很自然地使用MapReduce的算法。通过统计在某个类别下某特征的概率。 +2. k-近邻算法:高维数据下(如文本、图像和视频)流行的近邻查找方法是局部敏感哈希算法。 3. 支持向量机(SVM):使用随机梯度下降算法求解,如Pegasos算法。 4. 奇异值分解:Lanczos算法是一个有效的求解近似特征值的算法。 5. k-均值聚类:canopy算法初始化k个簇,然后再运行K-均值求解结果。 -#### 使用 mrjob 库将 MapReduce 自动化 +### 使用 mrjob 库将 MapReduce 自动化 > 理论简介 -* MapReduce作业流自动化的框架:Cascading 和 Oozie. -* mrjob是一个不错的学习工具,与2010年底实现了开源,来之于Yelp(一个餐厅点评网站). +* MapReduce 作业流自动化的框架:Cascading 和 Oozie. +* mrjob 是一个不错的学习工具,于2010年底实现了开源,来自于 Yelp(一个餐厅点评网站). ```Shell -python mrMean.py < inputFile.txt > myOut.txt +python src/python/15.BigData_MapReduce/mrMean.py < input/15.BigData_MapReduce/inputFile.txt > input/15.BigData_MapReduce/myOut.txt ``` > 实战脚本 @@ -106,11 +106,11 @@ python mrMean.py < inputFile.txt > myOut.txt python src/python/15.BigData_MapReduce/mrMean.py < input/15.BigData_MapReduce/inputFile.txt ``` -#### 利用 Pegasos 算法并行训练支持向量机 +### 项目案例:分布式 SVM 的 Pegasos 算法 Pegasos是指原始估计梯度求解器(Peimal Estimated sub-GrAdient Solver) -> Pegasos 工作原理 +#### Pegasos 工作原理 1. 从训练集中随机挑选一些样本点添加到带处理列表中 2. 
按序判断每个样本点是否被正确分类 @@ -130,7 +130,7 @@ Pegasos是指原始估计梯度求解器(Peimal Estimated sub-GrAdient Solver) 累加对 w 的更新 ``` -> 开发流程 +#### 开发流程 ``` 收集数据:数据按文本格式存放。 @@ -141,6 +141,11 @@ Pegasos是指原始估计梯度求解器(Peimal Estimated sub-GrAdient Solver) 使用算法:本例不会展示一个完整的应用,但会展示如何在大数据集上训练SVM。该算法其中一个应用场景就是本文分类,通常在文本分类里可能有大量的文档和成千上万的特征。 ``` +> 训练算法 + +[完整代码地址](https://github.com/apachecn/MachineLearning/blob/master/src/python/2.KNN/kNN.py): + + 我们继续看 Python 版本的代码实现。 * * * diff --git a/images/15.BigData_MapReduce/mr_1_cluster.jpg b/images/15.BigData_MapReduce/mr_1_cluster.jpg index 6bf55a58..c4047cbc 100644 Binary files a/images/15.BigData_MapReduce/mr_1_cluster.jpg and b/images/15.BigData_MapReduce/mr_1_cluster.jpg differ diff --git a/input/15.BigData_MapReduce/myout.txt b/input/15.BigData_MapReduce/myout.txt index 2aa78fba..3dc35ffc 100644 --- a/input/15.BigData_MapReduce/myout.txt +++ b/input/15.BigData_MapReduce/myout.txt @@ -1,303 +1 @@ -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 79] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 115] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 107] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 109] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 109] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 88] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 56] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 94] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 50] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 86] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 75] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 30] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 20] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 157] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 15] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 19] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 63] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 
["u", 124] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 132] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 3] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 140] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 139] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 127] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 98] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 30] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 16] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 4] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 2] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 75] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 123] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 42] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 16] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 94] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 163] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 159] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 23] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 16] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 160] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 5] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 42] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 53] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 83] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 46] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 121] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 73] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 123] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 93] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 99] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 106] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 173] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 192] 
-shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 132] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 57] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 47] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 164] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 157] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 199] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 62] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 175] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 154] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 110] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 0] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 116] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 49] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 76] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 121] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 178] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 75] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 167] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 41] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 105] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 71] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 5] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 135] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 80] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 116] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 198] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 164] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 105] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 98] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 156] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 72] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 54] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 62] 
-shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 57] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 87] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 68] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 163] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 140] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 40] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 70] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 120] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 172] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 71] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 82] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 168] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 42] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 144] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 27] -shape(self.w): (2,) -shape(X[index,:]) (1, 2) -1 ["u", 36] -in the map_fin -1 ["w", [0.001, 0.001]] -1 ["t", 1] +0.5095697 0.08477803392127012 diff --git a/src/python/15.BigData_MapReduce/mrSVM.py b/src/python/15.BigData_MapReduce/mrSVM.py index 9186b8b6..6493795d 100644 --- a/src/python/15.BigData_MapReduce/mrSVM.py +++ b/src/python/15.BigData_MapReduce/mrSVM.py @@ -1,3 +1,5 @@ +#!/usr/bin/python +# coding:utf8 ''' Created on 2017-04-07 MapReduce version of Pegasos SVM @@ -9,19 +11,20 @@ from mrjob.job import MRJob import pickle from numpy import * + class MRsvm(MRJob): DEFAULT_INPUT_PROTOCOL = 'json_value' - + def __init__(self, *args, **kwargs): super(MRsvm, self).__init__(*args, **kwargs) - self.data = pickle.load(open('C:\Users\Peter\machinelearninginaction\Ch15\svmDat27')) + self.data = pickle.load(open('input/15.BigData_MapReduce/svmDat27')) self.w = 0 self.eta = 0.69 self.dataList = [] self.k = self.options.batchsize self.numMappers = 1 - self.t = 1 #iteration number - + self.t = 1 # iteration number + def configure_options(self): super(MRsvm, 
self).configure_options() self.add_passthrough_option( @@ -30,49 +33,61 @@ class MRsvm(MRJob): self.add_passthrough_option( '--batchsize', dest='batchsize', default=100, type='int', help='k: number of data points in a batch') - - def map(self, mapperId, inVals): # 需要 2 个参数 - #input: nodeId, ('w', w-vector) OR nodeId, ('x', int) - if False: yield - if inVals[0]=='w': # 积累 w向量 + + def map(self, mapperId, inVals): # 需要 2 个参数 + # input: nodeId, ('w', w-vector) OR nodeId, ('x', int) + if False: + yield + if inVals[0] == 'w': # 积累 w向量 self.w = inVals[1] - elif inVals[0]=='x': - self.dataList.append(inVals[1])# 累积数据点计算 - elif inVals[0]=='t': self.t = inVals[1] - else: self.eta=inVals # 这用于 debug, eta未在map中使用 - + elif inVals[0] == 'x': + self.dataList.append(inVals[1]) # 累积数据点计算 + elif inVals[0] == 't': + self.t = inVals[1] + else: + self.eta = inVals # 这用于 debug, eta未在map中使用 + def map_fin(self): - labels = self.data[:,-1]; X=self.data[:,0:-1]# 将数据重新形成 X 和 Y - if self.w == 0: self.w = [0.001]*shape(X)[1] # 在第一次迭代时,初始化 w + labels = self.data[:,-1] + X = self.data[:, 0:-1] # 将数据重新形成 X 和 Y + if self.w == 0: + self.w = [0.001] * shape(X)[1] # 在第一次迭代时,初始化 w for index in self.dataList: - p = mat(self.w)*X[index,:].T #calc p=w*dataSet[key].T + p = mat(self.w)*X[index, :].T # calc p=w*dataSet[key].T if labels[index]*p < 1.0: - yield (1, ['u', index])# 确保一切数据包含相同的key - yield (1, ['w', self.w]) # 它们将在同一个 reducer + yield (1, ['u', index]) # 确保一切数据包含相同的key + yield (1, ['w', self.w]) # 它们将在同一个 reducer yield (1, ['t', self.t]) def reduce(self, _, packedVals): - for valArr in packedVals: # 从流输入获取值 - if valArr[0]=='u': self.dataList.append(valArr[1]) - elif valArr[0]=='w': self.w = valArr[1] - elif valArr[0]=='t': self.t = valArr[1] - labels = self.data[:,-1]; X=self.data[:,0:-1] - wMat = mat(self.w); wDelta = mat(zeros(len(self.w))) + for valArr in packedVals: # 从流输入获取值 + if valArr[0] == 'u': + self.dataList.append(valArr[1]) + elif valArr[0] == 'w': + self.w = valArr[1] + elif valArr[0] 
== 't': + self.t = valArr[1] + + labels = self.data[:, -1] + X = self.data[:, 0:-1] + wMat = mat(self.w) + wDelta = mat(zeros(len(self.w))) + for index in self.dataList: - wDelta += float(labels[index])*X[index,:] #wDelta += label*dataSet - eta = 1.0/(2.0*self.t) #calc new: eta - #calc new: w = (1.0 - 1/t)*w + (eta/k)*wDelta + wDelta += float(labels[index]) * X[index, :] # wDelta += label*dataSet + eta = 1.0/(2.0*self.t) # calc new: eta + # calc new: w = (1.0 - 1/t)*w + (eta/k)*wDelta wMat = (1.0 - 1.0/self.t)*wMat + (eta/self.k)*wDelta - for mapperNum in range(1,self.numMappers+1): - yield (mapperNum, ['w', wMat.tolist()[0] ]) #发出 w + for mapperNum in range(1, self.numMappers+1): + yield (mapperNum, ['w', wMat.tolist()[0]]) # 发出 w if self.t < self.options.iterations: - yield (mapperNum, ['t', self.t+1])# 增量 T - for j in range(self.k/self.numMappers):#emit random ints for mappers iid - yield (mapperNum, ['x', random.randint(shape(self.data)[0]) ]) - + yield (mapperNum, ['t', self.t+1]) # 增量 T + for j in range(self.k/self.numMappers): # emit random ints for mappers iid + yield (mapperNum, ['x', random.randint(shape(self.data)[0])]) + def steps(self): - return ([self.mr(mapper=self.map, reducer=self.reduce, - mapper_final=self.map_fin)]*self.options.iterations) + return ([self.mr(mapper=self.map, reducer=self.reduce, mapper_final=self.map_fin)] * self.options.iterations) + if __name__ == '__main__': MRsvm.run() diff --git a/src/python/15.BigData_MapReduce/pegasos.py b/src/python/15.BigData_MapReduce/pegasos.py index 51df16b8..3d07cefe 100644 --- a/src/python/15.BigData_MapReduce/pegasos.py +++ b/src/python/15.BigData_MapReduce/pegasos.py @@ -1,12 +1,14 @@ +#!/usr/bin/python +# coding:utf8 ''' Created on 2017-04-07 Sequential Pegasos the input T is k*T in Batch Pegasos @author: Peter/ApacheCN-xy ''' - from numpy import * + def loadDataSet(fileName): dataMat = []; labelMat = [] fr = open(fileName) @@ -17,6 +19,7 @@ def loadDataSet(fileName): 
labelMat.append(float(lineArr[2])) return dataMat,labelMat + def seqPegasos(dataSet, labels, lam, T): m,n = shape(dataSet); w = zeros(n) for t in range(1, T+1): @@ -29,10 +32,12 @@ def seqPegasos(dataSet, labels, lam, T): w = (1.0 - 1/t)*w print w return w - + + def predict(w, x): return w*x.T + def batchPegasos(dataSet, labels, lam, T, k): m,n = shape(dataSet); w = zeros(n); dataIndex = range(m) @@ -48,6 +53,8 @@ def batchPegasos(dataSet, labels, lam, T, k): w = (1.0 - 1/t)*w + (eta/k)*wDelta # 在每个 T上应用更改 return w + + datArr,labelList = loadDataSet('testSet.txt') datMat = mat(datArr) #finalWs = seqPegasos(datMat, labelList, 2, 5000) @@ -74,4 +81,4 @@ ax.plot(x,y) ax.plot(x,y2,'g-.') ax.axis([-6,8,-4,5]) ax.legend(('50 Iterations', '2 Iterations') ) -plt.show() \ No newline at end of file +plt.show() diff --git a/src/python/15.BigData_MapReduce/wc.py b/src/python/15.BigData_MapReduce/wc.py index 5df2786a..02e05e48 100644 --- a/src/python/15.BigData_MapReduce/wc.py +++ b/src/python/15.BigData_MapReduce/wc.py @@ -1,5 +1,6 @@ +#!/usr/bin/python +# coding:utf8 from mrjob.job import MRJob -import json class MRWordCountUtility(MRJob):