#!/usr/bin/python # coding:utf8 ''' Created on Jan 8, 2011 Update on 2017-05-18 @author: Peter Harrington/小瑶 《机器学习实战》更新地址:https://github.com/apachecn/MachineLearning ''' from numpy import * import matplotlib.pylab as plt def loadDataSet(fileName): """ 加载数据 解析以tab键分隔的文件中的浮点数 Returns: dataMat : feature 对应的数据集 labelMat : feature 对应的分类标签,即类别标签 """ # 获取样本特征的总数,不算最后的目标变量 numFeat = len(open(fileName).readline().split('\t')) - 1 dataMat = [] labelMat = [] fr = open(fileName) for line in fr.readlines(): # 读取每一行 lineArr =[] # 删除一行中以tab分隔的数据前后的空白符号 curLine = line.strip().split('\t') # i 从0到2,不包括2 for i in range(numFeat): # 将数据添加到lineArr List中,每一行数据测试数据组成一个行向量 lineArr.append(float(curLine[i])) # 将测试数据的输入数据部分存储到dataMat 的List中 dataMat.append(lineArr) # 将每一行的最后一个数据,即类别,或者叫目标变量存储到labelMat List中 labelMat.append(float(curLine[-1])) return dataMat,labelMat def standRegres(xArr,yArr): ''' Description: 线性回归 Args: xArr :输入的样本数据,包含每个样本数据的 feature yArr :对应于输入数据的类别标签,也就是每个样本对应的目标变量 Returns: ws:回归系数 ''' # mat()函数将xArr,yArr转换为矩阵 mat().T 代表的是对矩阵进行转置操作 xMat = mat(xArr) yMat = mat(yArr).T # 矩阵乘法的条件是左矩阵的列数等于右矩阵的行数 xTx = xMat.T*xMat # 因为要用到xTx的逆矩阵,所以事先需要确定计算得到的xTx是否可逆,条件是矩阵的行列式不为0 # linalg.det() 函数是用来求得矩阵的行列式的,如果矩阵的行列式为0,则这个矩阵是不可逆的,就无法进行接下来的运算 if linalg.det(xTx) == 0.0: print "This matrix is singular, cannot do inverse" return # 最小二乘法 # http://www.apache.wiki/pages/viewpage.action?pageId=5505133 # 书中的公式,求得w的最优解 ws = xTx.I * (xMat.T*yMat) return ws # 局部加权线性回归 def lwlr(testPoint,xArr,yArr,k=1.0): ''' Description: 局部加权线性回归,在待预测点附近的每个点赋予一定的权重,在子集上基于最小均方差来进行普通的回归。 Args: testPoint:样本点 xArr:样本的特征数据,即 feature yArr:每个样本对应的类别标签,即目标变量 k:关于赋予权重矩阵的核的一个参数,与权重的衰减速率有关 Returns: testPoint * ws:数据点与具有权重的系数相乘得到的预测点 Notes: 这其中会用到计算权重的公式,w = e^((x^((i))-x) / -2k^2) 理解:x为某个预测点,x^((i))为样本点,样本点距离预测点越近,贡献的误差越大(权值越大),越远则贡献的误差越小(权值越小)。 关于预测点的选取,在我的代码中取的是样本点。其中k是带宽参数,控制w(钟形函数)的宽窄程度,类似于高斯函数的标准差。 算法思路:假设预测点取样本点中的第i个样本点(共m个样本点),遍历1到m个样本点(含第i个),算出每一个样本点与预测点的距离, 也就可以计算出每个样本贡献误差的权值,可以看出w是一个有m个元素的向量(写成对角阵形式)。 ''' # mat() 函数是将array转换为矩阵的函数, mat().T 是转换为矩阵之后,再进行转置操作 xMat = mat(xArr) yMat = mat(yArr).T # 获得xMat矩阵的行数 m = shape(xMat)[0] # eye()返回一个对角线元素为1,其他元素为0的二维数组,创建权重矩阵weights,该矩阵为每个样本点初始化了一个权重 weights = mat(eye((m))) for j in range(m): # testPoint 的形式是 一个行向量的形式 # 计算 testPoint 与输入样本点之间的距离,然后下面计算出每个样本贡献误差的权值 diffMat = testPoint - xMat[j,:] # k控制衰减的速度 weights[j,j] = exp(diffMat*diffMat.T/(-2.0*k**2)) # 根据矩阵乘法计算 xTx ,其中的 weights 矩阵是样本点对应的权重矩阵 xTx = xMat.T * (weights * xMat) if linalg.det(xTx) == 0.0: print ("This matrix is singular, cannot do inverse") return # 计算出回归系数的一个估计 ws = xTx.I * (xMat.T * (weights * yMat)) return testPoint * ws def lwlrTest(testArr,xArr,yArr,k=1.0): ''' Description: 测试局部加权线性回归,对数据集中每个点调用 lwlr() 函数 Args: testArr:测试所用的所有样本点 xArr:样本的特征数据,即 feature yArr:每个样本对应的类别标签,即目标变量 k:控制核函数的衰减速率 Returns: yHat:预测点的估计值 ''' # 得到样本点的总数 m = shape(testArr)[0] # 构建一个全部都是 0 的 1 * m 的矩阵 yHat = zeros(m) # 循环所有的数据点,并将lwlr运用于所有的数据点 for i in range(m): yHat[i] = lwlr(testArr[i],xArr,yArr,k) # 返回估计值 return yHat def lwlrTestPlot(xArr,yArr,k=1.0): ''' Description: 首先将 X 排序,其余的都与lwlrTest相同,这样更容易绘图 Args: xArr:样本的特征数据,即 feature yArr:每个样本对应的类别标签,即目标变量,实际值 k:控制核函数的衰减速率的有关参数,这里设定的是常量值 1 Return: yHat:样本点的估计值 xCopy:xArr的复制 ''' # 生成一个与目标变量数目相同的 0 向量 yHat = zeros(shape(yArr)) # 将 xArr 转换为 矩阵形式 xCopy = mat(xArr) # 排序 xCopy.sort(0) # 开始循环,为每个样本点进行局部加权线性回归,得到最终的目标变量估计值 for i in range(shape(xArr)[0]): yHat[i] = lwlr(xCopy[i],xArr,yArr,k) return yHat,xCopy def rssError(yArr,yHatArr): ''' Desc: 计算分析预测误差的大小 Args: yArr:真实的目标变量 yHatArr:预测得到的估计值 Returns: 计算真实值和估计值得到的值的平方和作为最后的返回值 ''' return ((yArr-yHatArr)**2).sum() def ridgeRegres(xMat,yMat,lam=0.2): ''' Desc: 这个函数实现了给定 lambda 下的岭回归求解。 如果数据的特征比样本点还多,就不能再使用上面介绍的的线性回归和局部现行回归了,因为计算 (xTx)^(-1)会出现错误。 如果特征比样本点还多(n > m),也就是说,输入数据的矩阵x不是满秩矩阵。非满秩矩阵在求逆时会出现问题。 为了解决这个问题,我们下边讲一下:岭回归,这是我们要讲的第一种缩减方法。 Args: xMat:样本的特征数据,即 feature yMat:每个样本对应的类别标签,即目标变量,实际值 lam:引入的一个λ值,使得矩阵非奇异 Returns: 经过岭回归公式计算得到的回归系数 ''' xTx = xMat.T*xMat # 岭回归就是在矩阵 xTx 上加一个 λI 从而使得矩阵非奇异,进而能对 xTx + λI 求逆 denom = xTx + eye(shape(xMat)[1])*lam # 检查行列式是否为零,即矩阵是否可逆,行列式为0的话就不可逆,不为0的话就是可逆。 if linalg.det(denom) == 0.0: print ("This matrix is singular, cannot do inverse") return ws = denom.I * (xMat.T*yMat) return ws def ridgeTest(xArr,yArr): ''' Desc: 函数 ridgeTest() 用于在一组 λ 上测试结果 Args: xArr:样本数据的特征,即 feature yArr:样本数据的类别标签,即真实数据 Returns: wMat:将所有的回归系数输出到一个矩阵并返回 ''' xMat = mat(xArr) yMat=mat(yArr).T # 计算Y的均值 yMean = mean(yMat,0) # Y的所有的特征减去均值 yMat = yMat - yMean # 标准化 x,计算 xMat 平均值 xMeans = mean(xMat,0) # 然后计算 X的方差 xVar = var(xMat,0) # 所有特征都减去各自的均值并除以方差 xMat = (xMat - xMeans)/xVar # 可以在 30 个不同的 lambda 下调用 ridgeRegres() 函数。 numTestPts = 30 # 创建30 * m 的全部数据为0 的矩阵 wMat = zeros((numTestPts,shape(xMat)[1])) for i in range(numTestPts): # exp() 返回 e^x ws = ridgeRegres(xMat,yMat,exp(i-10)) wMat[i,:]=ws.T return wMat def regularize(xMat):# 按列进行规范化 inMat = xMat.copy() inMeans = mean(inMat,0) # 计算平均值然后减去它 inVar = var(inMat,0) # 计算除以Xi的方差 inMat = (inMat - inMeans)/inVar return inMat def stageWise(xArr,yArr,eps=0.01,numIt=100): xMat = mat(xArr); yMat=mat(yArr).T yMean = mean(yMat,0) yMat = yMat - yMean # 也可以规则化ys但会得到更小的coef xMat = regularize(xMat) m,n=shape(xMat) #returnMat = zeros((numIt,n)) # 测试代码删除 ws = zeros((n,1)); wsTest = ws.copy(); wsMax = ws.copy() for i in range(numIt): print (ws.T) lowestError = inf; for j in range(n): for sign in [-1,1]: wsTest = ws.copy() wsTest[j] += eps*sign yTest = xMat*wsTest rssE = rssError(yMat.A,yTest.A) if rssE < lowestError: lowestError = rssE wsMax = wsTest ws = wsMax.copy() #returnMat[i,:]=ws.T #return returnMat #def scrapePage(inFile,outFile,yr,numPce,origPrc): # from BeautifulSoup import BeautifulSoup # fr = open(inFile); fw=open(outFile,'a') #a is append mode writing # soup = BeautifulSoup(fr.read()) # i=1 # currentRow = soup.findAll('table', r="%d" % i) # while(len(currentRow)!=0): # title = currentRow[0].findAll('a')[1].text # lwrTitle = title.lower() # if (lwrTitle.find('new') > -1) or (lwrTitle.find('nisb') > -1): # newFlag = 1.0 # else: # newFlag = 0.0 # soldUnicde = currentRow[0].findAll('td')[3].findAll('span') # if len(soldUnicde)==0: # print "item #%d did not sell" % i # else: # soldPrice = currentRow[0].findAll('td')[4] # priceStr = soldPrice.text # priceStr = priceStr.replace('$','') #strips out $ # priceStr = priceStr.replace(',','') #strips out , # if len(soldPrice)>1: # priceStr = priceStr.replace('Free shipping', '') #strips out Free Shipping # print "%s\t%d\t%s" % (priceStr,newFlag,title) # fw.write("%d\t%d\t%d\t%f\t%s\n" % (yr,numPce,newFlag,origPrc,priceStr)) # i += 1 # currentRow = soup.findAll('table', r="%d" % i) # fw.close() ''' from time import sleep import json import urllib2 def searchForSet(retX, retY, setNum, yr, numPce, origPrc): sleep(10) myAPIstr = 'AIzaSyD2cR2KFyx12hXu6PFU-wrWot3NXvko8vY' searchURL = 'https://www.googleapis.com/shopping/search/v1/public/products?key=%s&country=US&q=lego+%d&alt=json' % (myAPIstr, setNum) pg = urllib2.urlopen(searchURL) retDict = json.loads(pg.read()) for i in range(len(retDict['items'])): try: currItem = retDict['items'][i] if currItem['product']['condition'] == 'new': newFlag = 1 else: newFlag = 0 listOfInv = currItem['product']['inventories'] for item in listOfInv: sellingPrice = item['price'] if sellingPrice > origPrc * 0.5: print ("%d\t%d\t%d\t%f\t%f" % (yr,numPce,newFlag,origPrc, sellingPrice)) retX.append([yr, numPce, newFlag, origPrc]) retY.append(sellingPrice) except: print ('problem with item %d' % i) def setDataCollect(retX, retY): searchForSet(retX, retY, 8288, 2006, 800, 49.99) searchForSet(retX, retY, 10030, 2002, 3096, 269.99) searchForSet(retX, retY, 10179, 2007, 5195, 499.99) searchForSet(retX, retY, 10181, 2007, 3428, 199.99) searchForSet(retX, retY, 10189, 2008, 5922, 299.99) searchForSet(retX, retY, 10196, 2009, 3263, 249.99) def crossValidation(xArr,yArr,numVal=10): m = len(yArr) indexList = range(m) errorMat = zeros((numVal,30))#create error mat 30columns numVal rows创建error mat 30columns numVal 行 for i in range(numVal): trainX=[]; trainY=[] testX = []; testY = [] random.shuffle(indexList) for j in range(m):#create training set based on first 90% of values in indexList #基于indexList中的前90%的值创建训练集 if j < m*0.9: trainX.append(xArr[indexList[j]]) trainY.append(yArr[indexList[j]]) else: testX.append(xArr[indexList[j]]) testY.append(yArr[indexList[j]]) wMat = ridgeTest(trainX,trainY) #get 30 weight vectors from ridge for k in range(30):#loop over all of the ridge estimates matTestX = mat(testX); matTrainX=mat(trainX) meanTrain = mean(matTrainX,0) varTrain = var(matTrainX,0) matTestX = (matTestX-meanTrain)/varTrain #regularize test with training params yEst = matTestX * mat(wMat[k,:]).T + mean(trainY)#test ridge results and store errorMat[i,k]=rssError(yEst.T.A,array(testY)) #print errorMat[i,k] meanErrors = mean(errorMat,0)#calc avg performance of the different ridge weight vectors minMean = float(min(meanErrors)) bestWeights = wMat[nonzero(meanErrors==minMean)] #can unregularize to get model #when we regularized we wrote Xreg = (x-meanX)/var(x) #we can now write in terms of x not Xreg: x*w/var(x) - meanX/var(x) +meanY xMat = mat(xArr); yMat=mat(yArr).T meanX = mean(xMat,0); varX = var(xMat,0) unReg = bestWeights/varX print ("the best model from Ridge Regression is:\n",unReg) print ("with constant term: ",-1*sum(multiply(meanX,unReg)) + mean(yMat)) ''' #test for standRegression def regression1(): xArr, yArr = loadDataSet("input/8.Regression/data.txt") xMat = mat(xArr) yMat = mat(yArr) ws = standRegres(xArr, yArr) fig = plt.figure() ax = fig.add_subplot(111) #add_subplot(349)函数的参数的意思是,将画布分成3行4列图像画在从左到右从上到下第9块 ax.scatter(xMat[:, 1].flatten(), yMat.T[:, 0].flatten().A[0]) #scatter 的x是xMat中的第二列,y是yMat的第一列 xCopy = xMat.copy() xCopy.sort(0) yHat = xCopy * ws ax.plot(xCopy[:, 1], yHat) plt.show() #test for LWLR def regression2(): xArr, yArr = loadDataSet("input/8.Regression/data.txt") yHat = lwlrTest(xArr, xArr, yArr, 0.003) xMat = mat(xArr) srtInd = xMat[:,1].argsort(0) #argsort()函数是将x中的元素从小到大排列,提取其对应的index(索引),然后输出 xSort=xMat[srtInd][:,0,:] fig = plt.figure() ax = fig.add_subplot(111) ax.plot(xSort[:,1], yHat[srtInd]) ax.scatter(xMat[:,1].flatten().A[0], mat(yArr).T.flatten().A[0] , s=2, c='red') plt.show() #test for ridgeRegression def regression3(): abX,abY = loadDataSet("input/8.Regression/abalone.txt") ridgeWeights = ridgeTest(abX, abY) fig = plt.figure() ax = fig.add_subplot(111) ax.plot(ridgeWeights) plt.show() #test for stageWise def regression4(): xArr,yArr=loadDataSet("input/8.Regression/abalone.txt") stageWise(xArr,yArr,0.01,200) xMat = mat(xArr) yMat = mat(yArr).T xMat = regularize(xMat) yM = mean(yMat,0) yMat = yMat - yM weights = standRegres(xMat, yMat.T) print (weights.T) if __name__ == "__main__": # regression1() regression2() # regression3() # regression4()