Python 人工智能实战:智能推荐
作者:禅与计算机程序设计艺术
1.背景介绍
推荐引擎(recommender system)是一种用于生成产品推荐列表、提升用户体验满意度并提高新增用户转化率的技术体系。根据其核心功能目标的不同类型可将其主要分为若干大类
- 个性化推荐:给个体化需求(如用户兴趣偏好)的用户推荐适合其口味的商品;
- 搜索推荐:通过搜索引擎提供的用户查询词或行为习惯、用户画像、位置信息等,找到对用户可能感兴趣的内容;
- 协同过滤:结合用户之间的互动行为(如购买行为、评价、浏览、收藏等),为用户提供相似兴趣的商品推荐。 推荐系统已经在电子商务、社交网络、视频网站、网游领域发挥着重要作用。近年来,随着人工智能、大数据、云计算等新技术的兴起,基于机器学习的推荐系统越来越受到重视,并得到广泛应用。 人工智能推荐系统的主要研究方向包括:
- 数据挖掘:包括数据清洗、特征抽取、处理、模型训练、效果评估等方面,将海量数据进行分析、整理、归纳和提炼;
- 自然语言处理:包括文本特征提取、文本匹配、文本生成、关键词挖掘等,将用户输入的内容转换成计算机可读的形式;
- 图计算:包括网络构建、节点嵌入、推荐路径构建、边权重计算等,充分利用网络结构及关联性关系进行推荐决策;
- 深度学习:包括深度神经网络(DNN)、递归神经网络(RNN)、卷积神经网络(CNN)、变压器网络(Transformer)等,运用深度学习技术进行高效、准确的推荐预测。 本文将基于Python实现一个简易的人工智能推荐系统——基于UserCF算法的推荐系统。文章的前半部分将介绍推荐系统相关知识、术语和算法原理,后半部分将详细阐述Python实现细节。
2.核心概念与联系
推荐系统由用户的资源构成,并且将其分为三大核心组成部分:即为用户的档案记录、商品的分类信息以及用户的互动数据。
- 用户档案:是指管理并存储反映用户特征及其互动历史的数据集合体。通常包含的基本信息有用户名码(ID)、年龄层别(年龄)、性别分类(男女)、职业领域(行业)、学历层次(学段)、地域分布(地区)、消费倾向(消费模式)以及偏好类型(喜好)等内容。
- 物品库:指的是系统中汇集的各种可被推荐的对象集合体。这些对象主要包括电影作品(电影)、音乐作品(歌曲)、书籍作品(书籍)、商品条目(产品)、活动方案(活动)、工具软件(工具)以及服饰款式(时尚)等多个维度的内容。
- 交互数据集:是指系统中记录的用户与商品之间发生的所有互动行为的历史性文档集合体。“常见”的这种类型的数据集具体包括但不限于点击事件、“收藏”行为、“关注”状态、“评论反馈”、“分享传播”、“收藏夹添加”以及“喜欢标记”等多种形式。“推荐系统”的主要目标就是基于用户的互动数据分析来提供精准的商品推荐。“核心问题在于如何从这三个维度——即用户特征、“商品属性”以及互动关系——中挖掘出潜在的联系规律,并按照预先设定的策略生成具有商业价值的推荐结果。
基于用户的协同过滤算法 UserCF 是一种典型的推荐系统方法。该算法通过分析用户行为数据来识别他们之间的相似性,并在此基础上进行信息匹配与知识发现。它假设用户群体之间存在某些共性特征,并通过比较这些特征来推断用户的偏好趋势。在具体应用中,默认情况下该系统会计算每个用户的评分向量与其邻居用户的余弦相似度得分,并根据这些得分值确定 K 个最接近的对象作为参考依据(其中 R_{ui} 表示用户 u 对物品 i 的评分值)。当两个用户 A 和 B 的兴趣模式高度相同时,则可推断这两者很可能在特定领域具有相同的偏好取向;这种方法是协同过滤领域中最经典的实现方案之一)。
3.核心算法原理和具体操作步骤以及数学模型公式详细讲解
UserCF算法包括以下几个步骤:
-
进行用户的聚类分析:首先将目标群体划分为若干类别,在这些类别中每个体都具有相似的兴趣爱好和行为模式。尽管各用户的兴趣爱好各异,并非所有个体都表现出一致的选择倾向;然而由于群体成员之间的高度相似性,在某些方面它们很可能表现出一致的偏好趋势。例如按照年龄划分人群,则同龄层内的个体往往展现出相似的兴趣爱好;或者从多维度特征入手,则能够更好地构建出具有代表性的分类标准。另一种方法则是通过综合考虑地理位置、职业背景以及消费习惯等因素来实现精准的分类
-
对每位用户的商品评分进行计算:在分析用户的相似性后,在考虑用户的相似性关系和他们的历史行为数据(如点击、购买、评分等)来确定兴趣评分。这一过程可通过收集相关数据获取。一种简便的方法是将所有商品的平均分作为基础;另一种更为精确的方法是采用贝叶斯估计法或其他复杂的数学模型来进行评估。
依据用户的评分数据筛选出高人气商品
更新模型:最终目的是优化模型的参数以反映最新的数据信息。通常情况下,在每隔一段时间时就会对模型参数进行一次更新操作。
4.具体代码实例和详细解释说明
4.1 安装依赖模块
本项目基于Numpy、Pandas、Scikit-learn和Matplotlib这四个库(如Numpy, Pandas, Scikit-learn和Matplotlib),无需额外配置即可通过Anaconda进行安装。
conda install numpy pandas scikit-learn matplotlib
代码解读
也可以通过pip安装:
pip install -r requirements.txt
代码解读
4.2 数据准备
我们项目的实验采用了MovieLens数据集。该数据集是机器学习领域内广为人知的经典推荐系统基准数据集,在该研究中所使用的仅为用户行为记录、物品属性信息以及交互互动的数据集合。该数据集包括用户特征、物品特征以及评分信息三个组成部分。
4.2.1 MovieLens数据集获取
4.2.2 数据加载
加载数据集文件可以使用pandas模块读取。
首先,加载用户档案文件users.dat:
import pandas as pd
from io import StringIO
with open('ml-1m\ users.dat', 'r') as f:
data = f.read()
users = pd.read_csv(StringIO(data), sep='::', header=None, names=['id', 'gender', 'age', 'occupation', 'zip'])
代码解读
第二,加载物品档案文件movies.dat:
with open('ml-1m\ movies.dat', 'r') as f:
data = f.read()
movies = pd.read_csv(StringIO(data), sep='::', header=None, names=['id', 'title', 'genres'])
代码解读
第三,加载交互数据集文件ratings.dat:
with open('ml-1m\ ratings.dat', 'r') as f:
data = f.read()
ratings = pd.read_csv(StringIO(data), sep='::', header=None, names=['user_id','movie_id', 'rating', 'timestamp'])
代码解读
第四,合并用户档案、物品档案和交互数据集:
data = ratings.merge(movies, on='movie_id').merge(users, on='user_id')
代码解读
4.3 数据清洗
数据清洗的目的是使得数据集满足规范要求,方便后续模型训练和测试。
首先,删除缺失值较多的列:
data = data.dropna()
代码解读
然后,将年龄范围内的用户归为统一年龄范围,且不考虑超出范围的用户:
def age_map(x):
if x < 19:
return 'teen'
elif x < 30:
return 'young adult'
elif x < 40:
return 'adult'
else:
return'senior'
data['age'] = data['age'].apply(lambda x: age_map(x))
代码解读
4.4 数据切分
数据切分的作用在于将原始数据划分为训练数据、验证数据以及测试数据三个部分。为了准确评估模型性能,建议采用测试数据作为基准。
使用scikit-learn模块的train_test_split函数进行切分:
from sklearn.model_selection import train_test_split
X = data[['user_id','movie_id']]
y = data['rating']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
代码解读
4.5 使用UserCF算法训练模型
UserCF算法的训练过程实际上就是用户-物品评分矩阵的建立。
首先,定义一个函数用于计算用户之间的相似性:
from scipy.spatial.distance import cosine
def user_similarity(user_ids):
"""Calculate the similarity between two users."""
# select the subset of rating data for these two users
mask = (data['user_id'] == user_ids[0]) | (data['user_id'] == user_ids[1])
user_data = data[mask]
# calculate their average rating vectors and take their dot product
r_i = np.mean(np.array([user_data['rating'][user_data['user_id']==user_ids[0]],
user_data['rating'][user_data['user_id']==user_ids[1]]]), axis=0).reshape(-1, 1)
sim = np.dot(r_i, r_i.T)[0][0] / (np.linalg.norm(r_i)*np.linalg.norm(r_i))
return 1 - sim
代码解读
其次,遍历训练集,统计每一个用户对所有物品的评分,构造评分矩阵:
import numpy as np
n_users = len(set(X_train['user_id']))
n_items = len(set(X_train['movie_id']))
print("Number of users:", n_users)
print("Number of items:", n_items)
item_sims = {} # item similarities dictionary
user_item_scores = np.zeros((n_users, n_items)) # user-item scores matrix
for i in range(len(X_train)):
u_i = X_train.iloc[i]['user_id'] # current user ID
m_j = X_train.iloc[i]['movie_id'] # current movie ID
# update user-item score matrix
r_ij = y_train.iloc[i] # current rating
user_item_scores[u_i-1, m_j-1] += r_ij
# calculate item similarities
item_j = set(filter(lambda x: x!=m_j, list(range(1, n_items+1)))) # all other movies except j
item_j_scores = [] # list of j's scores with each k (k!= j) from U_i
for k in filter(lambda x: x!=u_i, list(set(X_train['user_id']))): # all other users except i
try:
# find common movies rated by both i and k
mk_mask = (X_train['user_id'] == k) & (X_train['movie_id'].isin(list(item_j)))
mk = set(X_train[mk_mask].iloc[:,1])
# compute correlation coefficient between i and k's rating vectors
ri = np.mean(data[(data['user_id']==u_i)]['rating']).reshape((-1,))
rk = np.mean(data[(data['user_id']==k)]['rating']).reshape((-1,))
rjk = np.mean(data[(data['user_id']==k) & (data['movie_id'].isin(list(mk))) ]['rating']).reshape((-1,))
rho = np.corrcoef(ri, rjk)[0][1]
# add to list of j's scores
item_j_scores.append((rho, k))
except Exception as e:
print("Error", e)
# sort list of j's scores based on correlation coefficient
item_j_scores = sorted(item_j_scores, key=lambda x: abs(x[0]), reverse=True)[:min(len(item_j), 10)]
# update item similarities dictionary
item_sims[m_j] = [(j, user_similarity([u_i, j])) for j,_ in item_j_scores]
代码解读
最后,训练模型,利用用户-物品评分矩阵进行预测:
import heapq
class UserBasedCF():
def __init__(self, user_item_scores, item_sims):
self.user_item_scores = user_item_scores
self.item_sims = item_sims
def predict(self, user_id, item_id, top_n=10):
max_similarities = [1]*top_n # initialize maximum similarities
# iterate through every neighbor and calculate its predicted rating
neighbors = self.find_neighbors(user_id)
pred_ratings = [self.calculate_predicted_rating(user_id, neighbor[0], item_id) for neighbor in neighbors]
# rank them based on the predicted rating and return the corresponding IDs
res = heapq.nlargest(top_n, zip(pred_ratings, neighbors))
return res
def find_neighbors(self, user_id):
"""Find a given user's nearest neighbors based on their item preferences."""
neighbors = []
for j in range(self.user_item_scores.shape[1]):
item_prefs = self.user_item_scores[user_id-1,:]
# find items that have high correlation with this one
j_sims = [(j, s) for j,s in self.item_sims[j+1] if s > 0.75]
# adjust preference value using correlations
adjusted_prefs = [p*w for p, (_, w) in zip(item_prefs, j_sims)]
# normalize adjusted preferences and append neighbor tuple (ID, preference)
norm = sum(adjusted_prefs)
pref_vec = [pref/norm for pref in adjusted_prefs]
neighbors.append((j+1, pref_vec))
# choose top N neighboring users based on their predicted ratings
max_ratings = [-sum(score_vec) for _, score_vec in neighbors]
max_neighbors = heapq.nlargest(len(max_ratings), enumerate(max_ratings), key=lambda x:x[1])
return [(int(neigh[0]+1), round(self.predict_rating(user_id, int(neigh[0]+1)), 3))
for neigh in max_neighbors]
def calculate_predicted_rating(self, user_id, neighbor_id, item_id):
"""Calculate the predicted rating of an item for a given user based on his/her neighbors' ratings."""
# get neighbor's previous ratings for all items
prev_ratings = self.user_item_scores[neighbor_id-1, :]
# calculate weights based on similarity coefficients
sim_coeffs = [weight for id_, weight in self.item_sims[item_id] if id_==neighbor_id]
if not sim_coeffs: # no similarity found
return None
weights = [coeff * prev_rating for coeff, prev_rating in zip(sim_coeffs, prev_ratings)]
total_weight = sum(weights)
# calculate predicted rating based on weighted sums
weighted_sums = [prev_rating*weight/total_weight for prev_rating, weight in zip(prev_ratings, weights)]
pred_rating = sum(weighted_sums) + (self.get_mean_rating(user_id)-self.get_mean_rating(neighbor_id))*0.5
return pred_rating
def predict_rating(self, user_id, neighbor_id):
"""Predict the rating of a particular user's favorite item based on another user's ratings."""
hist_data = data[(data['user_id']==user_id)].sort_values(['timestamp'], ascending=False)
if len(hist_data)==0 or len(data[(data['user_id']==neighbor_id)])==0:
return 3.5 # default rating for new or infrequent user-item pairs
recent_item_id = hist_data.iloc[0]['movie_id']
recent_rating = float(hist_data.iloc[0]['rating'])
item_ids = set(data[(data['user_id']==user_id)]['movie_id'])
match_rating = 3.5 # default rating for unknown item
# check whether neighbor has recently interacted with any known items
recency_factor = 0
for item_id in item_ids:
if float(data[(data['user_id']==neighbor_id) & (data['movie_id']==item_id)]['rating']):
break
recency_factor -= 1/(len(item_ids)+recency_factor)
if recent_rating <= 3.5: # use history only when recent rating is valid
item_scores = self.user_item_scores[user_id-1,:]
match_rating = item_scores[recent_item_id-1]
pred_rating = ((match_rating*(1-recency_factor)) +
(self.get_mean_rating(user_id)*(recency_factor)))
return pred_rating
def get_mean_rating(self, user_id):
mean_rating = np.nanmean(data[data['user_id']==user_id]['rating'])
return mean_rating if isinstance(mean_rating, float) else 3.5
ubcf = UserBasedCF(user_item_scores, item_sims)
代码解读
4.6 模型效果评估
模型的效果可依据均方根误差值、平均绝对误差值、相关系数、覆盖率、查准率、召回率以及F1值等相关的指标进行评估和判断。
使用scikit-learn模块的metrics模块计算模型效果:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
# evaluate model performance on test set
y_pred = ubcf.predict(X_test['user_id'].values, X_test['movie_id'].values)
rmse = mean_squared_error(y_test, y_pred)**0.5
mae = mean_absolute_error(y_test, y_pred)
rsquared = r2_score(y_test, y_pred)
print("RMSE:", rmse)
print("MAE:", mae)
print("R-squared:", rsquared)
代码解读
