다시 이음

Factorization Machines(FM) 본문

채우기/추천 시스템

Factorization Machines(FM)

Taeho(Damon) 2022. 10. 11. 12:20

Factorization Machines(FM)은 사용자와 아이템의 다양한 특성들을 모델화 함으로써 이러한 예측의 성능을 높이는 방법입니다.

 

앞서 MF에서는 사용자의 취향과 아이템의 특성을 나타내는 특성값을 K개로 요약하여 추출하고 이를 통해 각 사용자의 선호아이템을 예측하는 방식이었습니다.

 

MF에서 사용한 특성값 이외에도 예측에 도움이 될 수 있는 변수가 존재할 수 있는데 이러한 다양한 변수를 종합해서 요인화(Factorization)해주는 방법이 FM입니다.

 

FM의 표준식

 

FM의 기본 아이디어는 모든 변수와 그 변수들간의 상호작용을 고려해서 평점을 예측하는 것입니다.

 

입력변수 x의 모든 가능한 2개씩의 조합에 대해서 해당 잠재요인행렬 v의 값을 내적하고 여기에 x의 값을 곱하는 것이 표준식 입니다.

 

만일 x가 각 사용자와 아이템 각각을 나타내는 one-hot encoding이고 사용자와 아이템 외에 변수가 없으면 결국 MF의 식과 같습니다.

 

FM을 위한 데이터변환

 

MF 방식에 비해 여러가지 다른 변수를 함께 사용함으로 one-hot encoding이 되었을때 대부분의 데이터가 0인 희박행렬(sparse matrix)가 됩니다.

 

이러한 행렬을 모두 처리하면 비효율적이기 때문에 0이 아닌 값을 가지는 입력변수만을 골라서 계산합니다.

 

 

FM 구현

class FM():
	def __init__(self, N,K,data,y,alpha,beta,train_ratio=0.75, iterations=100,tolerance=0.005,l2_reg=True, verbose=True):
    	self.K=K #latent feature의 수
        self.N=N # 입력변수x의 개수
        self.n_cases=len(data)
        self.alpha=alpha #학습률
        self.beta =beta #정규화 계수
        self.iterations=iterations
        self.l2_reg=l2_reg #정규화 여부
        self.tolerance=tolerance #반복 중단하는 RMSE기준
        self.verbose = verbose #학습 상황 표시 여부
        
        #w초기화
        self.w=np.random.normal(scale=1./self.N, size=(self.N))
        #v초기화
        self.v=np.random.normal(scale=1./self.K, size=(self.N,self.K))
        #Train/test 분리
        cutoff = int(train_ratio*len(data))
        self.train_X = data[:cutoff]
        self.test_X = data[cutoff:]
        self.train_y = y[:cutoff]
        self.test_y = y[cutoff:]
        
    def test(self): #학습 실행하는 함수
    	best_RMSE=10000
        best_iteration=0
        training_process=[]
       	for i in range(self.iterations):
        	rmse1= self.sgd(self.train_X,self.train_y)
            rmse2 = self.test_rmse(self.test_X,self.test_y)
            training_process.append((i,rmse1,rmse2))
            if self.verbose:
            	if (i+1)%10==0:
                	print('Iteration:%d; Train RMSE = %.6f; Test RMSE=%.6f'%(i+1,rmse1,rmse2))
                if best_RMSE > rmse2:
                	best_RMSE = rmse2
                    best_iteration = i
                elif (rmse2 - best_RMSE) > self.tolerance:
                	break # RMSE가 정해진 tolerance보다 악화되면 중지
        print(best_iteration,best_RMSE)
        return training_process
        
    #w,v 업데이트를 위한 SGD
    def sgd(self, x_data, y_data):
    	y_pred=[]
        for data, y in zip(x_data,y_data):
        	x_idx = data[0]
            x_0 = np.array(data[1])
            x_1 = x_0.reshape(-1,1)
            
            #편향계산
            bias_score=np.sum(self.w[x_idx]*x_0)
            
            #score 계산
            vx = self.v[x_idx]*(x_1)
            sum_vx = np.sum(vx,axis=0)
            sum_vx2 = np.sum(vx*vx, axis=0)
            latent_score = 0.5 * np.sum(np.square(sum_vx) - sum_vx2)
            
            #예측값 계산
            y_hat = bias_score + latent_score
            y_pred.append(y_hat)
            error = y- y_hat
            
            #w,v 업데이트
            if self.l2_reg: #정규화 있을때
            	self.w[x_idx] += error* self.alpha * (x_0 - self.beta*self.w[x_idx])
                self.v[x_idx] += error * self.alpha * ((x_1)*sum(vx) - (vx*x_1) - self.beta*self.v[x_idx])
            else:# 정규화 없을때
            	self.w[x_idx] += error* self.alpha * x_0
            	self.v[x_idx] += error * self.alpha * ((x_1)*sum(vx) - (vx*x_1))
	
    return RMSE(y_data, y_pred)
    
    def test_rmse(self, x_data, y_data):
    	y_pred=[]
        for data, y in zip(x_data,y_data):
        	y_hat = self.predict(data[0],data[1])
            y_pred.append(y_hat)
        return RMSE(y_data, y_pred)
        
    def predict(self, idx, x):
    	x_0 = np.array(x)
        x_1 = x_0.reshape(-1,1)
        
        bias_score = np.sum(self.w[idx]*x_0)
        
        vx = self.v[idx]*(x_1)
        sum_vx = np.sum(vx, axis=0)
        sum_vx_2 = np.sum(vx*vx, axis=0)
        latent_score = 0.5*np.sum(np.square(sum_vx) - sum_vx_2)
        
        y_hat = bias_score + latent_score
        
        return y_hat

#잠재요인을 350개로 하고 다른 파라미터는 MF와 유사하게 한 후에 FM 실행
K=350
fm1 = FM(num_x, K, data, y, alpha=0.0014, beta=0.075, train_ratio=0.75, iterations=600, tolerance=0.0005, l2_reg=True,verbose=True)

result = fm1.test()