Mirror of https://github.com/apachecn/ailearning.git (synced 2026-02-13 15:26:28 +08:00)
Add chapter 15 code and test data
43 src/python/15.BigData_MapReduce/mrMean.py Normal file
@@ -0,0 +1,43 @@
'''
Created on 2017-04-07

@author: Peter/ApacheCN-xy
'''
from mrjob.job import MRJob


class MRmean(MRJob):

    def __init__(self, *args, **kwargs):  # initialize the accumulators
        super(MRmean, self).__init__(*args, **kwargs)
        self.inCount = 0
        self.inSum = 0
        self.inSqSum = 0

    def map(self, key, val):  # takes exactly 2 arguments; accumulate the sum and the sum of squares
        if False: yield  # trick to make this method a generator even though it only updates state
        inVal = float(val)
        self.inCount += 1
        self.inSum += inVal
        self.inSqSum += inVal*inVal

    def map_final(self):  # emit this mapper's count, mean and mean of squares
        mn = self.inSum/self.inCount
        mnSq = self.inSqSum/self.inCount
        yield (1, [self.inCount, mn, mnSq])

    def reduce(self, key, packedValues):  # combine the partial results into a global mean and variance
        cumVal = 0.0; cumSumSq = 0.0; cumN = 0.0
        for valArr in packedValues:  # get the values from the input stream
            nj = float(valArr[0])
            cumN += nj
            cumVal += nj*float(valArr[1])
            cumSumSq += nj*float(valArr[2])
        mean = cumVal/cumN
        var = (cumSumSq - 2*mean*cumVal + cumN*mean*mean)/cumN
        yield (mean, var)  # emit the mean and the variance

    def steps(self):
        return ([self.mr(mapper=self.map, mapper_final=self.map_final,
                         reducer=self.reduce)])

if __name__ == '__main__':
    MRmean.run()
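Note (not part of the commit): a minimal sketch of how MRmean can be exercised locally. The file name inputFile.txt is an assumption, not fixed by the code above; any file with one number per line works, and the job can then be run with: python mrMean.py < inputFile.txt

# generate a small test file of numbers for MRmean (inputFile.txt is an arbitrary name)
from numpy import random

fw = open('inputFile.txt', 'w')
for num in random.standard_normal(100):
    fw.write('%f\n' % num)
fw.close()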
78 src/python/15.BigData_MapReduce/mrSVM.py Normal file
@@ -0,0 +1,78 @@
'''
Created on 2017-04-07
MapReduce version of Pegasos SVM
Using mrjob to automate job flow
@author: Peter/ApacheCN-xy
'''
from mrjob.job import MRJob

import pickle
from numpy import *


class MRsvm(MRJob):
    DEFAULT_INPUT_PROTOCOL = 'json_value'

    def __init__(self, *args, **kwargs):
        super(MRsvm, self).__init__(*args, **kwargs)
        # hard-coded path from the original MLiA code; use a raw string and binary mode for the pickle
        self.data = pickle.load(open(r'C:\Users\Peter\machinelearninginaction\Ch15\svmDat27', 'rb'))
        self.w = 0
        self.eta = 0.69
        self.dataList = []
        self.k = self.options.batchsize
        self.numMappers = 1
        self.t = 1  # iteration number

    def configure_options(self):
        super(MRsvm, self).configure_options()
        self.add_passthrough_option(
            '--iterations', dest='iterations', default=2, type='int',
            help='T: number of iterations to run')
        self.add_passthrough_option(
            '--batchsize', dest='batchsize', default=100, type='int',
            help='k: number of data points in a batch')

    def map(self, mapperId, inVals):  # takes exactly 2 arguments
        # input: nodeId, ('w', w-vector) OR nodeId, ('x', int)
        if False: yield  # trick to make this method a generator
        if inVals[0]=='w':                   # pick up the w vector
            self.w = inVals[1]
        elif inVals[0]=='x':
            self.dataList.append(inVals[1])  # accumulate data-point indices to evaluate
        elif inVals[0]=='t': self.t = inVals[1]
        else: self.eta=inVals                # used for debugging; eta is not used in map

    def map_final(self):  # evaluate the batch and emit the partial results
        labels = self.data[:,-1]; X=self.data[:,0:-1]  # split the data into X and labels
        if self.w == 0: self.w = [0.001]*shape(X)[1]   # initialize w on the first iteration
        for index in self.dataList:
            p = mat(self.w)*X[index,:].T    # calc p = w*dataSet[key].T
            if labels[index]*p < 1.0:
                yield (1, ['u', index])     # everything gets the same key
        yield (1, ['w', self.w])            # so it all lands at the same reducer
        yield (1, ['t', self.t])

    def reduce(self, _, packedVals):
        for valArr in packedVals:           # get the values from the input stream
            if valArr[0]=='u': self.dataList.append(valArr[1])
            elif valArr[0]=='w': self.w = valArr[1]
            elif valArr[0]=='t': self.t = valArr[1]
        labels = self.data[:,-1]; X=self.data[:,0:-1]
        wMat = mat(self.w); wDelta = mat(zeros(len(self.w)))
        for index in self.dataList:
            wDelta += float(labels[index])*X[index,:]   # wDelta += label*dataSet
        eta = 1.0/(2.0*self.t)                          # calc new: eta
        # calc new: w = (1.0 - 1/t)*w + (eta/k)*wDelta
        wMat = (1.0 - 1.0/self.t)*wMat + (eta/self.k)*wDelta
        for mapperNum in range(1,self.numMappers+1):
            yield (mapperNum, ['w', wMat.tolist()[0] ])  # emit w
            if self.t < self.options.iterations:
                yield (mapperNum, ['t', self.t+1])       # increment t
                for j in range(self.k/self.numMappers):  # emit random indices so mappers get iid samples
                    yield (mapperNum, ['x', random.randint(shape(self.data)[0]) ])

    def steps(self):
        return ([self.mr(mapper=self.map, reducer=self.reduce,
                         mapper_final=self.map_final)]*self.options.iterations)

if __name__ == '__main__':
    MRsvm.run()
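Note (not part of the commit): MRsvm.__init__ loads a pickled dataset from a hard-coded path. Below is a hedged sketch of how a compatible pickle could be built from the chapter's testSet.txt; the layout (features in the leading columns, +1/-1 label in the last column of a NumPy matrix) is an assumption inferred from the slicing self.data[:,-1] / self.data[:,0:-1] above, and the file names are illustrative.

# build a NumPy matrix with the label in the last column and pickle it
import pickle
from numpy import mat

rows = []
for line in open('testSet.txt'):
    x1, x2, label = [float(tok) for tok in line.strip().split('\t')]
    rows.append([x1, x2, label])
dataMat = mat(rows)

fw = open('svmDat27', 'wb')
pickle.dump(dataMat, fw)
fw.close()
# then point the open() call in MRsvm.__init__ at this file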
13 src/python/15.BigData_MapReduce/mrSVMkickStart.py Normal file
@@ -0,0 +1,13 @@
'''
Created on Feb 27, 2011

@author: Peter
'''
from numpy import *

# generate a kick-start input file for mrSVM.py: 100 random data-point indices,
# one per line, in the json_value format the job expects
fw = open('kickStart2.txt', 'w')
for i in [1]:  # single mapper id; left as a loop so more mappers could be added
    for j in range(100):
        fw.write('["x", %d]\n' % random.randint(200))
fw.close()
77 src/python/15.BigData_MapReduce/pegasos.py Normal file
@@ -0,0 +1,77 @@
'''
Created on 2017-04-07
Sequential Pegasos
the input T here corresponds to k*T of the Batch Pegasos version
@author: Peter/ApacheCN-xy
'''

from numpy import *


def loadDataSet(fileName):
    dataMat = []; labelMat = []
    fr = open(fileName)
    for line in fr.readlines():
        lineArr = line.strip().split('\t')
        #dataMat.append([float(lineArr[0]), float(lineArr[1]), float(lineArr[2])])
        dataMat.append([float(lineArr[0]), float(lineArr[1])])
        labelMat.append(float(lineArr[2]))
    return dataMat,labelMat


def seqPegasos(dataSet, labels, lam, T):
    m,n = shape(dataSet); w = zeros(n)
    for t in range(1, T+1):
        i = random.randint(m)
        eta = 1.0/(lam*t)
        p = predict(w, dataSet[i,:])
        if labels[i]*p < 1:
            w = (1.0 - 1.0/t)*w + eta*labels[i]*dataSet[i,:]  # 1.0/t avoids Python 2 integer division
        else:
            w = (1.0 - 1.0/t)*w
        print w
    return w


def predict(w, x):
    return w*x.T


def batchPegasos(dataSet, labels, lam, T, k):
    m,n = shape(dataSet); w = zeros(n)
    dataIndex = range(m)
    for t in range(1, T+1):
        wDelta = mat(zeros(n))  # reset wDelta
        eta = 1.0/(lam*t)
        random.shuffle(dataIndex)
        for j in range(k):      # go over a batch of k points (this is the mapper's work)
            i = dataIndex[j]
            p = predict(w, dataSet[i,:])            # mapper code
            if labels[i]*p < 1:                     # mapper code
                wDelta += labels[i]*dataSet[i,:].A  # accumulate the changes
        w = (1.0 - 1.0/t)*w + (eta/k)*wDelta        # apply the accumulated change once per t
    return w


datArr,labelList = loadDataSet('testSet.txt')
datMat = mat(datArr)
#finalWs = seqPegasos(datMat, labelList, 2, 5000)
finalWs = batchPegasos(datMat, labelList, 2, 50, 100)
print finalWs

import matplotlib
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(111)
x1=[]; y1=[]; xm1=[]; ym1=[]
for i in range(len(labelList)):
    if labelList[i] == 1.0:
        x1.append(datMat[i,0]); y1.append(datMat[i,1])
    else:
        xm1.append(datMat[i,0]); ym1.append(datMat[i,1])
ax.scatter(x1, y1, marker='s', s=90)
ax.scatter(xm1, ym1, marker='o', s=50, c='red')
x = arange(-6.0, 8.0, 0.1)
y = (-finalWs[0,0]*x - 0)/finalWs[0,1]
#y2 = (0.43799*x)/0.12316
y2 = (0.498442*x)/0.092387  # decision boundary after 2 iterations
ax.plot(x,y)
ax.plot(x,y2,'g-.')
ax.axis([-6,8,-4,5])
ax.legend(('50 Iterations', '2 Iterations') )
plt.show()
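Note (not part of the commit): a small follow-up check that could be appended after the plotting code above. It reuses predict, datMat, labelList and finalWs already defined in pegasos.py and reports how many training points fall on the wrong side of the learned hyperplane (no bias term is learned here, so classification is by the sign of w*x).

errors = 0
for i in range(len(labelList)):
    if labelList[i]*predict(finalWs, datMat[i,:]) < 0:  # misclassified point
        errors += 1
print('training error rate: %f' % (float(errors)/len(labelList)))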
52 src/python/15.BigData_MapReduce/proximalSVM.py Normal file
@@ -0,0 +1,52 @@
'''
Created on Feb 25, 2011

@author: Peter
'''
import base64
import pickle
import numpy


def map(key, value):
    # input key = class for one training example, e.g. "-1.0"
    classes = [float(item) for item in key.split(",")]  # e.g. [-1.0]
    D = numpy.diag(classes)

    # input value = feature vector for one training example, e.g. "3.0, 7.0, 2.0"
    featurematrix = [float(item) for item in value.split(",")]
    A = numpy.matrix(featurematrix)

    # create matrix E and vector e
    e = numpy.matrix(numpy.ones(len(A)).reshape(len(A),1))
    E = numpy.matrix(numpy.append(A,-e,axis=1))

    # create a tuple with the values to be used by reducer
    # and encode it with base64 to avoid potential trouble with '\t' and '\n' used
    # as default separators in Hadoop Streaming
    producedvalue = base64.b64encode(pickle.dumps( (E.T*E, E.T*D*e) ))

    # note: a single constant key "producedkey" sends to only one reducer
    # somewhat "atypical" due to low degree of parallelism on reducer side
    print "producedkey\t%s" % (producedvalue)


def reduce(key, values, mu=0.1):
    sumETE = None
    sumETDe = None

    # key isn't used, so ignoring it with _ (underscore).
    for _, value in values:
        # unpickle values
        ETE, ETDe = pickle.loads(base64.b64decode(value))
        if sumETE is None:
            # create the I/mu with correct dimensions
            sumETE = numpy.matrix(numpy.eye(ETE.shape[1])/mu)
        sumETE += ETE

        if sumETDe is None:
            # create sumETDe with correct dimensions
            sumETDe = ETDe
        else:
            sumETDe += ETDe

    # note: omega = result[:-1] and gamma = result[-1]
    # but printing entire vector as output
    result = sumETE.I*sumETDe
    print "%s\t%s" % (key, str(result.tolist()))
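Note (not part of the commit): proximalSVM.py only defines map() and reduce(); there is no driver. Below is a hypothetical stdin driver that could be appended to the file to exercise the mapper locally, assuming (as with Hadoop Streaming's default key/value split) that a tab separates the class string from the feature string; the reducer would be driven the same way from the mapper's output.

if __name__ == '__main__':
    import sys
    for line in sys.stdin:                             # one training example per line
        key, value = line.rstrip('\n').split('\t', 1)  # key = classes, value = features
        map(key, value)                                # the module-level map() defined above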
25 src/python/15.BigData_MapReduce/py27dbg.py Normal file
@@ -0,0 +1,25 @@
'''
Created on Feb 27, 2011
MapReduce version of Pegasos SVM
Using mrjob to automate job flow
@author: Peter
'''
from mrjob.job import MRJob

import pickle
from numpy import *


class MRsvm(MRJob):

    def map(self, mapperId, inVals):  # needs exactly 2 arguments
        if False: yield  # trick to make this method a generator
        yield (1, 22)

    def reduce(self, _, packedVals):
        yield (1, 'reducer reached')  # debug marker emitted as a key-value pair

    def steps(self):
        return ([self.mr(mapper=self.map, reducer=self.reduce)])

if __name__ == '__main__':
    MRsvm.run()
31 src/python/15.BigData_MapReduce/wc.py Normal file
@@ -0,0 +1,31 @@
from mrjob.job import MRJob
import json


class MRWordCountUtility(MRJob):

    def __init__(self, *args, **kwargs):
        super(MRWordCountUtility, self).__init__(*args, **kwargs)
        self.chars = 0
        self.words = 0
        self.lines = 0

    def mapper(self, _, line):
        if False:
            yield  # I'm a generator!

        self.chars += len(line) + 1  # +1 for newline
        self.words += sum(1 for word in line.split() if word.strip())
        self.lines += 1

    def mapper_final(self):
        yield('chars', self.chars)
        yield('words', self.words)
        yield('lines', self.lines)

    def reducer(self, key, values):
        yield(key, sum(values))


if __name__ == '__main__':
    MRWordCountUtility.run()
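Note (not part of the commit): a minimal programmatic run of the word-count job, assuming an mrjob release from the same era as the configure_options/self.mr API used elsewhere in this chapter (those releases expose stream_output and parse_output_line); input.txt is an arbitrary text file.

from wc import MRWordCountUtility

job = MRWordCountUtility(args=['input.txt'])
with job.make_runner() as runner:
    runner.run()
    for line in runner.stream_output():
        key, value = job.parse_output_line(line)
        print('%s\t%d' % (key, value))  # totals for chars / words / lines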