Press "Enter" to skip to content

【推荐算法】协同过滤算法代码(pyspark | ALS)

【推荐算法】协同过滤算法介绍_MachineCYL的博客-CSDN博客

 

上文介绍了协同过滤算法的原理,接下来我介绍一下协同过滤算法的代码实现。

 

下面我就开始介绍用pyspark中的ALS(交替最小二乘矩阵分解)来实现协同过滤代码。

 

一、ALS的简单介绍

 

ALS算法是2008年以来,用的比较多的协同过滤算法。它已经集成到Spark的Mllib库中,使用起来比较方便。 从协同过滤的分类来说, ALS算法属于User-Item CF,也叫做混合CF。它同时考虑了User(用户)和Item(商品)两个方面。

 

用户和商品的关系,可以抽象为如下的三元组:<User,Item,Rating>。其中,Rating是用户对商品的评分,表征用户对该商品的喜好程度。

 

假设我们有一批用户数据,其中包含m个User和n个Item,则我们定义Rating矩阵 Rm×n。在实际使用中,由于n和m的数量都十分巨大,因此R矩阵的规模很容易就会突破1亿项。这时候,传统的矩阵分解方法对于这幺大的数据量已经是很难处理了。另一方面,一个用户也不可能给所有商品评分,因此,R矩阵注定是个稀疏矩阵。而使用ALS可以达到数据降维的目的,大大减少计算量。

 

二、前期准备

使用pyspark前要先安装spark的环境。我的spark版本是2.4.3,pyspark版本也是2.4.3。
如果需要安装spark环境,可以参考:

Spark的安装及配置 – 简书

pyspark安装指令如下(加清华源,下载快多了):

pip install pyspark==2.4.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

 

数据格式( 如果需要获取数据,可以见下方链接 ):

 

模型训练用到的数据格式如下:主要用到userId、movieId和rating字段。

 

 

三、详细代码

 

定义CollaborativeFiltering类,主要封装模型训练、模型保存、用户推荐、商品推荐等代码。

 

class CollaborativeFiltering(object):
    def __init__(self, spark_session):
        self.spark_session = spark_session
        self.model = None
    def train(self, train_set, user_col, item_col, rating_col, epoch=10):
        """
        Build the recommendation model using ALS on the training data
        Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
        """
        als = ALS(regParam=0.01, maxIter=epoch, userCol=user_col, itemCol=item_col, ratingCol=rating_col,
                  coldStartStrategy='drop')
        self.model = als.fit(train_set)
    def eval(self, test_set, label_col='ratingFloat', metric='rmse'):
        """ Evaluate the model on the test data """
        predictions = self.model.transform(test_set)
        # self.model.itemFactors.show(10, truncate=False)
        # self.model.userFactors.show(10, truncate=False)
        evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=label_col, metricName=metric)
        loss = evaluator.evaluate(predictions)
        return loss
    def save(self, model_dir="./model_param"):
        self.model.write().overwrite().save(model_dir)
    def load(self, model_dir="./model_param"):
        self.model = ALSModel.load(model_dir)
    def recommend_for_all_users(self, num_items=10):
        user_recs = self.model.recommendForAllUsers(numItems=num_items)
        return user_recs
    def recommend_for_all_items(self, num_users=10):
        item_recs = self.model.recommendForAllItems(numUsers=num_users)
        return item_recs
    def recommend_for_user_subset(self, dataset, num_items=10):
        user_recs = self.model.recommendForUserSubset(dataset=dataset, numItems=num_items)
        return user_recs
    def recommend_for_item_subset(self, dataset, num_users=10):
        item_recs = self.model.recommendForItemSubset(dataset=dataset, numUsers=num_users)
        return item_recs

 

新建代码文件train_cf.py,加入示例代码如下:

 

def parse_argvs():
    parser = argparse.ArgumentParser(description='[collaborativeFiltering]')
    parser.add_argument("--data_path", type=str, default='./data/ratings.csv')
    parser.add_argument("--model_path", type=str, default='./model_param')
    parser.add_argument("--epoch", type=int, default=10)
    parser.add_argument("--train_flag", type=bool, default=False)
    args = parser.parse_args()
    print('[input params] {}'.format(args))
    return parser, args
if __name__ == '__main__':
    parser, args = parse_argvs()
    data_path = args.data_path
    model_path = args.model_path
    train_flag = args.train_flag
    epoch = args.epoch
    conf = SparkConf().setAppName('collaborativeFiltering').setMaster("local[*]")
    spark_session = SparkSession.builder.config(conf=conf).getOrCreate()
    # read data
    data_path = os.path.abspath(data_path)
    data_path = "file://" + data_path
    print("[spark] read file path: {}".format(data_path))
    ratingSamples = spark_session.read.format('csv').option('header', 'true').load(data_path) \
        .withColumn("userIdInt", F.col("userId").cast(IntegerType())) \
        .withColumn("movieIdInt", F.col("movieId").cast(IntegerType())) \
        .withColumn("ratingFloat", F.col("rating").cast(FloatType()))
    training, test = ratingSamples.randomSplit((0.8, 0.2), seed=2022)
    # collaborative filtering start
    cf = CollaborativeFiltering(spark_session=spark_session)
    if train_flag is True:
        cf.train(train_set=training,
                 user_col='userIdInt',
                 item_col='movieIdInt',
                 rating_col='ratingFloat',
                 epoch=epoch)
        cf.save(model_dir=model_path)
    else:
        cf.load(model_dir=model_path)
    loss = cf.eval(test_set=test, label_col='ratingFloat', metric='rmse')
    print("[Root-mean-square error] {}".format(loss))
    # Generate top 10 movie recommendations for each user
    user_recs = cf.recommend_for_all_users(num_items=10)
    user_recs.show(10, False)
    # Generate top 10 user recommendations for each movie
    movie_recs = cf.recommend_for_all_items(num_users=10)
    movie_recs.show(10, False)
    # Generate top 10 movie recommendations for a specified set of users
    user_data = ratingSamples.select("userIdInt").distinct().limit(10)
    user_sub_recs = cf.recommend_for_user_subset(dataset=user_data, num_items=10)
    user_sub_recs.show(10, False)
    # Generate top 10 user recommendations for a specified set of movies
    movie_data = ratingSamples.select("movieIdInt").distinct().limit(10)
    movie_sub_recs = cf.recommend_for_item_subset(dataset=movie_data, num_users=10)
    movie_sub_recs.show(10, False)
    spark_session.stop()

 

运行指令示例:

 

# 进行ALS模型训练和预测
python train_cf.py  --data_path "./data/ratings.csv" --train_flag True
# 不训练,直接进行ALS模型预测
python train_cf.py  --data_path "./data/ratings.csv"

 

代码执行结果如下(部分):

 

 

需要获取训练数据和代码可以访问我的github,如果觉得有帮助,请star收藏,谢谢~

 

CollaborativeFiltering

 

参考链接

spark中ALS介绍_AuroraPetard的博客-CSDN博客_sparkals
als算法参数_Pyspark推荐算法实战(一)_三杉的博客-CSDN博客

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注