mirror of
https://github.com/apachecn/ailearning.git
synced 2026-05-10 00:02:09 +08:00
Merge remote-tracking branch 'upstream/master'
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
#!/usr/bin/python
|
||||
# coding:utf8
|
||||
'''
|
||||
Created on 2017-04-07
|
||||
MapReduce version of Pegasos SVM
|
||||
@@ -9,19 +11,20 @@ from mrjob.job import MRJob
|
||||
import pickle
|
||||
from numpy import *
|
||||
|
||||
|
||||
class MRsvm(MRJob):
|
||||
DEFAULT_INPUT_PROTOCOL = 'json_value'
|
||||
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(MRsvm, self).__init__(*args, **kwargs)
|
||||
self.data = pickle.load(open('C:\Users\Peter\machinelearninginaction\Ch15\svmDat27'))
|
||||
self.data = pickle.load(open('input/15.BigData_MapReduce/svmDat27'))
|
||||
self.w = 0
|
||||
self.eta = 0.69
|
||||
self.dataList = []
|
||||
self.k = self.options.batchsize
|
||||
self.numMappers = 1
|
||||
self.t = 1 #iteration number
|
||||
|
||||
self.t = 1 # iteration number
|
||||
|
||||
def configure_options(self):
|
||||
super(MRsvm, self).configure_options()
|
||||
self.add_passthrough_option(
|
||||
@@ -30,49 +33,61 @@ class MRsvm(MRJob):
|
||||
self.add_passthrough_option(
|
||||
'--batchsize', dest='batchsize', default=100, type='int',
|
||||
help='k: number of data points in a batch')
|
||||
|
||||
def map(self, mapperId, inVals): # 需要 2 个参数
|
||||
#input: nodeId, ('w', w-vector) OR nodeId, ('x', int)
|
||||
if False: yield
|
||||
if inVals[0]=='w': # 积累 w向量
|
||||
|
||||
def map(self, mapperId, inVals): # 需要 2 个参数
|
||||
# input: nodeId, ('w', w-vector) OR nodeId, ('x', int)
|
||||
if False:
|
||||
yield
|
||||
if inVals[0] == 'w': # 积累 w向量
|
||||
self.w = inVals[1]
|
||||
elif inVals[0]=='x':
|
||||
self.dataList.append(inVals[1])# 累积数据点计算
|
||||
elif inVals[0]=='t': self.t = inVals[1]
|
||||
else: self.eta=inVals # 这用于 debug, eta未在map中使用
|
||||
|
||||
elif inVals[0] == 'x':
|
||||
self.dataList.append(inVals[1]) # 累积数据点计算
|
||||
elif inVals[0] == 't':
|
||||
self.t = inVals[1]
|
||||
else:
|
||||
self.eta = inVals # 这用于 debug, eta未在map中使用
|
||||
|
||||
def map_fin(self):
|
||||
labels = self.data[:,-1]; X=self.data[:,0:-1]# 将数据重新形成 X 和 Y
|
||||
if self.w == 0: self.w = [0.001]*shape(X)[1] # 在第一次迭代时,初始化 w
|
||||
labels = self.data[:,-1]
|
||||
X = self.data[:, 0:-1] # 将数据重新形成 X 和 Y
|
||||
if self.w == 0:
|
||||
self.w = [0.001] * shape(X)[1] # 在第一次迭代时,初始化 w
|
||||
for index in self.dataList:
|
||||
p = mat(self.w)*X[index,:].T #calc p=w*dataSet[key].T
|
||||
p = mat(self.w)*X[index, :].T # calc p=w*dataSet[key].T
|
||||
if labels[index]*p < 1.0:
|
||||
yield (1, ['u', index])# 确保一切数据包含相同的key
|
||||
yield (1, ['w', self.w]) # 它们将在同一个 reducer
|
||||
yield (1, ['u', index]) # 确保一切数据包含相同的key
|
||||
yield (1, ['w', self.w]) # 它们将在同一个 reducer
|
||||
yield (1, ['t', self.t])
|
||||
|
||||
def reduce(self, _, packedVals):
|
||||
for valArr in packedVals: # 从流输入获取值
|
||||
if valArr[0]=='u': self.dataList.append(valArr[1])
|
||||
elif valArr[0]=='w': self.w = valArr[1]
|
||||
elif valArr[0]=='t': self.t = valArr[1]
|
||||
labels = self.data[:,-1]; X=self.data[:,0:-1]
|
||||
wMat = mat(self.w); wDelta = mat(zeros(len(self.w)))
|
||||
for valArr in packedVals: # 从流输入获取值
|
||||
if valArr[0] == 'u':
|
||||
self.dataList.append(valArr[1])
|
||||
elif valArr[0] == 'w':
|
||||
self.w = valArr[1]
|
||||
elif valArr[0] == 't':
|
||||
self.t = valArr[1]
|
||||
|
||||
labels = self.data[:, -1]
|
||||
X = self.data[:, 0:-1]
|
||||
wMat = mat(self.w)
|
||||
wDelta = mat(zeros(len(self.w)))
|
||||
|
||||
for index in self.dataList:
|
||||
wDelta += float(labels[index])*X[index,:] #wDelta += label*dataSet
|
||||
eta = 1.0/(2.0*self.t) #calc new: eta
|
||||
#calc new: w = (1.0 - 1/t)*w + (eta/k)*wDelta
|
||||
wDelta += float(labels[index]) * X[index, :] # wDelta += label*dataSet
|
||||
eta = 1.0/(2.0*self.t) # calc new: eta
|
||||
# calc new: w = (1.0 - 1/t)*w + (eta/k)*wDelta
|
||||
wMat = (1.0 - 1.0/self.t)*wMat + (eta/self.k)*wDelta
|
||||
for mapperNum in range(1,self.numMappers+1):
|
||||
yield (mapperNum, ['w', wMat.tolist()[0] ]) #发出 w
|
||||
for mapperNum in range(1, self.numMappers+1):
|
||||
yield (mapperNum, ['w', wMat.tolist()[0]]) # 发出 w
|
||||
if self.t < self.options.iterations:
|
||||
yield (mapperNum, ['t', self.t+1])# 增量 T
|
||||
for j in range(self.k/self.numMappers):#emit random ints for mappers iid
|
||||
yield (mapperNum, ['x', random.randint(shape(self.data)[0]) ])
|
||||
|
||||
yield (mapperNum, ['t', self.t+1]) # 增量 T
|
||||
for j in range(self.k/self.numMappers): # emit random ints for mappers iid
|
||||
yield (mapperNum, ['x', random.randint(shape(self.data)[0])])
|
||||
|
||||
def steps(self):
|
||||
return ([self.mr(mapper=self.map, reducer=self.reduce,
|
||||
mapper_final=self.map_fin)]*self.options.iterations)
|
||||
return ([self.mr(mapper=self.map, reducer=self.reduce, mapper_final=self.map_fin)] * self.options.iterations)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
MRsvm.run()
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
#!/usr/bin/python
|
||||
# coding:utf8
|
||||
'''
|
||||
Created on 2017-04-07
|
||||
Sequential Pegasos
|
||||
the input T is k*T in Batch Pegasos
|
||||
@author: Peter/ApacheCN-xy
|
||||
'''
|
||||
|
||||
from numpy import *
|
||||
|
||||
|
||||
def loadDataSet(fileName):
|
||||
dataMat = []; labelMat = []
|
||||
fr = open(fileName)
|
||||
@@ -17,6 +19,7 @@ def loadDataSet(fileName):
|
||||
labelMat.append(float(lineArr[2]))
|
||||
return dataMat,labelMat
|
||||
|
||||
|
||||
def seqPegasos(dataSet, labels, lam, T):
|
||||
m,n = shape(dataSet); w = zeros(n)
|
||||
for t in range(1, T+1):
|
||||
@@ -29,10 +32,12 @@ def seqPegasos(dataSet, labels, lam, T):
|
||||
w = (1.0 - 1/t)*w
|
||||
print w
|
||||
return w
|
||||
|
||||
|
||||
|
||||
def predict(w, x):
|
||||
return w*x.T
|
||||
|
||||
|
||||
def batchPegasos(dataSet, labels, lam, T, k):
|
||||
m,n = shape(dataSet); w = zeros(n);
|
||||
dataIndex = range(m)
|
||||
@@ -48,6 +53,8 @@ def batchPegasos(dataSet, labels, lam, T, k):
|
||||
w = (1.0 - 1/t)*w + (eta/k)*wDelta # 在每个 T上应用更改
|
||||
return w
|
||||
|
||||
|
||||
|
||||
datArr,labelList = loadDataSet('testSet.txt')
|
||||
datMat = mat(datArr)
|
||||
#finalWs = seqPegasos(datMat, labelList, 2, 5000)
|
||||
@@ -74,4 +81,4 @@ ax.plot(x,y)
|
||||
ax.plot(x,y2,'g-.')
|
||||
ax.axis([-6,8,-4,5])
|
||||
ax.legend(('50 Iterations', '2 Iterations') )
|
||||
plt.show()
|
||||
plt.show()
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
#!/usr/bin/python
|
||||
# coding:utf8
|
||||
from mrjob.job import MRJob
|
||||
import json
|
||||
|
||||
|
||||
class MRWordCountUtility(MRJob):
|
||||
|
||||
@@ -199,9 +199,9 @@ def simpleTest():
|
||||
# 因为数组没有是复制n份, array的乘法就是乘法
|
||||
dataArr = array(dataMat)
|
||||
# print dataArr
|
||||
weights = gradAscent(dataArr, labelMat)
|
||||
# weights = gradAscent(dataArr, labelMat)
|
||||
# weights = stocGradAscent0(dataArr, labelMat)
|
||||
# weights = stocGradAscent1(dataArr, labelMat)
|
||||
weights = stocGradAscent1(dataArr, labelMat)
|
||||
# print '*'*30, weights
|
||||
|
||||
# 数据可视化
|
||||
@@ -278,5 +278,5 @@ def multiTest():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
simpleTest()
|
||||
# multiTest()
|
||||
# simpleTest()
|
||||
multiTest()
|
||||
|
||||
Reference in New Issue
Block a user