채우기/추천 시스템
Factorization Machines(FM)
Taeho(Damon)
2022. 10. 11. 12:20
Factorization Machines(FM)은 사용자와 아이템의 다양한 특성들을 모델화 함으로써 이러한 예측의 성능을 높이는 방법입니다.
앞서 MF에서는 사용자의 취향과 아이템의 특성을 나타내는 특성값을 K개로 요약하여 추출하고 이를 통해 각 사용자의 선호아이템을 예측하는 방식이었습니다.
MF에서 사용한 특성값 이외에도 예측에 도움이 될 수 있는 변수가 존재할 수 있는데 이러한 다양한 변수를 종합해서 요인화(Factorization)해주는 방법이 FM입니다.
FM의 표준식
FM의 기본 아이디어는 모든 변수와 그 변수들간의 상호작용을 고려해서 평점을 예측하는 것입니다.
입력변수 x의 모든 가능한 2개씩의 조합에 대해서 해당 잠재요인행렬 v의 값을 내적하고 여기에 x의 값을 곱하는 것이 표준식 입니다.
만일 x가 각 사용자와 아이템 각각을 나타내는 one-hot encoding이고 사용자와 아이템 외에 변수가 없으면 결국 MF의 식과 같습니다.
FM을 위한 데이터변환
MF 방식에 비해 여러가지 다른 변수를 함께 사용함으로 one-hot encoding이 되었을때 대부분의 데이터가 0인 희박행렬(sparse matrix)가 됩니다.
이러한 행렬을 모두 처리하면 비효율적이기 때문에 0이 아닌 값을 가지는 입력변수만을 골라서 계산합니다.
FM 구현
class FM():
def __init__(self, N,K,data,y,alpha,beta,train_ratio=0.75, iterations=100,tolerance=0.005,l2_reg=True, verbose=True):
self.K=K #latent feature의 수
self.N=N # 입력변수x의 개수
self.n_cases=len(data)
self.alpha=alpha #학습률
self.beta =beta #정규화 계수
self.iterations=iterations
self.l2_reg=l2_reg #정규화 여부
self.tolerance=tolerance #반복 중단하는 RMSE기준
self.verbose = verbose #학습 상황 표시 여부
#w초기화
self.w=np.random.normal(scale=1./self.N, size=(self.N))
#v초기화
self.v=np.random.normal(scale=1./self.K, size=(self.N,self.K))
#Train/test 분리
cutoff = int(train_ratio*len(data))
self.train_X = data[:cutoff]
self.test_X = data[cutoff:]
self.train_y = y[:cutoff]
self.test_y = y[cutoff:]
def test(self): #학습 실행하는 함수
best_RMSE=10000
best_iteration=0
training_process=[]
for i in range(self.iterations):
rmse1= self.sgd(self.train_X,self.train_y)
rmse2 = self.test_rmse(self.test_X,self.test_y)
training_process.append((i,rmse1,rmse2))
if self.verbose:
if (i+1)%10==0:
print('Iteration:%d; Train RMSE = %.6f; Test RMSE=%.6f'%(i+1,rmse1,rmse2))
if best_RMSE > rmse2:
best_RMSE = rmse2
best_iteration = i
elif (rmse2 - best_RMSE) > self.tolerance:
break # RMSE가 정해진 tolerance보다 악화되면 중지
print(best_iteration,best_RMSE)
return training_process
#w,v 업데이트를 위한 SGD
def sgd(self, x_data, y_data):
y_pred=[]
for data, y in zip(x_data,y_data):
x_idx = data[0]
x_0 = np.array(data[1])
x_1 = x_0.reshape(-1,1)
#편향계산
bias_score=np.sum(self.w[x_idx]*x_0)
#score 계산
vx = self.v[x_idx]*(x_1)
sum_vx = np.sum(vx,axis=0)
sum_vx2 = np.sum(vx*vx, axis=0)
latent_score = 0.5 * np.sum(np.square(sum_vx) - sum_vx2)
#예측값 계산
y_hat = bias_score + latent_score
y_pred.append(y_hat)
error = y- y_hat
#w,v 업데이트
if self.l2_reg: #정규화 있을때
self.w[x_idx] += error* self.alpha * (x_0 - self.beta*self.w[x_idx])
self.v[x_idx] += error * self.alpha * ((x_1)*sum(vx) - (vx*x_1) - self.beta*self.v[x_idx])
else:# 정규화 없을때
self.w[x_idx] += error* self.alpha * x_0
self.v[x_idx] += error * self.alpha * ((x_1)*sum(vx) - (vx*x_1))
return RMSE(y_data, y_pred)
def test_rmse(self, x_data, y_data):
y_pred=[]
for data, y in zip(x_data,y_data):
y_hat = self.predict(data[0],data[1])
y_pred.append(y_hat)
return RMSE(y_data, y_pred)
def predict(self, idx, x):
x_0 = np.array(x)
x_1 = x_0.reshape(-1,1)
bias_score = np.sum(self.w[idx]*x_0)
vx = self.v[idx]*(x_1)
sum_vx = np.sum(vx, axis=0)
sum_vx_2 = np.sum(vx*vx, axis=0)
latent_score = 0.5*np.sum(np.square(sum_vx) - sum_vx_2)
y_hat = bias_score + latent_score
return y_hat
#잠재요인을 350개로 하고 다른 파라미터는 MF와 유사하게 한 후에 FM 실행
K=350
fm1 = FM(num_x, K, data, y, alpha=0.0014, beta=0.075, train_ratio=0.75, iterations=600, tolerance=0.0005, l2_reg=True,verbose=True)
result = fm1.test()