diff --git a/docs/15.大数据与MapReduce.md b/docs/15.大数据与MapReduce.md
index 08afb24e..83e4cc5e 100644
--- a/docs/15.大数据与MapReduce.md
+++ b/docs/15.大数据与MapReduce.md
@@ -143,10 +143,47 @@ Pegasos stands for Primal Estimated sub-GrAdient SOlver
 
 > Training algorithm
 
-[Full code link](https://github.com/apachecn/MachineLearning/blob/master/src/python/2.KNN/kNN.py):
+```python
+def batchPegasos(dataSet, labels, lam, T, k):
+    """batchPegasos()
+    Args:
+        dataSet    feature set
+        labels     class labels
+        lam        regularization constant lambda
+        T          number of iterations
+        k          batch size (number of examples processed per iteration)
+    Returns:
+        w          weight vector
+    """
+    m, n = shape(dataSet)
+    w = zeros(n)
+    dataIndex = range(m)
+    for t in range(1, T+1):
+        wDelta = mat(zeros(n))  # reset wDelta
 
-我们继续看 Python 版本的代码实现。
+        # eta is the learning rate: it controls how large each weight update is (the step size of the stochastic gradient).
+        # The inputs T and k set the number of iterations and the batch size; eta is recomputed at every one of the T iterations.
+        eta = 1.0/(lam*t)
+        random.shuffle(dataIndex)
+        for j in range(k):  # inner loop over one batch: accumulate the misclassified examples, then update w once
+            i = dataIndex[j]
+            p = predict(w, dataSet[i, :])  # mapper code
+
+            # If the prediction is correct with a margin of at least 1, leave w alone;
+            # otherwise count it as an error and add it to the accumulated update of w.
+            if labels[i]*p < 1:  # mapper code
+                wDelta += labels[i]*dataSet[i, :].A  # accumulate the change
+        # w is refined by repeated stochastic gradient steps
+        w = (1.0 - 1.0/t)*w + (eta/k)*wDelta  # apply the merged update once per t; 1.0/t keeps the division floating-point
+        # print '-----', w
+    # print '++++++', w
+    return w
+```
+
+[Full code link](https://github.com/apachecn/MachineLearning/blob/master/src/python/15.BigData_MapReduce/pegasos.py):
+
+[MapReduce version of the code](https://github.com/apachecn/MachineLearning/blob/master/src/python/15.BigData_MapReduce/mrSVM.py):
 
 * * *
 
diff --git a/src/python/15.BigData_MapReduce/mrSVM.py b/src/python/15.BigData_MapReduce/mrSVM.py
index 6493795d..f7a7c302 100644
--- a/src/python/15.BigData_MapReduce/mrSVM.py
+++ b/src/python/15.BigData_MapReduce/mrSVM.py
@@ -2,9 +2,11 @@
 # coding:utf8
 '''
 Created on 2017-04-07
+Update on 2017-06-20
 MapReduce version of Pegasos SVM
 Using mrjob to automate job flow
-@author: Peter/ApacheCN-xy
+@author: Peter/ApacheCN-xy/片刻
+"Machine Learning in Action" update repo: https://github.com/apachecn/MachineLearning
 '''
 from mrjob.job import MRJob
@@ -17,14 +19,14 @@ class MRsvm(MRJob):
     def __init__(self, *args, **kwargs):
         super(MRsvm, self).__init__(*args, **kwargs)
-        self.data = pickle.load(open('input/15.BigData_MapReduce/svmDat27'))
+        self.data = pickle.load(open('/opt/git/MachineLearning/input/15.BigData_MapReduce/svmDat27'))
         self.w = 0
         self.eta = 0.69
         self.dataList = []
         self.k = self.options.batchsize
         self.numMappers = 1
         self.t = 1  # iteration number
-
+
     def configure_options(self):
         super(MRsvm, self).configure_options()
         self.add_passthrough_option(
@@ -42,20 +44,20 @@ class MRsvm(MRJob):
             self.w = inVals[1]
         elif inVals[0] == 'x':
             self.dataList.append(inVals[1])  # accumulate data points to process
-        elif inVals[0] == 't':
+        elif inVals[0] == 't':  # number of iterations
            self.t = inVals[1]
         else:
-            self.eta = inVals # for debugging; eta is not used in map
+            self.eta = inVals  # for debugging; eta is not used in map
 
     def map_fin(self):
-        labels = self.data[:,-1]
-        X = self.data[:, 0:-1] # reshape the data into X and Y
-        if self.w == 0:
+        labels = self.data[:, -1]
+        X = self.data[:, :-1]  # reshape the data into X and Y
+        if self.w == 0:
             self.w = [0.001] * shape(X)[1]  # initialize w on the first iteration
         for index in self.dataList:
-            p = mat(self.w)*X[index, :].T # calc p=w*dataSet[key].T
+            p = mat(self.w)*X[index, :].T  # calc p=w*dataSet[key].T
             if labels[index]*p < 1.0:
-                yield (1, ['u', index]) # make sure everything has the same key
+                yield (1, ['u', index])  # make sure everything has the same key
         yield (1, ['w', self.w])  # so they all end up at the same reducer
         yield (1, ['t', self.t])
@@ -66,7 +68,7 @@ class MRsvm(MRJob):
         elif valArr[0] == 'w':
             self.w = valArr[1]
         elif valArr[0] == 't':
-            self.t = valArr[1]
+            self.t = valArr[1]
 
         labels = self.data[:, -1]
         X = self.data[:, 0:-1]
diff --git a/src/python/15.BigData_MapReduce/pegasos.py b/src/python/15.BigData_MapReduce/pegasos.py
index 3d07cefe..eaa05d49 100644
--- a/src/python/15.BigData_MapReduce/pegasos.py
+++ b/src/python/15.BigData_MapReduce/pegasos.py
@@ -10,24 +10,26 @@ from numpy import *
 
 
 def loadDataSet(fileName):
-    dataMat = []; labelMat = []
+    dataMat = []
+    labelMat = []
     fr = open(fileName)
     for line in fr.readlines():
         lineArr = line.strip().split('\t')
-        #dataMat.append([float(lineArr[0]), float(lineArr[1]), float(lineArr[2])])
+        # dataMat.append([float(lineArr[0]), float(lineArr[1]), float(lineArr[2])])
         dataMat.append([float(lineArr[0]), float(lineArr[1])])
         labelMat.append(float(lineArr[2]))
-    return dataMat,labelMat
+    return dataMat, labelMat
 
 
 def seqPegasos(dataSet, labels, lam, T):
-    m,n = shape(dataSet); w = zeros(n)
+    m, n = shape(dataSet)
+    w = zeros(n)
     for t in range(1, T+1):
         i = random.randint(m)
         eta = 1.0/(lam*t)
-        p = predict(w, dataSet[i,:])
+        p = predict(w, dataSet[i, :])
         if labels[i]*p < 1:
-            w = (1.0 - 1/t)*w + eta*labels[i]*dataSet[i,:]
+            w = (1.0 - 1/t)*w + eta*labels[i]*dataSet[i, :]
         else:
             w = (1.0 - 1/t)*w
         print w
@@ -35,29 +37,49 @@ def predict(w, x):
-    return w*x.T
+    return w*x.T  # predict the value of y
 
 
 def batchPegasos(dataSet, labels, lam, T, k):
-    m,n = shape(dataSet); w = zeros(n);
+    """batchPegasos()
+
+    Args:
+        dataSet    feature set
+        labels     class labels
+        lam        regularization constant lambda
+        T          number of iterations
+        k          batch size (number of examples processed per iteration)
+    Returns:
+        w          weight vector
+    """
+    m, n = shape(dataSet)
+    w = zeros(n)
     dataIndex = range(m)
     for t in range(1, T+1):
-        wDelta = mat(zeros(n)) # reset wDelta
+        wDelta = mat(zeros(n))  # reset wDelta
+
+        # eta is the learning rate: it controls how large each weight update is (the step size of the stochastic gradient).
+        # The inputs T and k set the number of iterations and the batch size; eta is recomputed at every one of the T iterations.
         eta = 1.0/(lam*t)
         random.shuffle(dataIndex)
-        for j in range(k):# the whole training set
+        for j in range(k):  # inner loop over one batch: accumulate the misclassified examples, then update w once
             i = dataIndex[j]
-            p = predict(w, dataSet[i,:]) # mapper code
-            if labels[i]*p < 1: # mapper code
-                wDelta += labels[i]*dataSet[i,:].A # accumulate the change
-            w = (1.0 - 1/t)*w + (eta/k)*wDelta # apply the change at each t
+            p = predict(w, dataSet[i, :])  # mapper code
+
+            # If the prediction is correct with a margin of at least 1, leave w alone;
+            # otherwise count it as an error and add it to the accumulated update of w.
+            if labels[i]*p < 1:  # mapper code
+                wDelta += labels[i]*dataSet[i, :].A  # accumulate the change
+        # w is refined by repeated stochastic gradient steps
+        w = (1.0 - 1.0/t)*w + (eta/k)*wDelta  # apply the merged update once per t; 1.0/t keeps the division floating-point
+        # print '-----', w
+    # print '++++++', w
     return w
-
-datArr,labelList = loadDataSet('testSet.txt')
+
+datArr, labelList = loadDataSet('input/15.BigData_MapReduce/testSet.txt')
 datMat = mat(datArr)
-#finalWs = seqPegasos(datMat, labelList, 2, 5000)
+# finalWs = seqPegasos(datMat, labelList, 2, 5000)
 finalWs = batchPegasos(datMat, labelList, 2, 50, 100)
 print finalWs
@@ -65,20 +87,25 @@ import matplotlib
 import matplotlib.pyplot as plt
 fig = plt.figure()
 ax = fig.add_subplot(111)
-x1=[]; y1=[]; xm1=[]; ym1=[]
+x1 = []
+y1 = []
+xm1 = []
+ym1 = []
 for i in range(len(labelList)):
     if labelList[i] == 1.0:
-        x1.append(datMat[i,0]); y1.append(datMat[i,1])
+        x1.append(datMat[i, 0])
+        y1.append(datMat[i, 1])
     else:
-        xm1.append(datMat[i,0]); ym1.append(datMat[i,1])
+        xm1.append(datMat[i, 0])
+        ym1.append(datMat[i, 1])
 ax.scatter(x1, y1, marker='s', s=90)
 ax.scatter(xm1, ym1, marker='o', s=50, c='red')
 x = arange(-6.0, 8.0, 0.1)
-y = (-finalWs[0,0]*x - 0)/finalWs[0,1]
-#y2 = (0.43799*x)/0.12316
-y2 = (0.498442*x)/0.092387 #2 iterations
-ax.plot(x,y)
-ax.plot(x,y2,'g-.')
-ax.axis([-6,8,-4,5])
-ax.legend(('50 Iterations', '2 Iterations') )
+y = (-finalWs[0, 0]*x - 0)/finalWs[0, 1]
+# y2 = (0.43799*x)/0.12316
+y2 = (0.498442*x)/0.092387  # 2 iterations
+ax.plot(x, y)
+ax.plot(x, y2, 'g-.')
+ax.axis([-6, 8, -4, 5])
+ax.legend(('50 Iterations', '2 Iterations'))
 plt.show()
diff --git a/src/python/15.BigData_MapReduce/proximalSVM.py b/src/python/15.BigData_MapReduce/proximalSVM.py
index 8fb01ee6..eed2e423 100644
--- a/src/python/15.BigData_MapReduce/proximalSVM.py
+++ b/src/python/15.BigData_MapReduce/proximalSVM.py
@@ -1,7 +1,12 @@
+#!/usr/bin/python
+# coding:utf8
 '''
-Created on Feb 25, 2011
-
-@author: Peter
+Created on 2011-02-25
+Update on 2017-06-20
+@author: Peter/ApacheCN-xy/片刻
+"Machine Learning in Action" update repo: https://github.com/apachecn/MachineLearning
 '''
 import numpy
+import base64
+import pickle
@@ -9,28 +14,28 @@ def map(key, value):
     # input key= class for one training example, e.g. "-1.0"
     classes = [float(item) for item in key.split(",")]  # e.g. [-1.0]
     D = numpy.diag(classes)
-
+
     # input value = feature vector for one training example, e.g. "3.0, 7.0, 2.0"
     featurematrix = [float(item) for item in value.split(",")]
     A = numpy.matrix(featurematrix)
-
+
     # create matrix E and vector e
-    e = numpy.matrix(numpy.ones(len(A)).reshape(len(A),1))
-    E = numpy.matrix(numpy.append(A,-e,axis=1))
-
+    e = numpy.matrix(numpy.ones(len(A)).reshape(len(A), 1))
+    E = numpy.matrix(numpy.append(A, -e, axis=1))
+
     # create a tuple with the values to be used by reducer
     # and encode it with base64 to avoid potential trouble with '\t' and '\n' used
     # as default separators in Hadoop Streaming
-    producedvalue = base64.b64encode(pickle.dumps( (E.T*E, E.T*D*e) )
-
+    producedvalue = base64.b64encode(pickle.dumps((E.T*E, E.T*D*e)))
+
     # note: a single constant key "producedkey" sends to only one reducer
     # somewhat "atypical" due to low degree of parallelism on reducer side
     print "producedkey\t%s" % (producedvalue)
-
+
 def reduce(key, values, mu=0.1):
     sumETE = None
     sumETDe = None
-
+
     # key isn't used, so ignoring it with _ (underscore).
     for _, value in values:
         # unpickle values
@@ -39,13 +44,13 @@ def reduce(key, values, mu=0.1):
             # create the I/mu with correct dimensions
             sumETE = numpy.matrix(numpy.eye(ETE.shape[1])/mu)
         sumETE += ETE
-
+
         if sumETDe == None:
             # create sumETDe with correct dimensions
             sumETDe = ETDe
         else:
             sumETDe += ETDe
-
+
     # note: omega = result[:-1] and gamma = result[-1]
     # but printing entire vector as output
     result = sumETE.I*sumETDe
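
* * *

A note on the update rule: the step that `batchPegasos()` performs above can be written compactly. This is a sketch in the notation of the Pegasos paper; the batch name `A_t` and the violator set `A_t^+` are introduced here for exposition and do not appear in the code:

```latex
% Mini-batch Pegasos step as implemented in batchPegasos():
% draw a batch A_t of k examples, keep only the margin violators,
% then take one shrink-and-step update of w.
\eta_t = \frac{1}{\lambda t}, \qquad
A_t^{+} = \{\, i \in A_t : y_i \langle w_t, x_i \rangle < 1 \,\}, \qquad
w_{t+1} = \left(1 - \tfrac{1}{t}\right) w_t
        + \frac{\eta_t}{k} \sum_{i \in A_t^{+}} y_i\, x_i
```

Because `eta = 1.0/(lam*t)`, the shrink factor `(1 - eta*lam)` is exactly `(1 - 1/t)`, which is what the code multiplies `w` by before adding `(eta/k)*wDelta`.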
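As a quick, self-contained sanity check of that update (not part of the patch), the same loop can be written with plain NumPy arrays. The function name `batch_pegasos`, the fixed seeds, and the two-Gaussian toy data are all invented for illustration:

```python
from __future__ import division, print_function
import numpy as np

def batch_pegasos(X, y, lam, T, k):
    """Mini-batch Pegasos: return a trained weight vector w."""
    m, n = X.shape
    w = np.zeros(n)
    rng = np.random.RandomState(42)            # fixed seed, illustration only
    for t in range(1, T + 1):
        eta = 1.0 / (lam * t)                  # step size shrinks as 1/(lam*t)
        batch = rng.permutation(m)[:k]         # random batch of k examples
        margins = y[batch] * X[batch].dot(w)   # y_i * <w, x_i> over the batch
        viol = batch[margins < 1]              # keep only margin violators
        grad = (y[viol, None] * X[viol]).sum(axis=0)  # sum of y_i * x_i
        w = (1.0 - 1.0 / t) * w + (eta / k) * grad    # shrink, then step
    return w

# toy data: two Gaussian blobs labeled +1 / -1
rng = np.random.RandomState(0)
X = np.vstack([rng.randn(50, 2) + 2.0, rng.randn(50, 2) - 2.0])
y = np.hstack([np.ones(50), -np.ones(50)])
print(batch_pegasos(X, y, lam=2.0, T=50, k=20))
```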
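Finally, a reading of what the `proximalSVM.py` mapper and reducer assemble (the block subscript `j` is added here for exposition): each mapper emits the pair `(E.T*E, E.T*D*e)` for its block of examples, where `E = [A  -e]`, and the single reducer accumulates those pieces and solves what appears to be the proximal SVM linear system (in the style of Fung and Mangasarian, if the `I/mu` term is read that way):

```latex
% One mapper per data block j: E_j = [A_j  -e], emit (E_j^T E_j, E_j^T D_j e).
% The reducer solves for the separating plane parameters omega and gamma.
\left( \frac{I}{\mu} + \sum_j E_j^{T} E_j \right)
\begin{bmatrix} \omega \\ \gamma \end{bmatrix}
= \sum_j E_j^{T} D_j\, e
```

This matches the final line `result = sumETE.I*sumETDe` together with the code's own comment that `omega = result[:-1]` and `gamma = result[-1]`.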