基于DNN的CTR预测模型基本包含四个步骤:
- Input
- Embedding
- Low-order&High-order Feature Extractor
- Prediction
特征处理
标签编码
一般有两种方法对特征进行编码:
- 映射:即将特征映射到
0~len(#unique)-1
for feat in sparse_features: lbe = LabelEncoder() data[feat] = lbe.fit_transform(data[feat])
- 哈希编码:将特征映射到固定范围,例如
0~9999
或# 在训练前 for feat in sparse_features: lbe = HashEncoder() data[feat] = lbe.transform(data[feat])
# 在训练过程中动态执行特征哈希 # 在SparseFeat或VarlenSparseFeat中将use_hash设为True
对于密集的数值特征,通常将其离散为桶,可以使用归一化处理:
mms = MinMaxScaler(feature_range=(0,1))
data[dense_features] = mms.fit_transform(data[dense_features])
生成特征columns
在Input层之前需要对data特征进行固定格式的组织,即转换为DeepCTR中定义的类型。
在推荐系统中,通常数据的特征有以下几类:
- ID类特征;
- ID序列特征:例如带有先后顺序的行为序列、没有先后顺序的multi-hot编码的ID特征;
- 数值特征:例如年龄、视频时长、文章字数等,数值向量: 例如图片、视频、文本等向量表示。
对于上述三类特征,deepctr分别定义了三种Feature Columns:SparseFeat
、VarLenSparseFeat
、DenseFeat
。
这三个类是标记特征的,在构建模型时,可以通过特征的标记来构建不同的Input层和Embedding层。
class SparseFeat(namedtuple('SparseFeat',
['name', # 特征的名称
'vocabulary_size', # id特征的词典大小
'embedding_dim', # id特征转化成向量的维度
'use_hash',
'vocabulary_path',
'dtype',
'embeddings_initializer', # embedding的初始化方式
'embedding_name',
'group_name', # 该特征所属的组
'trainable'])): # 该id类特征对应的Embedding层是否可训练
class VarLenSparseFeat(namedtuple('VarLenSparseFeat',
['sparsefeat',
'maxlen', # id序列的最大长度
'combiner', # 序列特征聚合的方式(例如mean pooling)
'length_name', # 序列特征对应的有效长度的特征名(也是一个特征)
'weight_name', # 序列特征对应聚合权重的特征名(也是一个特征)
'weight_norm'])):
class DenseFeat(namedtuple('DenseFeat',
['name', # 数值特征的名称
'dimension', # 数值特征的维度,注意区分是单个数值特征还是向量数值特征
'dtype', # 数值类型
'transform_fn'])): # 数值特征的转换方式,例如有些特征需要做log变换
使用这些类,可以对每个特征都设置好对应的标记参数,就可以将所有特征自动化的构建对应的Input层和Embedding层了。
Input层构建
Input层的目的是将特征转换为模型能够处理的且比较合适的特征向量。
对于稀疏特征(sparse feature),通过Embedding将其转换为密集向量,对于密集数值特征(dense numercal feature),将其concat到全连接层的input tensor中。
input_features构建
deepctr使用了build_input_features
方法来构建Input层,参数feature_columns
,即提前根据特征定义好的特征标记数组。
该方法返回一个字典input_features
,key是特征名,value是特征对应的Input层,便于后续使用。
def build_input_features(feature_columns, prefix=''):
input_features = OrderedDict()
for fc in feature_columns:
if isinstance(fc, SparseFeat):
input_features[fc.name] = Input(shape=(1,), name=prefix + fc.name, dtype=fc.dtype)
elif isinstance(fc, DenseFeat):
input_features[fc.name] = Input(shape=(fc.dimension,), name=prefix + fc.name, dtype=fc.dtype)
elif isinstance(fc, VarLenSparseFeat):
input_features[fc.name] = Input(shape=(fc.maxlen,), name=prefix + fc.name, dtype=fc.dtype)
if fc.weight_name is not None:
input_features[fc.weight_name] = Input(shape=(fc.maxlen, 1), name=prefix + fc.weight_name, dtype="float32")
if fc.length_name is not None:
input_features[fc.length_name] = Input((1,), name=prefix + fc.length_name, dtype='int32')
else:
raise TypeError("Invalid feature column type,got", type(fc))
return input_features
Embedding层的构建
对于稀疏向量需要借助Embedding映射到低维密集向量(例如id类型的特征),一般需要两步:
- 根据id类特征的词表大小定义一个Embedding层;
- 根据id类特征的索引去找这个id对应的Embedding向量。
input_from_feature_columns
函数是将feature_columns
再进一步处理,其处理逻辑是:
- 筛选id类/id序列类
sparse_feature_columns
和数值类varlen_sparse_feature_columns
; - 对
feature_columns
创建一个Embedding层的字典,创建逻辑见create_embedding_matrix
方法,该方法生成embedding_matrix_dict
; input_features/embedding_matrix_dict/sparse_feature_columns
=>embedding_lookup
=>group_sparse_embedding_dict
input_features/embedding_matrix_dict/varlen_sparse_feature_columns
=>varlen_embedding_lookup/get_varlen_pooling_list
=>group_varlen_sparse_embedding_dict
group_sparse_embedding_dict/group_varlen_sparse_embedding_dict
=>mergeDict
=>group_embedding_dict
- 最终返回embedding后的
embedding_dict
和dense_value_list
。
在这里,一些参数:
embedding_matrix_dict
:这个是前面根据feature_column
构造的Embedding层,是一个字典,key是特征名,value是特征对应的Embedding层;input_features
:这个是前面根据feature_column
构造的Input层,是一个字典,key是特征名,value是对应的Input层;sparse_feature_columns
:这个是从整个特征标记数组筛选出来的id类特征,因为id类特征和id类序列特征Embedding的处理方式是不一样的,所有这里是将两类特征分开处理的,但是从代码中可以发现,创建Embedding层是放在一起的。
# input_features是一个Input层的字典
def input_from_feature_columns(input_features, feature_columns, l2_reg, seed, prefix='', seq_mask_zero=True,
support_dense=True, support_group=False):
# 筛选出id类和id序列类特征
# sparse_feature_columns表示的是id类和id序列类特征
sparse_feature_columns = list(
filter(lambda x: isinstance(x, SparseFeat), feature_columns)) if feature_columns else []
varlen_sparse_feature_columns = list(
filter(lambda x: isinstance(x, VarLenSparseFeat), feature_columns)) if feature_columns else []
# 返回一个Embedding层的字典,参数依然是feature_columns
embedding_matrix_dict = create_embedding_matrix(feature_columns, l2_reg, seed, prefix=prefix,
seq_mask_zero=seq_mask_zero)
# embedding_lookup函数是用来从Embedding层中获取对应的结果,就是将对应特征的Input层输入到Embedding层中去
# 返回的仍然是一个dict, 此时返回的内容中,embedding层和input层已经连接到一起了,并且还将同一组的id类特征
# 放到了一个列表中,方便同组id类特征的处理
group_sparse_embedding_dict = embedding_lookup(embedding_matrix_dict, input_features, sparse_feature_columns)
# 将id序列的embedding与对应的Input层进行关联
sequence_embed_dict = varlen_embedding_lookup(embedding_matrix_dict, input_features, varlen_sparse_feature_columns)
# 将id序列embedding进行池化操作,返回一个池化后的向量列表
group_varlen_sparse_embedding_dict = get_varlen_pooling_list(sequence_embed_dict, input_features,
varlen_sparse_feature_columns)
# 将字典中的所有层都进行合并
group_embedding_dict = mergeDict(group_sparse_embedding_dict, group_varlen_sparse_embedding_dict)
# 获取dense类特征的所有Input层,在这里可以做一些dense特征的数学变换
# 最终返回的是一个Input层的列表
dense_value_list = get_dense_input(input_features, feature_columns)
if not support_dense and len(dense_value_list) > 0:
raise ValueError("DenseFeat is not supported in dnn_feature_columns")
# 如果当前特征处理过程中没有用到分组的功能的话,就直接将字典中的所有层都转换成一个列表返回
if not support_group:
group_embedding_dict = list(chain.from_iterable(group_embedding_dict.values()))
# group_embedding_dict代表的是一个字典或者一个列表,这取决于当前特征是否支持分组
return group_embedding_dict, dense_value_list
其中,create_embedding_matrix
函数的实现逻辑:
- 首先将id类特征和id序列类特征分成两个列表;
- 然后通过
create_embedding_matrix
方法构造Embedding层:- 遍历所有的id类特征和id序列类特征;
- 根据每个id类特征(即对每个特征都要创建一个Embedding层)的标记定义对应Embeddng层的词典大小,向量维度,初始化方法,正则化参数,Embedding层的名字等;
- 将构造完的Embedding层通过字典的形式返回,key是特征名字,value是特征对应的Embedding层;
- 对于id类特征和id序列类特征,在构造Embedding层的时候,唯一的区别是:id序列类特征创建Embedding的时候多了一个参数
mask_zero=True
。
def create_embedding_matrix(feature_columns, l2_reg, seed, prefix="", seq_mask_zero=True):
from . import feature_column as fc_lib
# 将feature_cloumns中的sparsefeat 和 varlen_sparsefeat 特征分开,方便分开创建Embedding层
sparse_feature_columns = list(
filter(lambda x: isinstance(x, fc_lib.SparseFeat), feature_columns)) if feature_columns else []
varlen_sparse_feature_columns = list(
filter(lambda x: isinstance(x, fc_lib.VarLenSparseFeat), feature_columns)) if feature_columns else []
# 创建Embedding层的字典
sparse_emb_dict = create_embedding_dict(sparse_feature_columns, varlen_sparse_feature_columns, seed,
l2_reg, prefix=prefix + 'sparse', seq_mask_zero=seq_mask_zero)
return sparse_emb_dict
# 其中create_embedding_dict逻辑如下
def create_embedding_dict(sparse_feature_columns, varlen_sparse_feature_columns, seed, l2_reg,
prefix='sparse_', seq_mask_zero=True):
sparse_embedding = {}
for feat in sparse_feature_columns:
emb = Embedding(feat.vocabulary_size, feat.embedding_dim,
embeddings_initializer=feat.embeddings_initializer,
embeddings_regularizer=l2(l2_reg),
name=prefix + '_emb_' + feat.embedding_name)
emb.trainable = feat.trainable
sparse_embedding[feat.embedding_name] = emb
# 对于id序列和id特征的Embedding层只有一个参数是不一样的,那就是mask_zero的设置,如果设置为True,
# 表示的是取出来的是一个embedding序列,如果长度不够最大长度的用0填充
if varlen_sparse_feature_columns and len(varlen_sparse_feature_columns) > 0:
for feat in varlen_sparse_feature_columns:
# if feat.name not in sparse_embedding:
emb = Embedding(feat.vocabulary_size, feat.embedding_dim,
embeddings_initializer=feat.embeddings_initializer,
embeddings_regularizer=l2(
l2_reg),
name=prefix + '_seq_emb_' + feat.name,
mask_zero=seq_mask_zero)
emb.trainable = feat.trainable
sparse_embedding[feat.embedding_name] = emb
return sparse_embedding
通过create_embedding_dict
方法构造Embedding层:
group_sparse_embedding_dict = embedding_lookup(embedding_matrix_dict, input_features, sparse_feature_columns)
def embedding_lookup(sparse_embedding_dict, sparse_input_dict, sparse_feature_columns, return_feat_list=(),
mask_feat_list=(), to_list=False):
# 定义个字典,字典中value默认是一个list,这么做是为了将id类特征进行分组,同一个组的id特征都会在一个列表中,
# 方便后面同组的特征做处理
group_embedding_dict = defaultdict(list)
for fc in sparse_feature_columns:
feature_name = fc.name
embedding_name = fc.embedding_name
if (len(return_feat_list) == 0 or feature_name in return_feat_list):
# 判断当前特征是否需要被哈希,如果需要的话,就将Input层输入到Hash层,得到输出
if fc.use_hash:
lookup_idx = Hash(fc.vocabulary_size, mask_zero=(feature_name in mask_feat_list), vocabulary_path=fc.vocabulary_path)(
sparse_input_dict[feature_name])
else:
# 否则直接取出当前特征的Input层
lookup_idx = sparse_input_dict[feature_name]
group_embedding_dict[fc.group_name].append(sparse_embedding_dict[embedding_name](lookup_idx))
# 如果最后想返回一个列表的话,需要将字典中不同组的embedding都拆开拼接到一个list中
if to_list:
return list(chain.from_iterable(group_embedding_dict.values()))
return group_embedding_dict
对于id序列特征而言,和id类特征是类似的,也是先定义Embedding层然后将对应的Input层输入进来。只不过id序列类特征可能会多一步序列聚合的操作。
从input_from_feature_columns方法中我们看到,除了对id类特征进行了一些处理之外,还有对DenseFeat有简单的处理,这里就简单说一下DenseFeat和SparseFeat在输入到DNN之前需要应该做的事情:
- DenseFeat,只需要将每个特征根据提前设置好的特征标记参数,构建好对应的Input层,然就就可以将这些Input层拼起来输入到DNN层里面去了,所以不需要做太多的其他处理;
- SparseFeat,需要将每个特征根据提前设置好的特征标记参数,构建好对应的Input层以及Embedding层,然后将每个特征的Input层输入到Embedding层,这样输入的一个id索引,就可以得到对应的embedding向量了,得到了id特征的embedding向量,就可以经过DNN层或者各类特征交叉层了;
- VarLenSparseFeat,与SparseFeat类似,但是有一点区别。也是先得到对应的Input、Embedding层,然后将Input层输入到Embedding层,此时得到的就不是一个向量了,而是一个向量序列(维度:batch_size x seq_len x embedding_dim),有时候会将这个向量序列做简单的池化操作,例如mean pooling,得到一个聚合后的向量特征。此外,有时候还可以使用序列模型,例如LSTM将序列输入到LSTM模块中,得到一个新的序列,再进行聚合。
通过上述的介绍,其实特征处理部分的逻辑基本上就做完了,后面就是将上述得到的Input层,或者Embedding层输出的embedding向量,输入到DNN,特征交叉等模块中进一步构造模型。
模型构建
(待补充)
RE:
- https://deepctr-doc.readthedocs.io/en/latest/index.html
- https://zhuanlan.zhihu.com/p/514763115