本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.
【推荐算法】协同过滤算法介绍_MachineCYL的博客-CSDN博客
上文介绍了协同过滤算法的原理,接下来我介绍一下协同过滤算法的代码实现。
下面我就开始介绍用pyspark中的ALS(交替最小二乘矩阵分解)来实现协同过滤代码。
一、ALS的简单介绍
ALS算法是2008年以来,用的比较多的协同过滤算法。它已经集成到Spark的Mllib库中,使用起来比较方便。 从协同过滤的分类来说, ALS算法属于User-Item CF,也叫做混合CF。它同时考虑了User(用户)和Item(商品)两个方面。
用户和商品的关系,可以抽象为如下的三元组:<User,Item,Rating>。其中,Rating是用户对商品的评分,表征用户对该商品的喜好程度。
假设我们有一批用户数据,其中包含m个User和n个Item,则我们定义Rating矩阵 Rm×n。在实际使用中,由于n和m的数量都十分巨大,因此R矩阵的规模很容易就会突破1亿项。这时候,传统的矩阵分解方法对于这幺大的数据量已经是很难处理了。另一方面,一个用户也不可能给所有商品评分,因此,R矩阵注定是个稀疏矩阵。而使用ALS可以达到数据降维的目的,大大减少计算量。
二、前期准备
使用pyspark前要先安装spark的环境。我的spark版本是2.4.3,pyspark版本也是2.4.3。
如果需要安装spark环境,可以参考:
pyspark安装指令如下(加清华源,下载快多了):
pip install pyspark==2.4.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
数据格式( 如果需要获取数据,可以见下方链接 ):
模型训练用到的数据格式如下:主要用到userId、movieId和rating字段。
三、详细代码
定义CollaborativeFiltering类,主要封装模型训练、模型保存、用户推荐、商品推荐等代码。
class CollaborativeFiltering(object): def __init__(self, spark_session): self.spark_session = spark_session self.model = None def train(self, train_set, user_col, item_col, rating_col, epoch=10): """ Build the recommendation model using ALS on the training data Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics """ als = ALS(regParam=0.01, maxIter=epoch, userCol=user_col, itemCol=item_col, ratingCol=rating_col, coldStartStrategy='drop') self.model = als.fit(train_set) def eval(self, test_set, label_col='ratingFloat', metric='rmse'): """ Evaluate the model on the test data """ predictions = self.model.transform(test_set) # self.model.itemFactors.show(10, truncate=False) # self.model.userFactors.show(10, truncate=False) evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=label_col, metricName=metric) loss = evaluator.evaluate(predictions) return loss def save(self, model_dir="./model_param"): self.model.write().overwrite().save(model_dir) def load(self, model_dir="./model_param"): self.model = ALSModel.load(model_dir) def recommend_for_all_users(self, num_items=10): user_recs = self.model.recommendForAllUsers(numItems=num_items) return user_recs def recommend_for_all_items(self, num_users=10): item_recs = self.model.recommendForAllItems(numUsers=num_users) return item_recs def recommend_for_user_subset(self, dataset, num_items=10): user_recs = self.model.recommendForUserSubset(dataset=dataset, numItems=num_items) return user_recs def recommend_for_item_subset(self, dataset, num_users=10): item_recs = self.model.recommendForItemSubset(dataset=dataset, numUsers=num_users) return item_recs
新建代码文件train_cf.py,加入示例代码如下:
def parse_argvs(): parser = argparse.ArgumentParser(description='[collaborativeFiltering]') parser.add_argument("--data_path", type=str, default='./data/ratings.csv') parser.add_argument("--model_path", type=str, default='./model_param') parser.add_argument("--epoch", type=int, default=10) parser.add_argument("--train_flag", type=bool, default=False) args = parser.parse_args() print('[input params] {}'.format(args)) return parser, args if __name__ == '__main__': parser, args = parse_argvs() data_path = args.data_path model_path = args.model_path train_flag = args.train_flag epoch = args.epoch conf = SparkConf().setAppName('collaborativeFiltering').setMaster("local[*]") spark_session = SparkSession.builder.config(conf=conf).getOrCreate() # read data data_path = os.path.abspath(data_path) data_path = "file://" + data_path print("[spark] read file path: {}".format(data_path)) ratingSamples = spark_session.read.format('csv').option('header', 'true').load(data_path) \ .withColumn("userIdInt", F.col("userId").cast(IntegerType())) \ .withColumn("movieIdInt", F.col("movieId").cast(IntegerType())) \ .withColumn("ratingFloat", F.col("rating").cast(FloatType())) training, test = ratingSamples.randomSplit((0.8, 0.2), seed=2022) # collaborative filtering start cf = CollaborativeFiltering(spark_session=spark_session) if train_flag is True: cf.train(train_set=training, user_col='userIdInt', item_col='movieIdInt', rating_col='ratingFloat', epoch=epoch) cf.save(model_dir=model_path) else: cf.load(model_dir=model_path) loss = cf.eval(test_set=test, label_col='ratingFloat', metric='rmse') print("[Root-mean-square error] {}".format(loss)) # Generate top 10 movie recommendations for each user user_recs = cf.recommend_for_all_users(num_items=10) user_recs.show(10, False) # Generate top 10 user recommendations for each movie movie_recs = cf.recommend_for_all_items(num_users=10) movie_recs.show(10, False) # Generate top 10 movie recommendations for a specified set of users user_data = ratingSamples.select("userIdInt").distinct().limit(10) user_sub_recs = cf.recommend_for_user_subset(dataset=user_data, num_items=10) user_sub_recs.show(10, False) # Generate top 10 user recommendations for a specified set of movies movie_data = ratingSamples.select("movieIdInt").distinct().limit(10) movie_sub_recs = cf.recommend_for_item_subset(dataset=movie_data, num_users=10) movie_sub_recs.show(10, False) spark_session.stop()
运行指令示例:
# 进行ALS模型训练和预测 python train_cf.py --data_path "./data/ratings.csv" --train_flag True # 不训练,直接进行ALS模型预测 python train_cf.py --data_path "./data/ratings.csv"
代码执行结果如下(部分):
需要获取训练数据和代码可以访问我的github,如果觉得有帮助,请star收藏,谢谢~
参考链接
spark中ALS介绍_AuroraPetard的博客-CSDN博客_sparkals
als算法参数_Pyspark推荐算法实战(一)_三杉的博客-CSDN博客
Be First to Comment