关键词:最优化算法·监督学习 原理 : 将分类问题转化为寻找最佳分割超平面,然后归结为凸优化问题,并利用核技巧将非线性问题转化为线性问题。 二次规划的解决方案:KKT条件。 快速实现方案:SMO算法。
定义了一个class,保存相关的参数。也可以直接用一个dict实现。 # 参数列表 # trainFeaturesMat : 训练特征集合 : mat # trainLabelsMat : 训练标签集合 : mat # C : 调和系数 : float # toler : 可接受误差 : float # m : 样本数量 : int # alphas : alpha优化系数向量 : mat[float] # b : b优化截距 : float # eCache : 缓存列表,缓存预测误差 : mat[isCalculted(boolen : 1/0 ),Ei(float)] class optStruct: def __init__(self,trainFeaturesMat,trainLabelsMat,C,toler): self.trainFeaturesMat = trainFeaturesMat self.trainLabelsMat = trainLabelsMat self.C = C self.toler = toler self.m = shape(trainFeaturesMat[0]) self.alphas = mat(zeros((self.m,1))) self.b = 0.0 self.eCache = mat(zeros((self.m,2))) -------------------------------------------------------------------------------
#函数名称: # calcEk(oS,k) #函数功能: # 计算预测误差 #输入参数: # oS : 优化参数结构体变量 # k : 需要计算的样本索引 : int #返回参数: # 返回索引为k的样本的误差 #这个是根据实际改动的,本人认为书上的代码实现存在错误。eCache本来做保存误差缓存之用,当alphas中的任何一项出现变动之后,都必须重新更新 def getEk(oS,k): #先检查eCache中是否有效 if oS.eCache[k,0]: return oS.eCache[k,1] #如果缓存中的数据无效,计算预测值 fXk = float(multiply(oS.alphas,oS.trainLabelsMat).T * (oS.trainFeaturesMat*(oS.trainFeaturesMat[k,:].T))) - oS.b #计算误差 Ek = fXk - float(oS.trainLabelsMat[k]) #更新缓存--无效原因是为了保证当更新alpha对儿时才对对应的eCache进行更新。这个地方其实一直存在疑问。原书也没有给出好的解释。还是哥没理解?FK #反思×2: # 原书是有道理的。只是当更新一对alphas时,才会对其记录进入缓存。 return Ek --------------------------------------------------------------------------------
每次当一对alphas更新之后,都要更新对应的误差缓存。 #函数名称: # updateEk(oS,k) #函数功能: # 当一对alpha更新后,要更新对应的误差缓存 #输入参数: # oS : 优化参数结构体 :optStruct # k : 更新第k个alpha对应的误差缓存 #返回参数: # void 无 def updataEk(oS,k): #计算预测值 fXk = float(multiply(oS.alphas,oS.trainLabesMat).T * (oS.trainFeaturesMat * oS.trainFeaturesMat[k,:])) - oS.b #误差计算 Ek = fXk - oS.trainLabelsMat[k] oS.eCache[k] = [1,Ek] ----------------------------------------------------------------------------------
这里运用了启发式算法。 进入这个函数的前提是,已经挑选出了第一个alpha。选择的方式是选择能产生最大步长的另一个alpha,使得|Ei - Ej|足够大, 这样就能保证有足够明显的下降。这里选取的范围是已经计算过,保存在eCache里面的缓存误差。后面提到的外层函数会涉及。当第一次进入时会对整个样本集都遍历, 这次遍历中,缓存中的有效数据是不断增加的。 当然,在初始阶段,还是会遇到eCache中数据不足的情况,这时候先随机选取。 #函数名称: # selectSecondAlpha(i,oS,Ei) #函数功能: # 选择第二个优化变量 #输入参数: # i : 第一个优化变量索引 : int # oS : 优化参数结构体变量 : optStruct # Ei : 第一个优化变量的预测误差 : float #输出参数: # 返回最佳选择的第二个优化变量的索引 : int def selectSecondAlpha(i,oS,Ei): maxJ = -1 maxDelta = 0.0 Ej = 0.0 Ei = getEk(oS,i) #先从当前有效中进行选择 validEcacheList = nonzero(oS.eCache[:,0].A)[0] #选区步长最大 if (len(validEcccheList)) > 1 : for j in validEcacheList : if j == i : continue Ej = getEk(oS,j) tempDelta = abs(Ei - Ej) if tempDelta > maxDelta: maxDelta = tempDelta maxJ = j #第一次,随机选取 else: j = i while(j == i): j = int(random.uniform(0,oS.m)) maxJ = j Ej = getEk(oS,j) return maxJ,Ej ---------------------------------------------------------------------------------
内层循环主要是寻找可以更新的alphas对。虽然传入第一个alpha索引,但仅仅是提供一个选项。 这个函数内部会先检查第一个给的alpha索引是否可更新,判断的标准是 是否违反kkt 条件。违反了,就说明是可更新的。 然后再挑选第二个alpha索引。之后确定剪辑区间,进行优化。如果优化空间较小,则放弃更新。 如果更新了之后,还要更新截距b,以及更新对应的缓存误差Ei #函数名称: # innerLoop(i,oS) #函数功能: # 内层循环,选定Second Alpha 进行更新 #输入参数: # i : First Alpha的索引号 : int # oS : 优化数据结构体 : optStruct #输出参数: # 1 : 更新了一对alphas # 0 : 没有对任何进行更新 def innerLoop(i,oS): #获取First Alpha 的预测误差 Ei = getEk(i) #判断是否违反KKT条件 if (( oS.alphas[i] > 0)and( oS.trainLabels[i]*Ei > os.toler)) or (( os.alphas[i] < C)and( os.trainLabels[i])*Ei < -toler): #违反,选择第Second Alpha,获取其预测误差 j,Ej = selectSecondAlpha(i,oS,Ei) #alpha pair备份 alphaIOld = oS.alphas[i].copy() alphaJOld = oS.alphas[j].copy() #确定剪辑区间 if oS.trainLabelsMat[i] == oS.trainLabelsMat[j]: L = max( 0.0 , oS.alphas[i] + oS.alphas[j] - oS.C) H = min( oS.C , oS.alphas[i] + oS.alphas[j] ) else: L = max( 0.0 , oS.alphas[j] - oS.alphas[i] ) H = min( C , oS.alphas[j] - oS.alphas[i] + C ) # if L == H : print "L == H" return 0 #eta计算 eta = k11 + k22 - 2*k12 #Kernel(x,y) 内积计算 Kii = oS.trainFeaturesMat[i,:] * oS.trainFeaturesMat[i,:].T Kjj = oS.trainFeaturesMat[j,:] * oS.trainFeaturesMat[j,:].T Kij = oS.trainFeaturesMat[i,:] * oS.trainFeaturesMat[j,:].T eta = Kii + Kjj - 2*Kij if eta <= 0 : print "eat <= 0" return 0 #updata alpha[j] oS.alphas[j] += oS.trainLabelsMat[j] * (Ei - Ej) / eta #对alphas[j]进行剪辑 if oS.alphas[j] < L : oS.alphas[j] = L elif oS.alphas[j] > H : oS.alphas[j] = H #观察变动状况 if abs(oS.alphas[j] - alphaJOld) < 0.0001: #改变幅度很小,还原 oS.alphas[j] = alphaJOld oS.alphas[i] = alphaIOld print "change little" return 0 #update alpha[i] oS.alphas[i] += (alphaJOld - oS.alphas[j]) * oS.trainLabels[i] * oS.trainLabels[j] #update b bi = - Ei - oS.trainLabelsMat[i] * Kii * (oS.alphas[i] - alphasIOld) - oS.trainLabelsMat[j] * Kji * (oS.alphas[j] - alphasJOld) + oS.b bj = - Ej - oS.trainLabelsMat[i] * Kij * (oS.alpahs[i] - alphasIOld) - oS.trainLabelsMat[j] * Kjj * (oS.alphas[j] - alphasJOld) + oS.b if (oS.alphas[i] > 0) and (oS.alphas[i] < oS.C): oS.b = bi elif (oS.alphas[j] > 0) and (oS.alphas[j] < oS.C): oS.b = bj else : oS.b = (bi + bj) / 2 #当更新了alpha_i alpha_j 后,对应的误差缓存也需要更新。 updateEk(oS,i) updateEk(oS.j) #更新一对 return 1 else: return 0 -------------------------------------------------------------------------------------
这个是函数主体,执行外层循环,提供第一个alpha值的索引号。 可以看到,当第一次执行函数或者当前的间隔边界(0 < alpha < C)上没有可以优化的值时,循环会切入对整个样本集合的遍历过程中,即令标志位为真。 否则,将会对间隔边界上的值进行优化。 并且给出最大的迭代次数上限。 #函数名称: # SVM_SMO_Complete(dataFeatures,dataLabels,C,toler,maxIter) #函数功能: # SMO 实现 SVM 的完整版 #输入参数: # dataFeatures : 样本特征集合 # dataLabels : 样本标签集合 # C : 调和参数 : float # toler : 容许误差 : float # maxIter : 最大迭代次数 : int #返回参数: # alphas , b : 最优回归系数 ,截距 : arrray or mat , float def SVM_SMO_Complete(dataFeatures,dataLabels,C=0.6,toler=0.01,maxIter=100): oS = optStruct(mat(dataFeatures),mat(dataLabels),C,toler) #entireSet : 遍历整个样本集合标志位 entireSet = True # 初始阶段要遍历整个集合 #迭代次数 iter = 0 while( (iter < maxIter) and ((entireSet) or (alphasChangedPairs > 0)): #更新alpha对数 alphasChangedPairs = 0 # if entireSet : #遍历整个样本集 for i in range(oS.m): #获取更新的对数 alphasChangedPairs += innerLoop(i,oS) else: #遍历分隔边界上的点 nonBoundIs = nonzeros( (oS.alphas.T > 0) and (oS.alphas.T < oS.C) )[0] for i in nonBoundIs : alphasChangedPairs += innerLoop(i,oS) if entireSet : entireSet = False elif alphasChanged == 0 : entireSet = True iter += 1 return oS.alphas, oS.b ------------------------------------------------------------------------------
根据优化系数alpha,计算权重系数,并进行分类。 #函数名称: # calcW(alphas,dataFeaturesArr,dataLabelsArr) #函数功能: # 计算特征权重 #输入参数: # alpahs : 优化系数 : array # dataFeaturesArr : 样本特征矩阵 : array # dataLabelsArr : 样本分类矩阵 : array #返回参数: # weights : 特征权重 : array def calcW(alphas,dataFeaturesArr,dataLabelsArr): dataFeaturesMat = mat(dataFeaturesArr) dataLabesMat = mat(dataLabelsArr) m , n = shape(dataFeaturesMat) #weights weights = zeros((n,1)) for i in range(m): weights += multiply(alphas[i]*dataLabelsMat[i],dataFeaturesMat[i].T) return weights #函数名称: # classfySVM(weightsArr,b,predictDataFeaturesArr) #函数功能: # 对预测集合进行样本分类 #输入参数: # weightsArr : 特征权重向量 : array # b : 截距 :float # predictDataFeaturesArr : 预测集特征矩阵 : array #返回参数: # predictLabels : 预测分类 : array def classfySVM(weightsArr,b,predictDataFeaturesArr): weightsMat = mat(weightsArr) predFeaturesMat = mat(predictDataFeaturesArr) #计算 predictLabels = predFeaturesMat * weightsMat + b #分类 for i in range(shape(predictLabels[0])): if predictLabes[i] < 0: predictLabels[i] = -1 else : predictlabels[i] = 1 return predictLabels --------------------------------------------------------------------------------------------------------
核函数处理的技巧在于,将低维空间映射到高维空间,而且不增加计算的复杂度。怎么做到的? 映射函数记做 Φ(x) ,它的作用将输入的低维空间(欧式空间)映射到高维空间(希尔伯特空间),比如:输入向量 xT=(x1,x2),则Φ(x)T=(x12,x22,20.5*x1*x2) 这样将二维空间映射到了三维空间。则Kernel Function K(x,y) = Φ(x)*Φ(y) = (x+y)2 因为无论是在优化系数alphas的求解,还是最终的判决上,都会用到计算样本的内积。对于线性不可分的样本,必须将其映射到高维才能运用线性方式进行最优化。 从上面的式子中可以看出,核函数相当求映射之后的向量的内积。只要知道了核函数,直接就能在低维空间计算结果,而不必先将其映射到高维,在计算内积。 因为映射函数是没必要去找的,我们只要找到核函数就行。只要满足相应的条件就能作为核函数。 #函数名称: # kernelTrans(X, A, kTup) #函数功能: # 核函数转换,计算样本集合X与某个样本A之间的径向基距离 #输入参数: # x : 样本集合 : array(m*n) # A : 单个样本 : array(1*n) # kTup : 核函数类型及参数信息 : tuple ( type,arg1,arg2...) #返回参数: # 样本集合与单个样本的高斯径向基距离 m*1 def kernelTrans(X, A, kTup): m,n = shape(X) K = mat(zeros((m,1))) if kTup[0] == 'line': K = X*A.T elif kTup[0] == 'rbf': for i in range(m): delta = X[i] - A K[i]= delta * delta.T K = exp(-K/(2*kTup[1]**2)) else: raise NameError('That Kernel is not recogized') return K #同时对optStruct类进行修改 class optStruct: def __init__(self,dataFeaturesArr,dataLabelsArr,C,toler,Ktup): self.dataFeaturesMat = mat(dataFeaturesArr) self.dataLabelsMat = mat(dataLabelsArr) self.C = C self.toler = toler m,n = shape(dataFeaturesArr) self.m = m self.eCache = mat(zeros((self.m,2)) self.aphas = mat(zeros((self.m,1))) self.b = 0.0 #Ki,j 的意义: 第i个样本与第j个样本内积(映射后内积),因此是m*m维的 self.K = mat(zeros((m,m))) #每次更新一列 for i in range(self.m): self.K[:,i] = kernelTrans(self.dataFeaturesMat,self.dataFeaturesMat[i,:],kTup) # 则利用核函数之后,因为所有的核内积都已经在初始化时确定, 以上其他函数用到内积的计算都替换为oS.K[i,j]即可。 ------------------------------------------------------------------------------------------
在整个决定分割平面之中,最终起作用的是支持向量,即alpha > 0 对应的向量。所以,在对预测集进行判决计算的时候,最好的方式便是只考虑这些向量。 将会极大提高计算速度。 ······ #挑选支持向量 svmIndex = nonzeors(oS.alphas.A > 0)[0] svms = dataMat[svmIndex] labels = dataLabel[svmIndex] m,n = shape(dataPredictMat) ······ #对预测集合进行预测 for i in range(m): kernelEval = kernelTrans(svms,dataPredictMat[i,:],('rbf',0.13)) predictValue = kernelEval.T * multiply(labels,oS.alphas[svmIndex]) + b ······
Welcome to contact me,the friends who like Machine-Learning and Data-Mining.
16 Nov 2014