From 21ddb34b9367f067b20a11f53aea965bad8704a5 Mon Sep 17 00:00:00 2001
From: preskes
Date: Fri, 29 Mar 2024 00:33:15 +0300
Subject: [PATCH] multidomain graph dataset added

---
 modeling/dataset/base.py  | 286 ++++++++++++++++++++++++++++++++++++++
 modeling/models/gtorec.py |   4 +
 2 files changed, 290 insertions(+)

diff --git a/modeling/dataset/base.py b/modeling/dataset/base.py
index b7a23614..6e6f3e64 100644
--- a/modeling/dataset/base.py
+++ b/modeling/dataset/base.py
@@ -540,6 +540,292 @@ def meta(self):
         return meta
 
 
+class MultiDomainGraphDataset(GraphDataset, config_name='multi_domain_graph'):
+    """Graph dataset that builds interaction graphs over several domains.
+
+    Two kinds of graphs can be produced:
+
+    * local graphs: one user-item graph (plus optional user-user and
+      item-item graphs) per domain, stored in dicts keyed by domain;
+    * global graphs: one graph of each kind covering all domains.
+
+    NOTE(review): for the global graphs the ids of every domain are shifted by
+    the total size of the preceding domains. This assumes that domains do not
+    share ids and that real ids lie in 1..num - TODO confirm.
+    """
+
+    def __init__(
+            self,
+            dataset,
+            graph_dir_path,
+            target_domain,
+            other_domains,
+            use_train_data_only=True,
+            use_user_graph=False,
+            use_item_graph=False,
+            create_local_graphs=False,
+            create_global_graphs=True
+    ):
+        """Collect the interactions of every domain and build the requested graphs.
+
+        Args:
+            dataset: multi-domain dataset; presumably indexable by domain name
+                and providing get_samplers() - TODO confirm.
+            graph_dir_path: directory where graphs are cached as .npz files.
+            target_domain: name of the target domain.
+            other_domains: list with the names of the remaining domains.
+            use_train_data_only: if False, validation and test events are
+                added to the graphs as well.
+            use_user_graph: also build user-user co-occurrence graphs.
+            use_item_graph: also build item-item co-occurrence graphs.
+            create_local_graphs: build one set of graphs per domain.
+            create_global_graphs: build one set of graphs over all domains.
+        """
+        super().__init__(dataset, graph_dir_path, use_train_data_only, use_user_graph, use_item_graph)
+        self._target_domain = target_domain
+        self._other_domains = other_domains
+        domains = [self._target_domain] + self._other_domains
+
+        self._num_users = {domain: dataset[domain].num_users for domain in domains}  # TODO
+        self._num_items = {domain: dataset[domain].num_items for domain in domains}  # TODO
+
+        train_sampler, validation_sampler, test_sampler = dataset.get_samplers()  # TODO ?
+        samplers = [train_sampler]
+        if not self._use_train_data_only:
+            samplers += [validation_sampler, test_sampler]
+
+        train_interactions = defaultdict(list)
+        train_user_interactions = defaultdict(list)
+        train_item_interactions = defaultdict(list)
+        # defaultdict requires a callable factory, not an instance
+        train_user_2_items = defaultdict(lambda: defaultdict(set))
+        train_item_2_users = defaultdict(lambda: defaultdict(set))
+        visited_user_item_pairs = defaultdict(set)
+
+        # CREATE INTERACTIONS FOR ALL DOMAINS
+        for domain in domains:
+            for sampler in samplers:
+                # the per-domain containers are filled in place
+                self.create_event_pairs(
+                    sampler[domain].dataset,  # TODO ?
+                    train_interactions[domain], train_user_interactions[domain], train_item_interactions[domain],
+                    train_user_2_items[domain], train_item_2_users[domain], visited_user_item_pairs[domain]
+                )
+
+        self._train_interactions = {domain: np.array(values) for domain, values in train_interactions.items()}
+        self._train_user_interactions = {domain: np.array(values) for domain, values in train_user_interactions.items()}
+        self._train_item_interactions = {domain: np.array(values) for domain, values in train_item_interactions.items()}
+
+        # CREATE ONE-DOMAIN LOCAL GRAPHS (one domain - one graph)
+        self._graphs = None
+        self._user_graphs = None
+        self._item_graphs = None
+
+        if create_local_graphs:
+            self._graphs = {}
+            self._user_graphs = {}
+            self._item_graphs = {}
+
+            for domain in domains:
+                graphs = self._create_graphs(
+                    graph_dir_path,
+                    self._train_user_interactions[domain],
+                    self._train_item_interactions[domain],
+                    self._num_users[domain],
+                    self._num_items[domain],
+                    suffix='_{}'.format(domain)
+                )
+                self._graphs[domain], self._user_graphs[domain], self._item_graphs[domain] = graphs
+
+        # CREATE GLOBAL GRAPHS (all domains - one graph)
+        if create_global_graphs:
+            # MAPPING OF INTERACTIONS: shift local ids into one global id space
+            num_users_global, num_items_global = 0, 0
+            global_user_interactions, global_item_interactions = [], []
+            for domain in domains:
+                global_user_interactions.extend(
+                    user_id + num_users_global for user_id in train_user_interactions[domain]
+                )
+                global_item_interactions.extend(
+                    item_id + num_items_global for item_id in train_item_interactions[domain]
+                )
+                num_users_global += self._num_users[domain]
+                num_items_global += self._num_items[domain]
+
+            self._graph, self._user_graph, self._item_graph = self._create_graphs(
+                graph_dir_path,
+                global_user_interactions,
+                global_item_interactions,
+                num_users_global,
+                num_items_global
+            )
+
+    def _create_graphs(self, graph_dir_path, user_ids, item_ids, num_users, num_items, suffix=''):
+        """Load from cache or build the user-item, user-user and item-item graphs.
+
+        Returns a (graph, user_graph, item_graph) tuple; the last two are None
+        when the corresponding graph is disabled.
+        """
+        graph = self._load_or_build_graph(
+            os.path.join(graph_dir_path, 'general_graph{}.npz'.format(suffix)),
+            # place ones only when co-occurrence happens
+            lambda: csr_matrix(
+                (np.ones(len(user_ids)), (user_ids, item_ids)),
+                shape=(num_users + 2, num_items + 2)
+            ),  # (num_users + 2, num_items + 2), bipartite graph
+            num_users + 2,
+            num_items + 2,
+            bipartite=True
+        )
+
+        user_graph = None
+        if self._use_user_graph:
+            user_graph = self._load_or_build_graph(
+                os.path.join(graph_dir_path, 'user_graph{}.npz'.format(suffix)),
+                lambda: self._co_occurrence_connections(user_ids, item_ids, num_users + 2),
+                num_users + 2,
+                num_users + 2,
+                bipartite=False
+            )
+
+        item_graph = None
+        if self._use_item_graph:
+            item_graph = self._load_or_build_graph(
+                os.path.join(graph_dir_path, 'item_graph{}.npz'.format(suffix)),
+                lambda: self._co_occurrence_connections(item_ids, user_ids, num_items + 2),
+                num_items + 2,
+                num_items + 2,
+                bipartite=False
+            )
+
+        return graph, user_graph, item_graph
+
+    def _load_or_build_graph(self, path_to_graph, build_connections, num_rows, num_cols, bipartite):
+        """Return the graph cached at path_to_graph, building and saving it when it is missing."""
+        if os.path.exists(path_to_graph):
+            graph = sp.load_npz(path_to_graph)
+        else:
+            # NOTE: 'biparite' is the spelling used by the call sites of get_sparse_graph_layer
+            graph = self.get_sparse_graph_layer(build_connections(), num_rows, num_cols, biparite=bipartite)
+            sp.save_npz(path_to_graph, graph)
+
+        return self._convert_sp_mat_to_sp_tensor(graph).coalesce().to(DEVICE)
+
+    @staticmethod
+    def _co_occurrence_connections(node_ids, pivot_ids, size):
+        """Connect every two distinct nodes that interacted with the same pivot.
+
+        For the user graph nodes are users and pivots are items; for the item
+        graph it is the other way round. Returns a (size, size) csr_matrix.
+        """
+        pivot_2_nodes = defaultdict(set)
+        for node_id, pivot_id in zip(node_ids, pivot_ids):
+            pivot_2_nodes[pivot_id].add(node_id)
+
+        visited_pairs = set()
+        connections_fst = []
+        connections_snd = []
+        for node_id, pivot_id in tqdm(zip(node_ids, pivot_ids)):
+            for connected_node_id in pivot_2_nodes[pivot_id]:
+                pair = (node_id, connected_node_id)
+                if node_id == connected_node_id or pair in visited_pairs:
+                    continue  # add every pair to graph connections only once
+                visited_pairs.add(pair)
+
+                connections_fst.append(node_id)
+                connections_snd.append(connected_node_id)
+
+        return csr_matrix(
+            (np.ones(len(connections_fst)), (connections_fst, connections_snd)),
+            shape=(size, size)
+        )
+
+    def get_user_id_to_index_cross_domain_mapping(self):
+        """Return, for every non-target domain, a dict: sample index -> first user id of the sample.
+
+        NOTE(review): the name suggests user id -> index, but the mapping is
+        stored the other way round - confirm the intended direction.
+        """
+        mapping = {domain: {} for domain in self._other_domains}
+        for domain in self._other_domains:
+            for index, sample in enumerate(self._dataset[domain]):
+                mapping[domain][index] = sample['user.ids'][0]
+
+        return mapping
+
+    @classmethod
+    def create_from_config(cls, config):
+        """Build the dataset from a config dict.
+
+        The dataset must be able to build a graph from several datasets and to
+        support correct initialisation of multi-domain samplers. The only
+        tricky part is how ids are merged and how they are retrieved later in
+        forward, because they have to be concatenated carefully - this is not
+        very different from the existing concatenation of users and items.
+
+        NOTE(review): the 'target_domain' / 'other_domains' key names (and the
+        optional flags below) are assumed - align them with the config schema.
+        """
+        dataset = BaseDataset.create_from_config(config['dataset'])
+        return cls(
+            dataset=dataset,
+            graph_dir_path=config['graph_dir_path'],
+            target_domain=config['target_domain'],
+            other_domains=config['other_domains'],
+            use_train_data_only=config.get('use_train_data_only', True),
+            use_user_graph=config.get('use_user_graph', False),
+            use_item_graph=config.get('use_item_graph', False),
+            create_local_graphs=config.get('create_local_graphs', False),
+            create_global_graphs=config.get('create_global_graphs', True)
+        )
+
+    # TODO create separated function
+    def create_event_pairs(
+            self,
+            dataset,  # TODO ?
+            train_interactions, train_user_interactions, train_item_interactions,
+            train_user_2_items, train_item_2_users, visited_user_item_pairs
+    ):
+        """Add every unseen (user, item) pair of dataset to the given containers.
+
+        The containers are updated in place and also returned, in the order of
+        the parameters, so that callers may unpack the result.
+        """
+        for sample in dataset:
+            user_id = sample['user.ids'][0]
+            item_ids = sample['item.ids']
+
+            for item_id in item_ids:
+                if (user_id, item_id) not in visited_user_item_pairs:
+                    train_interactions.append((user_id, item_id))
+                    train_user_interactions.append(user_id)
+                    train_item_interactions.append(item_id)
+
+                    train_user_2_items[user_id].add(item_id)
+                    train_item_2_users[item_id].add(user_id)
+
+                    visited_user_item_pairs.add((user_id, item_id))
+
+        return (
+            train_interactions, train_user_interactions, train_item_interactions,
+            train_user_2_items, train_item_2_users, visited_user_item_pairs
+        )
+
+    def create_artifacts_for_graph(self):
+        pass  # TODO
+
+    @property
+    def meta(self):
+        """Global graphs merged with the meta information of the wrapped dataset."""
+        meta = {
+            'user_graph': self._user_graph,
+            'item_graph': self._item_graph,
+            'graph': self._graph,
+            **self._dataset.meta
+        }
+        return meta
+
+
 class DuorecDataset(BaseDataset, config_name='duorec'):
 
     def __init__(self, dataset):
diff --git a/modeling/models/gtorec.py b/modeling/models/gtorec.py
index dc29435f..9a4be408 100644
--- a/modeling/models/gtorec.py
+++ b/modeling/models/gtorec.py
@@ -200,6 +200,9 @@ def forward(self, inputs):
         # source domain item sequence
         all_sample_events_source = inputs['{}.{}.ids'.format(self._sequence_prefix, self._source_domain)]  # (all_batch_events)
         all_sample_lengths_source = inputs['{}.{}.length'.format(self._sequence_prefix, self._source_domain)]  # (batch_size)
+        # global for global graph
+        all_sample_events_global = all_sample_events_target + all_sample_events_source
+        all_sample_lengths_global = all_sample_lengths_target + all_sample_lengths_source
 
         # sequential model encoder and target domain items embeddings from sequential model
         seq_embeddings_target, seq_mask_target = self._apply_sequential_encoder(
@@ -207,6 +210,7 @@ def forward(self, inputs):
         )  # (batch_size, target_seq_len, embedding_dim), (batch_size, target_seq_len)
 
         # target domain items encoder for graph model
+        # Q: does this need to be changed here?
         all_final_user_embeddings_target, all_final_item_embeddings_target = \
             self._apply_graph_encoder(all_sample_events_target, all_sample_lengths_target)  # (num_users + 2, embedding_dim), (num_items + 2, embedding_dim)
         # source domain items encoder for graph model