基于回归模型的协同过滤推荐
如果我们将评分看作是一个连续的值而不是离散的值,那么就可以借助线性回归思想来预测目标用户对某物品的评分。其中一种实现策略被称为 Baseline(基准预测)。
Baseline:基准预测
Baseline 设计思想基于以下的假设:
- 有些用户的评分普遍高于其他用户,有些用户的评分普遍低于其他用户。比如有些用户天生愿意给别人好评,心慈手软,比较好说话,而有的人就比较苛刻,总是评分不超过 3 分(5 分满分)
- 一些物品的评分普遍高于其他物品,一些物品的评分普遍低于其他物品。比如一些物品一被生产便决定了它的地位,有的比较受人们欢迎,有的则被人嫌弃。
我们将这种用户或物品普遍高于或低于平均值的差值,称为偏置(bias)。
Baseline 目标:
- 找出每个用户普遍高于或低于他人的偏置值 $b_u$
- 找出每件物品普遍高于或低于其他物品的偏置值 $b_i$
- 我们的目标也就转化为寻找最优的 $b_u$ 和 $b_i$
使用 Baseline 的算法思想预测评分的步骤如下:
1. 计算所有电影的平均评分 $\mu$(即全局平均评分)
2. 计算每个用户评分与平均评分 $\mu$ 的偏置值 $b_u$
3. 计算每部电影所接受的评分与平均评分 $\mu$ 的偏置值 $b_i$
4. 预测用户对电影的评分:$\hat{r}_{ui} = \mu + b_u + b_i$
举例:
比如我们想通过 Baseline 来预测用户 A 对电影“阿甘正传”的评分:首先计算出整个评分数据集的平均评分 $\mu$;再分别得到用户 A 的评分相对于 $\mu$ 的偏置 $b_u$,以及电影“阿甘正传”所得评分相对于所有电影的平均评分 $\mu$ 的偏置 $b_i$;最终的预测评分即为 $\mu + b_u + b_i$。
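下面是一个极简的示意(数据与变量名均为假设):直接用“均值之差”来近似每个偏置,按上面的步骤在一份玩具评分数据上做 Baseline 预测。注意这只是朴素估计,正文后面才会通过最小化损失函数来求解最优的 $b_u$ 和 $b_i$。

```python
import pandas as pd

# 玩具评分数据(假设的数据,仅用于演示)
ratings = pd.DataFrame({
    "userId":  [1, 1, 2, 2, 3],
    "movieId": [10, 20, 10, 30, 20],
    "rating":  [4.0, 3.0, 5.0, 4.0, 2.0],
})

mu = ratings["rating"].mean()                          # 全局平均评分
bu = ratings.groupby("userId")["rating"].mean() - mu   # 每个用户的偏置(均值之差的朴素估计)
bi = ratings.groupby("movieId")["rating"].mean() - mu  # 每部电影的偏置

# 预测用户 3 对电影 10 的评分:mu + bu + bi
print(mu + bu.loc[3] + bi.loc[10])
```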
为了求得最优的 $b_u$ 和 $b_i$,我们用平方误差构建损失函数:

$$Cost = \sum_{u,i \in R} (r_{ui} - \hat{r}_{ui})^2 = \sum_{u,i \in R} (r_{ui} - \mu - b_u - b_i)^2$$

加入 L2 正则化:

$$Cost = \sum_{u,i \in R} (r_{ui} - \mu - b_u - b_i)^2 + \lambda \left( \sum_u b_u^2 + \sum_i b_i^2 \right)$$
公式解析:
- 公式第一部分 $\sum_{u,i \in R} (r_{ui} - \mu - b_u - b_i)^2$ 是用来寻找与已知评分数据拟合最好的 $b_u$ 和 $b_i$
- 公式第二部分 $\lambda \left( \sum_u b_u^2 + \sum_i b_i^2 \right)$ 是正则化项,用于避免过拟合现象
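下面用几行代码示意上述损失的计算方式(示意性质:`baseline_cost` 及其参数名均为假设,`bu`、`bi` 以 dict 存放各用户/物品的偏置,`reg` 对应公式中的 $\lambda$):

```python
def baseline_cost(ratings, global_mean, bu, bi, reg):
    '''ratings 为 (uid, iid, rating) 三元组的可迭代对象,返回带 L2 正则项的损失值'''
    # 第一部分:平方误差和
    sq_err = sum((r - global_mean - bu[u] - bi[i]) ** 2 for u, i, r in ratings)
    # 第二部分:L2 正则项
    penalty = reg * (sum(v ** 2 for v in bu.values()) + sum(v ** 2 for v in bi.values()))
    return sq_err + penalty

# 例如:baseline_cost([(1, 10, 4.0)], 3.6, {1: 0.2}, {10: 0.9}, reg=0.1)
```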
对于这个最小化过程的求解,我们一般采用随机梯度下降法或者交替最小二乘法来优化实现。
方法一:随机梯度下降法优化
使用随机梯度下降优化算法预测 Baseline 偏置值
step 1:梯度下降法推导
损失函数:

$$J(\theta) = Cost = f(b_u, b_i) = \sum_{u,i \in R} (r_{ui} - \mu - b_u - b_i)^2 + \lambda \left( \sum_u b_u^2 + \sum_i b_i^2 \right)$$

梯度下降参数更新原始公式:

$$\theta_j := \theta_j - \alpha \frac{\partial}{\partial \theta_j} J(\theta)$$

梯度下降更新 $b_u$,先对损失函数求偏导:

$$\frac{\partial}{\partial b_u} f(b_u, b_i) = 2 \sum_{u,i \in R} (r_{ui} - \mu - b_u - b_i) \cdot (-1) + 2 \lambda b_u = -2 \sum_{u,i \in R} (r_{ui} - \mu - b_u - b_i) + 2 \lambda b_u$$

代入更新公式,并将常数 2 吸收进学习率 $\alpha$,可得 $b_u$ 的更新:

$$b_u := b_u + \alpha \left( \sum_{u,i \in R} (r_{ui} - \mu - b_u - b_i) - \lambda b_u \right)$$

同理可得,梯度下降更新 $b_i$:

$$b_i := b_i + \alpha \left( \sum_{u,i \in R} (r_{ui} - \mu - b_u - b_i) - \lambda b_i \right)$$
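上面的偏导也可以用 sympy 做一个符号验证(示意性质:只取单个样本项,$\lambda$ 记作 `lam`,变量名均为假设):

```python
import sympy as sp

# 单个样本的带正则损失:(r - mu - b_u - b_i)^2 + lam * (b_u^2 + b_i^2)
r, mu, b_u, b_i, lam = sp.symbols('r mu b_u b_i lam')
cost = (r - mu - b_u - b_i) ** 2 + lam * (b_u ** 2 + b_i ** 2)

# 对 b_u 求偏导,展开后应与 -2*(r - mu - b_u - b_i) + 2*lam*b_u 一致
assert sp.simplify(sp.diff(cost, b_u) - (-2 * (r - mu - b_u - b_i) + 2 * lam * b_u)) == 0
print(sp.diff(cost, b_u))
```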
step 2:随机梯度下降
由于随机梯度下降法本质上是利用每个样本的损失来更新参数,而不用每次求出全部的损失和,因此使用 SGD 时:

单样本损失值:

$$error = r_{ui} - \hat{r}_{ui} = r_{ui} - (\mu + b_u + b_i)$$

参数更新:

$$b_u := b_u + \alpha (error - \lambda b_u)$$

$$b_i := b_i + \alpha (error - \lambda b_i)$$
step 3:算法实现
Click me to view the code
```python
import pandas as pd
import numpy as np
class BaselineCFBySGD(object):
    def __init__(self, number_epochs, alpha, reg, columns=["uid", "iid", "rating"]):
# 梯度下降最高迭代次数
self.number_epochs = number_epochs
# 学习率
self.alpha = alpha
# 正则参数
self.reg = reg
# 数据集中 user-item-rating 字段的名称
self.columns = columns
def fit(self, dataset):
'''
: param dataset: uid, iid, rating
: return:
'''
self.dataset = dataset
# 用户评分数据
        self.users_ratings = dataset.groupby(self.columns[0]).agg([list])[[self.columns[1], self.columns[2]]]
        # 物品评分数据
        self.items_ratings = dataset.groupby(self.columns[1]).agg([list])[[self.columns[0], self.columns[2]]]
        # 计算全局平均分
        self.global_mean = self.dataset[self.columns[2]].mean()
# 调用 sgd 方法训练模型参数
self.bu, self.bi = self.sgd()
def sgd(self):
'''
利用随机梯度下降,优化 bu,bi 的值
: return: bu, bi
'''
# 初始化 bu、bi 的值,全部设为 0
bu = dict(zip(self.users_ratings.index, np.zeros(len(self.users_ratings))))
bi = dict(zip(self.items_ratings.index, np.zeros(len(self.items_ratings))))
for i in range(self.number_epochs):
print("iter%d" % i)
            for uid, iid, real_rating in self.dataset.itertuples(index=False):
                error = real_rating - (self.global_mean + bu[uid] + bi[iid])
                bu[uid] += self.alpha * (error - self.reg * bu[uid])
                bi[iid] += self.alpha * (error - self.reg * bi[iid])
return bu, bi
def predict(self, uid, iid):
        predict_rating = self.global_mean + self.bu[uid] + self.bi[iid]
return predict_rating
if __name__ == '__main__':
    dtype = [("userId", np.int32), ("movieId", np.int32), ("rating", np.float32)]
    dataset = pd.read_csv("datasets/ml-latest-small/ratings.csv", usecols=range(3), dtype=dict(dtype))

    bcf = BaselineCFBySGD(20, 0.1, 0.1, ["userId", "movieId", "rating"])
    bcf.fit(dataset)

    while True:
        uid = int(input("uid: "))
        iid = int(input("iid: "))
        print(bcf.predict(uid, iid))
```
Step 4: 准确性指标评估
- 添加 test 方法,然后使用之前实现的 accuray 方法计算准确性指标
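其中 RMSE(均方根误差)与 MAE(平均绝对误差)的定义如下($T$ 为参与评估的测试样本集合,$N$ 为其数量):

$$RMSE = \sqrt{\frac{1}{N} \sum_{u,i \in T} (\hat{r}_{ui} - r_{ui})^2} \qquad MAE = \frac{1}{N} \sum_{u,i \in T} |\hat{r}_{ui} - r_{ui}|$$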
Click me to view the code
```python
import pandas as pd
import numpy as np

def data_split(data_path, x=0.8, random=False):
    '''
    切分数据集,这里为了保证用户数量保持不变,将每个用户的评分数据按比例进行拆分
    :param data_path: 数据集路径
    :param x: 训练集的比例,如 x=0.8,则 0.2 是测试集
    :param random: 是否随机切分,默认 False
    :return: 训练集、测试集
    '''
    print("开始切分数据集...")
    # 设置要加载的数据字段的类型
    dtype = {"userId": np.int32, "movieId": np.int32, "rating": np.float32}
    # 加载数据,我们只用前三列数据,分别是用户ID,电影ID,以及用户对电影的对应评分
    ratings = pd.read_csv(data_path, dtype=dtype, usecols=range(3))
testset_index = []
# 为了保证每个用户在测试集和训练集都有数据,因此按userId聚合
for uid in ratings.groupby("userId").any().index:
user_rating_data = ratings.where(ratings["userId"]==uid).dropna()
if random:
# 因为不可变类型不能被 shuffle方法作用,所以需要强行转换为列表
index = list(user_rating_data.index)
np.random.shuffle(index) # 打乱列表
_index = round(len(user_rating_data) * x)
testset_index += list(index[_index:])
else:
# 将每个用户的x比例的数据作为训练集,剩余的作为测试集
index = round(len(user_rating_data) * x)
testset_index += list(user_rating_data.index.values[index:])
testset = ratings.loc[testset_index]
trainset = ratings.drop(testset_index)
print("完成数据集切分...")
return trainset, testset
def accuray(predict_results, method="all"):
    '''
    准确性指标计算方法
    :param predict_results: 预测结果,类型为容器,每个元素是一个包含 uid,iid,real_rating,pred_rating 的序列
    :param method: 指标方法,类型为字符串,rmse 或 mae,否则返回 rmse 和 mae 两者
    :return:
    '''
def rmse(predict_results):
'''
rmse评估指标
:param predict_results:
:return: rmse
'''
length = 0
_rmse_sum = 0
for uid, iid, real_rating, pred_rating in predict_results:
length += 1
_rmse_sum += (pred_rating - real_rating) ** 2
return round(np.sqrt(_rmse_sum / length), 4)
def mae(predict_results):
'''
mae评估指标
:param predict_results:
:return: mae
'''
length = 0
_mae_sum = 0
for uid, iid, real_rating, pred_rating in predict_results:
length += 1
_mae_sum += abs(pred_rating - real_rating)
return round(_mae_sum / length, 4)
def rmse_mae(predict_results):
'''
rmse和mae评估指标
:param predict_results:
:return: rmse, mae
'''
length = 0
_rmse_sum = 0
_mae_sum = 0
for uid, iid, real_rating, pred_rating in predict_results:
length += 1
_rmse_sum += (pred_rating - real_rating) ** 2
_mae_sum += abs(pred_rating - real_rating)
return round(np.sqrt(_rmse_sum / length), 4), round(_mae_sum / length, 4)
    if method.lower() == "rmse":
        return rmse(predict_results)
    elif method.lower() == "mae":
        return mae(predict_results)
else:
return rmse_mae(predict_results)
class BaselineCFBySGD(object):
def __init__(self, number_epochs, alpha, reg, columns=["uid", "iid", "rating"]):
# 梯度下降最高迭代次数
self.number_epochs = number_epochs
# 学习率
self.alpha = alpha
# 正则参数
self.reg = reg
# 数据集中user-item-rating字段的名称
self.columns = columns
def fit(self, dataset):
'''
:param dataset: uid, iid, rating
:return:
'''
self.dataset = dataset
# 用户评分数据
self.users_ratings = dataset.groupby(self.columns[0]).agg([list])[[self.columns[1], self.columns[2]]]
# 物品评分数据
self.items_ratings = dataset.groupby(self.columns[1]).agg([list])[[self.columns[0], self.columns[2]]]
# 计算全局平均分
self.global_mean = self.dataset[self.columns[2]].mean()
# 调用sgd方法训练模型参数
self.bu, self.bi = self.sgd()
def sgd(self):
'''
利用随机梯度下降,优化bu,bi的值
:return: bu, bi
'''
# 初始化bu、bi的值,全部设为0
bu = dict(zip(self.users_ratings.index, np.zeros(len(self.users_ratings))))
bi = dict(zip(self.items_ratings.index, np.zeros(len(self.items_ratings))))
for i in range(self.number_epochs):
print("iter%d" % i)
for uid, iid, real_rating in self.dataset.itertuples(index=False):
error = real_rating - (self.global_mean + bu[uid] + bi[iid])
bu[uid] += self.alpha * (error - self.reg * bu[uid])
bi[iid] += self.alpha * (error - self.reg * bi[iid])
return bu, bi
def predict(self, uid, iid):
'''评分预测'''
if iid not in self.items_ratings.index:
raise Exception("无法预测用户<{uid}>对电影<{iid}>的评分,因为训练集中缺失<{iid}>的数据".format(uid=uid, iid=iid))
predict_rating = self.global_mean + self.bu[uid] + self.bi[iid]
return predict_rating
    def test(self, testset):
'''预测测试集数据'''
for uid, iid, real_rating in testset.itertuples(index=False):
try:
pred_rating = self.predict(uid, iid)
except Exception as e:
print(e)
else:
yield uid, iid, real_rating, pred_rating
if __name__ == '__main__':
trainset, testset = data_split("datasets/ml-latest-small/ratings.csv", random=True)
bcf = BaselineCFBySGD(20, 0.1, 0.1, ["userId", "movieId", "rating"])
bcf.fit(trainset)
pred_results = bcf.test(testset)
rmse, mae = accuray(pred_results)
print("rmse: ", rmse, "mae: ", mae)
方法二:交替最小二乘法优化
使用交替最小二乘法优化算法预测Baseline偏置值
step 1: 交替最小二乘法推导
最小二乘法和梯度下降法一样,可以用于求极值。
最小二乘法思想:对损失函数求偏导,然后令偏导为 0,解出参数。
同样,损失函数:

$$J(\theta) = \sum_{u,i \in R} (r_{ui} - \mu - b_u - b_i)^2 + \lambda \left( \sum_u b_u^2 + \sum_i b_i^2 \right)$$

对损失函数求偏导:

$$\frac{\partial}{\partial b_u} f(b_u, b_i) = -2 \sum_{u,i \in R} (r_{ui} - \mu - b_u - b_i) + 2 \lambda b_u$$

令偏导为 0,则可得:

$$\sum_{u,i \in R} (r_{ui} - \mu - b_u - b_i) = \lambda b_u$$

为了简化公式,将求和中的 $b_u$ 提出(用户 $u$ 共有 $|R(u)|$ 条评分),整理后可得:

$$b_u = \frac{\sum_{u,i \in R(u)} (r_{ui} - \mu - b_i)}{\lambda_1 + |R(u)|}$$

其中 $|R(u)|$ 表示用户 $u$ 有过评分的数量,$\lambda_1$ 为 $b_u$ 的正则参数。

同理可得:

$$b_i = \frac{\sum_{u,i \in R(i)} (r_{ui} - \mu - b_u)}{\lambda_2 + |R(i)|}$$

其中 $|R(i)|$ 表示电影 $i$ 收到的评分数量,$\lambda_2$ 为 $b_i$ 的正则参数。
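与 SGD 一节类似,也可以用 sympy 对闭式解做个符号核验(示意性质:只取单个样本项,变量名均为假设):

```python
import sympy as sp

r, mu, b_u, b_i, lam = sp.symbols('r mu b_u b_i lam')
cost = (r - mu - b_u - b_i) ** 2 + lam * (b_u ** 2 + b_i ** 2)

# 令关于 b_u 的偏导为 0 并求解,应得 (r - mu - b_i)/(lam + 1),
# 即上面闭式解在 |R(u)| = 1 时的特例
print(sp.solve(sp.diff(cost, b_u), b_u))
```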
step 2: 交替最小二乘法应用
通过最小二乘推导,我们最终分别得到了 $b_u$ 和 $b_i$ 的表达式,但两个表达式中又各自包含着对方,因此求解时采用交替的方式:
- 计算其中一项时,先固定其他未知参数,即把其他未知参数看作已知
- 如求 $b_i$ 时,将 $b_u$ 看作是已知;求 $b_u$ 时,将 $b_i$ 看作是已知;如此反复交替,不断更新二者的值,求得最终的结果。这就是交替最小二乘法(ALS)
step 3: 算法实现
Click me to view the code
```python
import pandas as pd
import numpy as np
class BaselineCFByALS(object):
def __init__(self, number_epochs, reg_bu, reg_bi, columns=["uid", "iid", "rating"]):
        # 最高迭代次数
self.number_epochs = number_epochs
# bu的正则参数
self.reg_bu = reg_bu
# bi的正则参数
self.reg_bi = reg_bi
# 数据集中user-item-rating字段的名称
self.columns = columns
def fit(self, dataset):
'''
:param dataset: uid, iid, rating
:return:
'''
self.dataset = dataset
# 用户评分数据
self.users_ratings = dataset.groupby(self.columns[0]).agg([list])[[self.columns[1], self.columns[2]]]
# 物品评分数据
self.items_ratings = dataset.groupby(self.columns[1]).agg([list])[[self.columns[0], self.columns[2]]]
# 计算全局平均分
self.global_mean = self.dataset[self.columns[2]].mean()
        # 调用 als 方法训练模型参数
self.bu, self.bi = self.als()
def als(self):
'''
        利用交替最小二乘法,优化 bu,bi 的值
:return: bu, bi
'''
# 初始化bu、bi的值,全部设为0
bu = dict(zip(self.users_ratings.index, np.zeros(len(self.users_ratings))))
bi = dict(zip(self.items_ratings.index, np.zeros(len(self.items_ratings))))
for i in range(self.number_epochs):
print("iter%d" % i)
for iid, uids, ratings in self.items_ratings.itertuples(index=True):
_sum = 0
for uid, rating in zip(uids, ratings):
_sum += rating - self.global_mean - bu[uid]
bi[iid] = _sum / (self.reg_bi + len(uids))
for uid, iids, ratings in self.users_ratings.itertuples(index=True):
_sum = 0
for iid, rating in zip(iids, ratings):
_sum += rating - self.global_mean - bi[iid]
bu[uid] = _sum / (self.reg_bu + len(iids))
return bu, bi
def predict(self, uid, iid):
predict_rating = self.global_mean + self.bu[uid] + self.bi[iid]
return predict_rating
if __name__ == '__main__':
dtype = [("userId", np.int32), ("movieId", np.int32), ("rating", np.float32)]
dataset = pd.read_csv("datasets/ml-latest-small/ratings.csv", usecols=range(3), dtype=dict(dtype))
bcf = BaselineCFByALS(20, 25, 15, ["userId", "movieId", "rating"])
bcf.fit(dataset)
while True:
uid = int(input("uid: "))
iid = int(input("iid: "))
        print(bcf.predict(uid, iid))
```

Step 4: 准确性指标评估
Click me to view the code
```python
import pandas as pd
import numpy as np
def data_split(data_path, x=0.8, random=False):
'''
切分数据集, 这里为了保证用户数量保持不变,将每个用户的评分数据按比例进行拆分
:param data_path: 数据集路径
:param x: 训练集的比例,如x=0.8,则0.2是测试集
:param random: 是否随机切分,默认False
    :return: 训练集、测试集
'''
print("开始切分数据集...")
# 设置要加载的数据字段的类型
dtype = {"userId": np.int32, "movieId": np.int32, "rating": np.float32}
    # 加载数据,我们只用前三列数据,分别是用户ID,电影ID,以及用户对电影的对应评分
ratings = pd.read_csv(data_path, dtype=dtype, usecols=range(3))
testset_index = []
# 为了保证每个用户在测试集和训练集都有数据,因此按userId聚合
for uid in ratings.groupby("userId").any().index:
user_rating_data = ratings.where(ratings["userId"]==uid).dropna()
if random:
# 因为不可变类型不能被 shuffle方法作用,所以需要强行转换为列表
index = list(user_rating_data.index)
np.random.shuffle(index) # 打乱列表
_index = round(len(user_rating_data) * x)
testset_index += list(index[_index:])
else:
# 将每个用户的x比例的数据作为训练集,剩余的作为测试集
index = round(len(user_rating_data) * x)
testset_index += list(user_rating_data.index.values[index:])
testset = ratings.loc[testset_index]
trainset = ratings.drop(testset_index)
print("完成数据集切分...")
return trainset, testset
def accuray(predict_results, method="all"):
'''
准确性指标计算方法
:param predict_results: 预测结果,类型为容器,每个元素是一个包含uid,iid,real_rating,pred_rating的序列
:param method: 指标方法,类型为字符串,rmse或mae,否则返回两者rmse和mae
:return:
'''
def rmse(predict_results):
'''
rmse评估指标
:param predict_results:
:return: rmse
'''
length = 0
_rmse_sum = 0
for uid, iid, real_rating, pred_rating in predict_results:
length += 1
_rmse_sum += (pred_rating - real_rating) ** 2
return round(np.sqrt(_rmse_sum / length), 4)
def mae(predict_results):
'''
mae评估指标
:param predict_results:
:return: mae
'''
length = 0
_mae_sum = 0
for uid, iid, real_rating, pred_rating in predict_results:
length += 1
_mae_sum += abs(pred_rating - real_rating)
return round(_mae_sum / length, 4)
def rmse_mae(predict_results):
'''
rmse和mae评估指标
:param predict_results:
:return: rmse, mae
'''
length = 0
_rmse_sum = 0
_mae_sum = 0
for uid, iid, real_rating, pred_rating in predict_results:
length += 1
_rmse_sum += (pred_rating - real_rating) ** 2
_mae_sum += abs(pred_rating - real_rating)
return round(np.sqrt(_rmse_sum / length), 4), round(_mae_sum / length, 4)
    if method.lower() == "rmse":
        return rmse(predict_results)
    elif method.lower() == "mae":
        return mae(predict_results)
else:
return rmse_mae(predict_results)
class BaselineCFByALS(object):
def __init__(self, number_epochs, reg_bu, reg_bi, columns=["uid", "iid", "rating"]):
        # 最高迭代次数
self.number_epochs = number_epochs
# bu的正则参数
self.reg_bu = reg_bu
# bi的正则参数
self.reg_bi = reg_bi
# 数据集中user-item-rating字段的名称
self.columns = columns
def fit(self, dataset):
'''
:param dataset: uid, iid, rating
:return:
'''
self.dataset = dataset
# 用户评分数据
self.users_ratings = dataset.groupby(self.columns[0]).agg([list])[[self.columns[1], self.columns[2]]]
# 物品评分数据
self.items_ratings = dataset.groupby(self.columns[1]).agg([list])[[self.columns[0], self.columns[2]]]
# 计算全局平均分
self.global_mean = self.dataset[self.columns[2]].mean()
        # 调用 als 方法训练模型参数
self.bu, self.bi = self.als()
def als(self):
'''
        利用交替最小二乘法,优化 bu,bi 的值
:return: bu, bi
'''
# 初始化bu、bi的值,全部设为0
bu = dict(zip(self.users_ratings.index, np.zeros(len(self.users_ratings))))
bi = dict(zip(self.items_ratings.index, np.zeros(len(self.items_ratings))))
for i in range(self.number_epochs):
print("iter%d" % i)
for iid, uids, ratings in self.items_ratings.itertuples(index=True):
_sum = 0
for uid, rating in zip(uids, ratings):
_sum += rating - self.global_mean - bu[uid]
bi[iid] = _sum / (self.reg_bi + len(uids))
for uid, iids, ratings in self.users_ratings.itertuples(index=True):
_sum = 0
for iid, rating in zip(iids, ratings):
_sum += rating - self.global_mean - bi[iid]
bu[uid] = _sum / (self.reg_bu + len(iids))
return bu, bi
def predict(self, uid, iid):
'''评分预测'''
if iid not in self.items_ratings.index:
raise Exception("无法预测用户<{uid}>对电影<{iid}>的评分,因为训练集中缺失<{iid}>的数据".format(uid=uid, iid=iid))
predict_rating = self.global_mean + self.bu[uid] + self.bi[iid]
return predict_rating
    def test(self, testset):
'''预测测试集数据'''
for uid, iid, real_rating in testset.itertuples(index=False):
try:
pred_rating = self.predict(uid, iid)
except Exception as e:
print(e)
else:
yield uid, iid, real_rating, pred_rating
if __name__ == '__main__':
trainset, testset = data_split("datasets/ml-latest-small/ratings.csv", random=True)
bcf = BaselineCFByALS(20, 25, 15, ["userId", "movieId", "rating"])
bcf.fit(trainset)
pred_results = bcf.test(testset)
rmse, mae = accuray(pred_results)
print("rmse: ", rmse, "mae: ", mae)import pandas as pd
import numpy as np
def data_split(data_path, x=0.8, random=False):
'''
切分数据集, 这里为了保证用户数量保持不变,将每个用户的评分数据按比例进行拆分
:param data_path: 数据集路径
:param x: 训练集的比例,如x=0.8,则0.2是测试集
:param random: 是否随机切分,默认False
:return: 用户-物品评分矩阵
'''
print("开始切分数据集...")
# 设置要加载的数据字段的类型
dtype = {"userId": np.int32, "movieId": np.int32, "rating": np.float32}
# 加载数据,我们只用前三列数据,分别是用户ID,电影ID,已经用户对电影的对应评分
ratings = pd.read_csv(data_path, dtype=dtype, usecols=range(3))
testset_index = []
# 为了保证每个用户在测试集和训练集都有数据,因此按userId聚合
for uid in ratings.groupby("userId").any().index:
user_rating_data = ratings.where(ratings["userId"]==uid).dropna()
if random:
# 因为不可变类型不能被 shuffle方法作用,所以需要强行转换为列表
index = list(user_rating_data.index)
np.random.shuffle(index) # 打乱列表
_index = round(len(user_rating_data) * x)
testset_index += list(index[_index:])
else:
# 将每个用户的x比例的数据作为训练集,剩余的作为测试集
index = round(len(user_rating_data) * x)
testset_index += list(user_rating_data.index.values[index:])
testset = ratings.loc[testset_index]
trainset = ratings.drop(testset_index)
print("完成数据集切分...")
return trainset, testset
def accuray(predict_results, method="all"):
'''
准确性指标计算方法
:param predict_results: 预测结果,类型为容器,每个元素是一个包含uid,iid,real_rating,pred_rating的序列
:param method: 指标方法,类型为字符串,rmse或mae,否则返回两者rmse和mae
:return:
'''
def rmse(predict_results):
'''
rmse评估指标
:param predict_results:
:return: rmse
'''
length = 0
_rmse_sum = 0
for uid, iid, real_rating, pred_rating in predict_results:
length += 1
_rmse_sum += (pred_rating - real_rating) ** 2
return round(np.sqrt(_rmse_sum / length), 4)
def mae(predict_results):
'''
mae评估指标
:param predict_results:
:return: mae
'''
length = 0
_mae_sum = 0
for uid, iid, real_rating, pred_rating in predict_results:
length += 1
_mae_sum += abs(pred_rating - real_rating)
return round(_mae_sum / length, 4)
def rmse_mae(predict_results):
'''
rmse和mae评估指标
:param predict_results:
:return: rmse, mae
'''
length = 0
_rmse_sum = 0
_mae_sum = 0
for uid, iid, real_rating, pred_rating in predict_results:
length += 1
_rmse_sum += (pred_rating - real_rating) ** 2
_mae_sum += abs(pred_rating - real_rating)
return round(np.sqrt(_rmse_sum / length), 4), round(_mae_sum / length, 4)
if method.lower() == "rmse":
rmse(predict_results)
elif method.lower() == "mae":
mae(predict_results)
else:
return rmse_mae(predict_results)
class BaselineCFByALS(object):
def __init__(self, number_epochs, reg_bu, reg_bi, columns=["uid", "iid", "rating"]):
# 梯度下降最高迭代次数
self.number_epochs = number_epochs
# bu的正则参数
self.reg_bu = reg_bu
# bi的正则参数
self.reg_bi = reg_bi
# 数据集中user-item-rating字段的名称
self.columns = columns
def fit(self, dataset):
'''
:param dataset: uid, iid, rating
:return:
'''
self.dataset = dataset
# 用户评分数据
self.users_ratings = dataset.groupby(self.columns[0]).agg([list])[[self.columns[1], self.columns[2]]]
# 物品评分数据
self.items_ratings = dataset.groupby(self.columns[1]).agg([list])[[self.columns[0], self.columns[2]]]
# 计算全局平均分
self.global_mean = self.dataset[self.columns[2]].mean()
# 调用sgd方法训练模型参数
self.bu, self.bi = self.als()
def als(self):
'''
利用随机梯度下降,优化bu,bi的值
:return: bu, bi
'''
# 初始化bu、bi的值,全部设为0
bu = dict(zip(self.users_ratings.index, np.zeros(len(self.users_ratings))))
bi = dict(zip(self.items_ratings.index, np.zeros(len(self.items_ratings))))
for i in range(self.number_epochs):
print("iter%d" % i)
for iid, uids, ratings in self.items_ratings.itertuples(index=True):
_sum = 0
for uid, rating in zip(uids, ratings):
_sum += rating - self.global_mean - bu[uid]
bi[iid] = _sum / (self.reg_bi + len(uids))
for uid, iids, ratings in self.users_ratings.itertuples(index=True):
_sum = 0
for iid, rating in zip(iids, ratings):
_sum += rating - self.global_mean - bi[iid]
bu[uid] = _sum / (self.reg_bu + len(iids))
return bu, bi
def predict(self, uid, iid):
'''评分预测'''
if iid not in self.items_ratings.index:
raise Exception("无法预测用户<{uid}>对电影<{iid}>的评分,因为训练集中缺失<{iid}>的数据".format(uid=uid, iid=iid))
predict_rating = self.global_mean + self.bu[uid] + self.bi[iid]
return predict_rating
def test(self,testset):
'''预测测试集数据'''
for uid, iid, real_rating in testset.itertuples(index=False):
try:
pred_rating = self.predict(uid, iid)
except Exception as e:
print(e)
else:
yield uid, iid, real_rating, pred_rating
if __name__ == '__main__':
trainset, testset = data_split("datasets/ml-latest-small/ratings.csv", random=True)
bcf = BaselineCFByALS(20, 25, 15, ["userId", "movieId", "rating"])
bcf.fit(trainset)
pred_results = bcf.test(testset)
rmse, mae = accuray(pred_results)
print("rmse: ", rmse, "mae: ", mae)