Graph Embedding with Side Information
Author: Steve Tran
Introduction
This post is based on the paper "Billion-scale Commodity Embedding for E-commerce Recommendation in Alibaba." On platforms like Taobao, millions of new items are uploaded every hour. These items have no user behavior history, making traditional collaborative filtering methods ineffective.
Learning item representations is crucial for the matching and ranking stages of recommendation systems. Collaborative filtering methods rely only on the co-occurrence of items in users' behavior history, which breaks down when items have few or no interactions.
The authors proposed a new approach: incorporate side information to enhance embedding vectors, dubbed Graph Embedding with Side Information. For example, items with the same brand or category should be closer in the embedding space.
Throughout this post, we'll develop a model that incorporates side information into graph embedding and test it with new items not in the dataset to evaluate performance.
Cold Start Recommendation
Preparation
We'll use the publicly available MovieLens dataset throughout this experiment. At the time of writing, MovieLens-25M is the most up-to-date release and contains 25 million ratings. You can download it from the GroupLens website.
Given this dataset, we have several preprocessing steps:
- The raw `movies.csv` contains `movieId`, the movie title, and genres. We'll convert `movieId` into ordinal IDs and split the genre string into an array of strings
- The raw `tags.csv` contains `userId`, `movieId`, `tag`, and `timestamp`. We'll drop null tags, convert tags to lowercase, and aggregate tags by movie
import pandas as pd
df_entity = pd.read_csv("ml-25m/movies.csv")
df_entity["genres"] = df_entity["genres"].map(lambda d: d.split("|"))
df_entity.head()
df_tag = pd.read_csv("ml-25m/tags.csv").dropna()
df_tag.head()
df_agg_tag = df_tag.drop(["userId","timestamp"],axis=1)\
.assign(tag=lambda df: df["tag"].map(lambda d: d.lower().lstrip().rstrip()))\
.groupby(["movieId"])["tag"].agg("unique").reset_index()
df_agg_tag
df_movie_joint = df_entity.merge(df_agg_tag, on=["movieId"],how="outer")
df_movie_joint
Preprocessing
Ordinal Encoding
We need a custom ordinal encoder for our use case: it transforms category/ID features (including lists of them) into ordinal integer IDs, mapping unseen or empty values to a reserved unknown ID.
import numpy as np
from collections import defaultdict
class OrdinalEncoder():
"""
Convert categorical values into ordinal integer ids.
If a value is not in the vocab, it is replaced by the <unk> id.
"""
def __init__(self,start_from=1, unknown=0):
self.vocabs = {}
self.wc = defaultdict(int)
self.inv = []
self.start_from = start_from
self.unknown = unknown
def __len__(self, ):
return len(self.inv)
def fit(self, X):
for i in range(len(X)):
self.wc[X[i]] += 1
X_uniq = np.unique(X)
self.inv = [0] * (len(X_uniq) + self.start_from)
for idx, item in enumerate(X_uniq):
self.vocabs[item] = self.start_from + idx
self.inv[self.start_from + idx] = item
self.inv = np.array(self.inv)
return self
def transform(self, X):
if isinstance(X[0],(list,np.ndarray)):
res = []
for idx in range(len(X)):
tmp = [self.vocabs.get(item) for item in X[idx] if item in self.vocabs]
res.append(tmp)
return res
else:
return np.array([ self.vocabs.get(item, self.unknown) for item in X ],dtype="int")
def fit_transform(self, X):
return self.fit(X).transform(X)
def inverse_transform(self, X):
if len(X) == 0:
return []
X = np.array(X, dtype="int")
return self.inv[X]
@property
def n_classes_(self):
return len(self.inv)
def word_count_table(self):
return [self.wc[w] for w in self.inv]
class MultiLabelEncoder(OrdinalEncoder):
"""
Convert multi-labels into ordinal ids
"""
def fit(self, X: list):
X_extend = []
for i in range(len(X)):
if isinstance(X[i],(list,np.ndarray)):
X_extend.extend(X[i])
super().fit(X_extend)
return self
metadata_cols = ["genres","tag"]
movie_encoder = OrdinalEncoder(start_from=1).fit(df_movie_joint["movieId"].tolist())
encoder_mapper = {col: MultiLabelEncoder(start_from=1).fit(df_movie_joint[col].tolist()) for col in metadata_cols}
df_movie_joint_encoder = df_movie_joint[["movieId"]].copy(deep=True)
df_movie_joint_encoder["movieId"] = movie_encoder.transform(df_movie_joint["movieId"].tolist())
for col in metadata_cols:
df_movie_joint_encoder[col] = encoder_mapper[col].transform(df_movie_joint[col].fillna("").tolist())
df_movie_joint_encoder["movieId"] = df_movie_joint_encoder["movieId"].astype("int")
movie_metadata_info = df_movie_joint_encoder.set_index(["movieId"])[metadata_cols].to_dict(orient="index")
df_movie_joint_encoder["title"] = df_movie_joint["title"]
Random Walk Methods
Given ratings.csv, we'll sort user ratings by timestamp to create time-ordered sequences. Note that user rating behaviors are used to mimic user watch behaviors.
We convert ratings into sequences using the random walk technique, with graph utilities adapted from Bryan Perozzi's DeepWalk implementation.
import logging
import random
from collections import defaultdict
from time import time
logger = logging.getLogger("deepwalk")
class Graph(defaultdict):
"""Efficient basic implementation of nx Graph - Undirected graphs with self loops"""
def __init__(self):
super(Graph, self).__init__(list)
def nodes(self):
return self.keys()
def random_walk(self, path_length, alpha=0, rand=random.Random(), start=None):
"""Returns a truncated random walk.
path_length: Length of the random walk.
alpha: probability of restarts.
start: the start node of the random walk.
"""
G = self
if start:
path = [start]
else:
# Sampling is uniform w.r.t V, and not w.r.t E
path = [rand.choice(list(G.keys()))]
while len(path) < path_length:
cur = path[-1]
if len(G[cur]) > 0:
if rand.random() >= alpha:
path.append(rand.choice(G[cur]))
else:
path.append(path[0])
else:
break
return [str(node) for node in path]
def make_undirected(self):
t0 = time()
for v in list(self):
for other in self[v]:
if v != other:
self[other].append(v)
t1 = time()
logger.info('make_undirected: added missing edges {}s'.format(t1-t0))
self.make_consistent()
return self
def make_consistent(self):
t0 = time()
for k in self.keys():
self[k] = list(sorted(set(self[k])))
t1 = time()
logger.info('make_consistent: made consistent in {}s'.format(t1-t0))
self.remove_self_loops()
return self
def remove_self_loops(self):
removed = 0
t0 = time()
for x in self:
if x in self[x]:
self[x].remove(x)
removed += 1
t1 = time()
logger.info('remove_self_loops: removed {} loops in {}s'.format(removed, (t1-t0)))
return self
def build_deepwalk_corpus_iter(G, num_paths, path_length, alpha=0,
rand=random.Random(0)):
nodes = list(G.nodes())
for cnt in range(num_paths):
rand.shuffle(nodes)
for node in nodes:
yield G.random_walk(path_length, rand=rand, alpha=alpha, start=node)
def load_adjacencylist(file_, undirected=False, chunksize=10000):
def parse_adjacencylist_unchecked(f):
adjlist = []
for l in f:
if l and l[0] != "#":
adjlist.extend([[str(x) for x in l.strip().split()]])
return adjlist
def from_adjlist_unchecked(adjlist):
G = Graph()
for row in adjlist:
node = row[0]
neighbors = row[1:]
G[node] = neighbors
return G
parse_func = parse_adjacencylist_unchecked
convert_func = from_adjlist_unchecked
adjlist = []
t0 = time()
total = 0
with open(file_) as f:
for idx, adj_chunk in enumerate(map(parse_func, grouper(int(chunksize), f))):
adjlist.extend(adj_chunk)
total += len(adj_chunk)
t1 = time()
logger.info('Parsed {} edges with {} chunks in {}s'.format(total, idx, t1-t0))
t0 = time()
G = convert_func(adjlist)
t1 = time()
logger.info('Converted edges to graph in {}s'.format(t1-t0))
if undirected:
t0 = time()
G = G.make_undirected()
t1 = time()
logger.info('Made graph undirected in {}s'.format(t1-t0))
return G
def grouper(n, iterable, padvalue=None):
"grouper(3, 'abcdefg', 'x') --> ('a','b','c'), ('d','e','f'), ('g','x','x')"
from six.moves import zip_longest
return zip_longest(*[iter(iterable)]*n, fillvalue=padvalue)
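Before running walks over the full MovieLens graph, the core walk logic can be sanity-checked on a toy adjacency list. This is a standalone sketch on a hypothetical three-node graph, not the `Graph` class itself:

```python
import random

# Toy undirected graph: edges a-b and a-c.
adj = {"a": ["b", "c"], "b": ["a"], "c": ["a"]}
rand = random.Random(0)

def random_walk(adj, start, path_length):
    # Extend the path until it reaches path_length or hits a dead end.
    path = [start]
    while len(path) < path_length and adj.get(path[-1]):
        path.append(rand.choice(adj[path[-1]]))
    return path

walk = random_walk(adj, "a", 5)
print(len(walk), walk[0])  # -> 5 a
```

Every step moves along an existing edge, so walks stay on the item graph; the `alpha` restart probability in the full implementation occasionally jumps back to the start node instead.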
Generate Sequences
df_rating = pd.read_csv("ml-25m/ratings.csv").sort_values(["userId", "timestamp"])
df_rating["movieId"] = movie_encoder.transform(df_rating["movieId"].tolist())
df_rating_path = df_rating.groupby(["userId"])["movieId"].agg(list).reset_index()
del df_rating
random_seed = 4111
number_walks = 10
walk_length = 20
restart_prob = 0.
min_len = 3
with open("paths_filtered.txt","w") as f:
for _, row in df_rating_path.iterrows():
all_paths = row["movieId"]
# Skip short sequences
if len(all_paths) < min_len:
continue
f.write(" ".join([str(node) for node in all_paths])+"\n")
G = load_adjacencylist("paths_filtered.txt",undirected=True)
cursor = build_deepwalk_corpus_iter(G,
num_paths=number_walks,
path_length=walk_length,
alpha=restart_prob,
rand=random.Random(random_seed))
with open("walks.txt","w") as f:
for path in cursor:
f.write(" ".join(path) + "\n")
Deep Dive into Model
Generate Skip-Gram and Negative Sampling
feature_schema = {
"genres": {
"type": "categorical_list",
"size": encoder_mapper["genres"].n_classes_,
},
"tag": {
"type": "categorical_list",
"size": encoder_mapper["tag"].n_classes_,
},
}
import torch
from torch.utils.data import Dataset
from tqdm import tqdm
def generate_skipgram(sentence, i, window_size, unk: int = 0):
iword = sentence[i]
left = sentence[max(i - window_size, 0): i]
right = sentence[i + 1: i + 1 + window_size]
return iword, [unk for _ in range(window_size - len(left))] + left + right + [unk for _ in range(window_size - len(right))]
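To see what `generate_skipgram` produces at a sequence boundary, here is a standalone copy of the helper applied to a short toy sequence; the left context is zero-padded when the window extends past the start:

```python
def generate_skipgram(sentence, i, window_size, unk=0):
    # Center word plus its zero-padded window of context words.
    iword = sentence[i]
    left = sentence[max(i - window_size, 0): i]
    right = sentence[i + 1: i + 1 + window_size]
    return iword, ([unk] * (window_size - len(left)) + left
                   + right + [unk] * (window_size - len(right)))

center, context = generate_skipgram([10, 20, 30, 40], 0, window_size=2)
print(center, context)  # -> 10 [0, 0, 20, 30]
```

The padded zeros are skipped later in the dataset loop (`if neighbor_word == 0: continue`), so they never become training pairs.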
class MovieLenGraphPathDataset(Dataset):
"""MovieLen Torch Dataset"""
def __init__(
self,
file_path,
side_info_lookup: dict=None,
feature_schema: dict=None,
window_size: int=5,
subsample_rate: float = 0) -> None:
self.data = []
self.wc = defaultdict(int)
self.window_size = window_size
self.side_info_lookup = side_info_lookup
self.feature_schema = feature_schema
cols = list(feature_schema.keys())
with open(file_path) as f:
step = 0
for line in f:
if step % 1000 == 0:
print("working on line: {}".format(step), end="\r")
line = [int(w.strip()) for w in line.strip().split() if len(w.strip()) > 0]
for i in range(len(line)):
self.wc[line[i]] += 1
center_word, neighbor_words = generate_skipgram(line, i, window_size)
if center_word not in self.side_info_lookup:
continue
for neighbor_word in neighbor_words:
if neighbor_word == 0:
continue
tmp = {"center_word": center_word, "neighbor_word": neighbor_word, "side_information": {}}
for col in cols:
if col in self.side_info_lookup[center_word]:
tmp["side_information"][col] = self.side_info_lookup[center_word][col]
self.data.append(tmp)
step += 1
self.wc_arr = np.zeros(len(side_info_lookup) + 1, dtype=int)
for k, v in self.wc.items():
self.wc_arr[k] = v
if subsample_rate > 0:
wf = self.wc_arr / self.wc_arr.sum()
ws = 1 - np.sqrt(subsample_rate/wf)
ws = np.clip(ws, 0, 1)
data = []
for i in range(len(self.data)):
if random.random() > ws[self.data[i]["center_word"]]:
data.append(self.data[i])
self.data = data
def __getitem__(self, index):
return self.data[index]
def __len__(self):
return len(self.data)
def get_word_count(self):
return self.wc_arr
movie_ds = MovieLenGraphPathDataset("walks.txt",
side_info_lookup=movie_metadata_info,
feature_schema=feature_schema,
window_size=5)
Next, wrap the dataset in a Torch DataLoader with a custom collate function that pads the variable-length side-information lists:
def padding_tensor(arr, maxlen, dtype):
padded_sess = torch.zeros(len(arr), maxlen, dtype=dtype)
for i in range(len(arr)):
padded_sess[i, :len(arr[i])] = arr[i]
return padded_sess
def get_dataloader(dataset, feature_mapper, batch_size=64, shuffle=True, num_workers=0):
from torch.utils.data import DataLoader
def collate_fn(inputs):
outputs = {k: [] for k in feature_mapper.keys()}
outputs["center_word"] = []
outputs["neighbor_word"] = []
max_len_mapper = defaultdict(int)
for i in range(len(inputs)):
outputs["center_word"].append(inputs[i]["center_word"])
outputs["neighbor_word"].append(inputs[i]["neighbor_word"])
for k in feature_mapper.keys():
outputs[k].append(torch.tensor(inputs[i]["side_information"][k], dtype=torch.int))
max_len_mapper[k] = max(len(inputs[i]["side_information"][k]), max_len_mapper[k])
# build batch tensors from the collected ints in one step
outputs["center_word"] = torch.tensor(outputs["center_word"], dtype=torch.int)
outputs["neighbor_word"] = torch.tensor(outputs["neighbor_word"], dtype=torch.int)
for k in feature_mapper.keys():
if feature_mapper[k]["type"] == "categorical":
outputs[k] = torch.tensor(outputs[k], dtype=torch.int)
elif feature_mapper[k]["type"] == "categorical_list":
outputs[k] = padding_tensor(outputs[k], max_len_mapper[k], dtype=torch.int)
return outputs
return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, collate_fn=collate_fn, num_workers=num_workers)
Weighted Skip Gram
Let's define the problem. We use $\mathbf{W}$ to denote the embedding matrix of items and their Side Information (SI). Specifically, $\mathbf{W}_v^0$ denotes the embedding of item $v$, and $\mathbf{W}_v^s$ denotes the embedding of the $s$-th type of SI attached to item $v$.
For an item $v$ with $n$ types of SI, we have $n+1$ vectors $\mathbf{W}_v^0, \dots, \mathbf{W}_v^n \in \mathbb{R}^d$, where $d$ is the embedding dimension. The paper proposes a weighted layer to aggregate the SI embeddings related to item $v$. Let $a_v^s$ be the weight of the $s$-th type of SI of item $v$, with $a_v^0$ denoting the weight of the item itself. The aggregated embedding $\mathbf{H}_v$ is:
$$\mathbf{H}_v = \frac{\sum_{j=0}^{n} e^{a_v^j}\, \mathbf{W}_v^j}{\sum_{j=0}^{n} e^{a_v^j}}$$
where we use $e^{a_v^j}$ instead of $a_v^j$ to ensure the contribution of each SI is positive, and the denominator normalizes the weights across the SI embeddings. (In the implementation below we simplify by sharing one weight per SI type across all items.)
For a node $v$ and a node $u$ drawn from its context (or from negative sampling) in the training data, let $\mathbf{Z}_u \in \mathbb{R}^d$ denote $u$'s context embedding and $y \in \{0, 1\}$ the label. The objective function is:
$$\mathcal{L}(v, u, y) = -\left[\, y \log \sigma(\mathbf{H}_v^\top \mathbf{Z}_u) + (1 - y) \log\big(1 - \sigma(\mathbf{H}_v^\top \mathbf{Z}_u)\big) \right]$$
where $\sigma(\cdot)$ is the sigmoid function.
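To make the weighted aggregation concrete, here is a small NumPy sketch with toy (not learned) weights and embeddings; it checks that the exponentiated weights are positive and normalize to one:

```python
import numpy as np

d = 4
rng = np.random.RandomState(0)
W = rng.randn(3, d)                  # item embedding + 2 SI embeddings, each in R^d
a = np.array([0.5, -1.0, 2.0])       # toy aggregation weights a_v^j
w = np.exp(a) / np.exp(a).sum()      # softmax: positive contributions summing to 1
H = (w[:, None] * W).sum(axis=0)     # aggregated embedding H_v
print(H.shape)  # -> (4,)
```

The softmax form means a large learned weight lets one SI type dominate, while the item's own embedding always contributes a positive share.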
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
from typing import List, Union
def fixed_unigram_candidate_sampler(
true_classes: Union[np.array, torch.Tensor],
num_true: int,
num_samples: int,
range_max: int,
unigrams: List[Union[int, float]],
unique: bool = False,
distortion: float = 1.):
"""
Generate candidates based on positive examples.
"""
if isinstance(true_classes, torch.Tensor):
true_classes = true_classes.detach().cpu().numpy()
unigrams = np.array(unigrams)
if distortion != 1.:
unigrams = unigrams.astype(np.float64) ** distortion
result = []
has_seen = set()
for i in range(len(true_classes)):
for j in range(len(true_classes[i])):
has_seen.add(true_classes[i][j])
if range_max < num_samples and unique:
raise Exception("Range max is lower than num samples")
while len(result) < num_samples:
sampler = torch.utils.data.WeightedRandomSampler(unigrams, num_samples)
candidates = np.array(list(sampler))
for item in candidates:
if unique:
if item not in has_seen:
result.append(item)
has_seen.add(item)
else:
result.append(item)
return result
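The sampler above draws negatives in proportion to (possibly distorted) word counts. A minimal NumPy sketch of the same idea, using the word2vec-style 0.75 distortion and excluding the padding id by giving it zero count:

```python
import numpy as np

counts = np.array([0.0, 100.0, 10.0, 1.0])  # index 0 is the padding id
probs = counts ** 0.75                      # word2vec-style count distortion
probs /= probs.sum()
rng = np.random.default_rng(0)
negs = rng.choice(len(counts), size=5, p=probs)
print(negs.shape)  # -> (5,)
```

The distortion flattens the distribution so that rare items are sampled as negatives more often than their raw frequency would allow.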
class SkigGram(pl.LightningModule):
def __init__(
self,
embedding_size: int,
embedding_dim: int,
side_information_schema: dict,
word_count_table: np.ndarray,
lr: float = 0.001,
negative: int = 5,
):
"""
Args:
embedding_size: An int, unique word number
embedding_dim: An int, embedding dimension
side_information_schema: A dict, side information schema contains name, type of side information
word_count_table: An array, define word count of item in order to generate negative sampling
"""
super(SkigGram, self).__init__()
self.side_information_schema = side_information_schema
self.embedding_size = embedding_size
self.embedding_dim = embedding_dim
self.negative = negative
self.wc = word_count_table
self.lr = lr
# center word embedding
self.center_word_embed = nn.Embedding(embedding_size, embedding_dim, padding_idx=0)
# neighbor word embedding
self.neighbor_word_embed = nn.Embedding(embedding_size, embedding_dim, padding_idx=0)
self.si_dict = nn.ModuleDict()
self.si_keys = []
for key, info in side_information_schema.items():
self.si_keys.append(key)
if info["type"] == "categorical":
self.si_dict[key] = nn.Embedding(info["size"], embedding_dim, padding_idx=0)
elif info["type"] == "categorical_list":
self.si_dict[key] = nn.EmbeddingBag(info["size"], embedding_dim, padding_idx=0)
self._weight_init()
def _weight_init(self):
with torch.no_grad():
self.center_word_embed.weight.data.normal_(0., 0.01)
self.neighbor_word_embed.weight.data.normal_(0., 0.01)
for key in self.si_dict.keys():
self.si_dict[key].weight.data.normal_(0., 0.01)
# learnable weights for the item embedding and each SI embedding
self.embedding_weight = nn.Parameter(torch.rand(len(self.si_keys) + 1, 1))
def compute_vector(self, data: dict):
embed_center_word = self.center_word_embed(data["center_word"].to(self.device))
information_list = [embed_center_word]
# side information
for k in self.si_keys:
information_list.append(self.si_dict[k](data[k].to(self.device)))
# word and side information embedding list
information_embed = torch.cat(information_list, dim=0).view(len(information_list), -1, self.embedding_dim)
exp_embedding_weights = torch.exp(self.embedding_weight.view(-1, 1, 1))
weight_sum_pooling = information_embed * exp_embedding_weights / torch.sum(exp_embedding_weights)
embed_center_word_side_information = torch.sum(weight_sum_pooling, dim=0)
return embed_center_word_side_information
def forward(self, data: dict):
embed_center_word = self.center_word_embed(data["center_word"].to(self.device))
information_list = [embed_center_word]
# neighbor word
embed_neighbor_word = self.neighbor_word_embed(data["neighbor_word"].to(self.device))
# neg word
neg_word = torch.tensor(fixed_unigram_candidate_sampler(
data["center_word"].unsqueeze(1),
num_true=1,
num_samples=len(data["center_word"]) * self.negative,
range_max=len(self.wc),
unigrams=self.wc,
), dtype=torch.int).reshape((-1, self.negative))
embed_neg_word = self.neighbor_word_embed(neg_word.to(self.device))
# side information
for k in self.si_keys:
information_list.append(self.si_dict[k](data[k].to(self.device)))
# word and side information embedding list
information_embed = torch.cat(information_list, dim=0).view(len(information_list), -1, self.embedding_dim)
exp_embedding_weights = torch.exp(self.embedding_weight.view(-1, 1, 1))
weight_sum_pooling = information_embed * exp_embedding_weights / torch.sum(exp_embedding_weights)
embed_center_word_side_information = torch.sum(weight_sum_pooling, dim=0)
score = torch.sum(torch.mul(embed_center_word_side_information, embed_neighbor_word.squeeze()), dim=1)
score = torch.clamp(score, max=10, min=-10)
score = -F.logsigmoid(score)
neg_score = torch.bmm(embed_neg_word, embed_center_word_side_information.unsqueeze(2)).squeeze()
neg_score = torch.clamp(neg_score, max=10, min=-10)
neg_score = -torch.sum(F.logsigmoid(-neg_score), dim=1)
return torch.mean(score + neg_score)
def configure_optimizers(self):
optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)
return optimizer
def training_step(self, train_batch, batch_idx):
x = train_batch
loss = self(x).mean()
self.log('train_loss', loss)
return loss
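The loss in `forward` is standard skip-gram negative sampling: it maximizes $\sigma(\mathbf{H}_v^\top \mathbf{Z}_u)$ for true context pairs and $\sigma(-\mathbf{H}_v^\top \mathbf{Z}_{neg})$ for sampled negatives. A small NumPy sketch of the same computation on toy vectors:

```python
import numpy as np

def logsigmoid(x):
    # log(sigmoid(x)); fine for the small toy values used here
    return -np.log1p(np.exp(-x))

h = np.array([0.2, -0.1, 0.4])        # aggregated center embedding H_v
z_pos = np.array([0.1, 0.0, 0.3])     # true context embedding Z_u
z_neg = np.array([[-0.3, 0.2, 0.1]])  # one sampled negative

pos_loss = -logsigmoid(h @ z_pos)
neg_loss = -logsigmoid(-(z_neg @ h)).sum()
loss = pos_loss + neg_loss
```

The model clamps scores to [-10, 10] before the log-sigmoid purely for numerical stability; the toy values above don't need it.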
Algorithm Training
We use the PyTorch Lightning Trainer to handle multi-GPU training and multi-process data loading:
import torch
from pytorch_lightning import loggers as pl_loggers
num_gpus = torch.cuda.device_count()
lr = 0.001
epochs = 5
negative_items = 5
batch_size = 256 * num_gpus
num_items = len(movie_metadata_info) + 3
embedding_dim = 50
data_loader = get_dataloader(
movie_ds,
feature_schema,
batch_size=batch_size,
shuffle=True,
num_workers=num_gpus)
model = SkigGram(
num_items,
embedding_dim,
feature_schema,
movie_ds.get_word_count(),
lr=lr,
negative=negative_items)
tb_logger = pl_loggers.TensorBoardLogger("logs/")
trainer = pl.Trainer(
gpus=num_gpus,
strategy="dp",
max_epochs=epochs,
accelerator="gpu",
log_every_n_steps=1000,
flush_logs_every_n_steps=1000,
logger=tb_logger)
trainer.fit(model, data_loader)
Testing Model
First, let's extract our movie embeddings:
from gensim.models import KeyedVectors
def convert_to_tensor(entity_id, data):
tmp = {"center_word": torch.tensor([entity_id], dtype=torch.int)}
for k, v in data.items():
if len(v) == 0:
v = [0]
tmp[k] = torch.tensor([v], dtype=torch.int)
return tmp
X = np.zeros((len(movie_metadata_info) + 3, embedding_dim))
for entity_id, entity_info in tqdm(movie_metadata_info.items()):
X[entity_id] = model.compute_vector(convert_to_tensor(entity_id, entity_info)).detach().cpu().numpy()
kv_metadata = KeyedVectors(vector_size=embedding_dim, count=len(X))
for i in range(1, len(X)):
kv_metadata.add_vector(i, X[i])
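Under the hood, `similar_by_vector` ranks items by cosine similarity. A minimal NumPy equivalent (a sketch, not gensim's implementation) makes the ranking explicit:

```python
import numpy as np

def top_k_cosine(query, matrix, k=3):
    # Cosine similarity between the query and every row, highest first.
    q = query / np.linalg.norm(query)
    m = matrix / np.linalg.norm(matrix, axis=1, keepdims=True)
    scores = m @ q
    idx = np.argsort(-scores)[:k]
    return [(int(i), float(scores[i])) for i in idx]

mat = np.array([[1.0, 0.0], [0.0, 1.0], [0.7, 0.7]])
result = top_k_cosine(np.array([1.0, 0.2]), mat, k=2)
print([i for i, _ in result])  # -> [0, 2]
```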
Now, let's imagine we just uploaded "Spider-Man: Far From Home" into our system as a brand-new title that isn't in our dataset. Since it has no interaction history, our team will assign genres and tags for the movie manually.
test_movie_info = {
    "genres": ["action", "adventure", "science fiction"],
    "tag": ["spider-man", "marvel", "superhero"],  # example tags assigned by our team
}
test_movie_tags = {k: encoder_mapper[k].transform(v) for k, v in test_movie_info.items()}
X_query = model.compute_vector(convert_to_tensor(0, test_movie_tags)).detach().cpu().numpy()
list_candidates_ids = {movie_id: score for movie_id, score in kv_metadata.similar_by_vector(X_query[0])}
df_movie_joint_encoder[df_movie_joint_encoder["movieId"].isin(list_candidates_ids)]\
.assign(score=lambda x: x["movieId"].map(lambda d: list_candidates_ids[d]))\
.sort_values(by=["score"], ascending=False)[["title", "score"]]
The model learns quite well about similar movies related to Spider-Man: Far From Home, even though this movie wasn't in the training data. This demonstrates the effectiveness of incorporating side information for cold start recommendations.
End Note
We have implemented Graph Embedding with Side Information to incorporate item side information. We introduced how to construct an item graph from user behavior history and learn the embeddings of all items in the graph. The item embeddings are employed to compute pairwise similarities between all items, which are then used in the recommendation process. To alleviate sparsity and cold start problems, side information is incorporated into the graph embedding framework.
In terms of data, we only added two metadata features: genres and user tags. In the future, we could add new features such as actors, actresses, directors, and more.
The model in this post is based on negative sampling. We could consider integrating more advanced graph embedding techniques like Graph Convolutional Networks or Knowledge Graphs.