
Question on Shared Model & Multiprocesses #4

@kayuksel

Description


Dear Didier,

You were super helpful last time, so, if you would allow me, I would like to ask one more question. In the following code, I am trying to have multiple processes, distributed across multiple GPUs, each assigned a portion of a data loader. Each process trains a local model and then updates a single shared model after asynchronously computing the loss on its portion of the data.

The following code runs without crashing, but the model weights are not updated (the losses of the individual processes differ from one another, but each stays the same across epochs). I therefore suspect that when I call .backward() in each process, the gradients are not accumulated back into the main model (checkpoint) for which I initialized the optimizer. I tried adding the gradients of each process's local model to the main model (checkpoint) before the optimizer step, but in that case I get the following error: AttributeError: 'NoneType' object has no attribute 'add_'
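For what it's worth, that AttributeError can be reproduced without any multiprocessing: .grad is None on every parameter until a backward pass has populated it, so check_param.grad.add_(...) fails on a fresh checkpoint. A minimal single-process sketch (the checkpoint/local names here are illustrative, not from the code below):

```python
import torch
import torch.nn as nn

checkpoint = nn.Linear(4, 2)

# Until a backward pass has run, .grad is None for every parameter,
# which is exactly what makes check_param.grad.add_(...) raise AttributeError.
assert all(p.grad is None for p in checkpoint.parameters())

# One workaround (an assumption on my part, not in the original code):
# materialize zero gradients on the checkpoint before accumulating into them.
for p in checkpoint.parameters():
    p.grad = torch.zeros_like(p)

# A local copy of the checkpoint, as in calculate_reward().
local = nn.Linear(4, 2)
local.load_state_dict(checkpoint.state_dict())
local(torch.randn(3, 4)).sum().backward()

# The accumulation from the commented-out block now works.
for check_param, model_param in zip(checkpoint.parameters(), local.parameters()):
    check_param.grad.add_(model_param.grad)
```

Even with the gradients materialized this way, note that a child process started with spawn works on its own copy of the checkpoint's gradients, so gradients accumulated there are not visible to the parent unless they are sent back explicitly.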

I would welcome any help you can offer with your expertise on this topic. I would not want to disturb you personally if this were a question I could answer through the general community on the PyTorch forum. Thank you for your time and consideration. Have a great day. Regards, Kamer

Note: I suspect this would be easier to handle on a CPU or a single GPU, since the processes could then update one shared model directly (no cloning or copying of parameters into a local model would be necessary). What I am trying to do instead is distribute the processes across the 4 GPUs that I have.
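For reference, the single-device case described in this note is essentially PyTorch's Hogwild pattern: the model's parameters are moved into shared memory and each worker steps its own optimizer over those same shared tensors, so no gradient hand-off is needed. A minimal CPU sketch under that assumption (worker and batches are placeholder names):

```python
import torch
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim

def worker(model, batches):
    # The optimizer is local to the process, but its parameters are the
    # shared tensors, so step() updates the one shared model in place.
    optimizer = optim.SGD(model.parameters(), lr=0.1)
    for x, y in batches:
        optimizer.zero_grad()
        loss = (model(x) - y).pow(2).mean()
        loss.backward()
        optimizer.step()

if __name__ == '__main__':
    model = nn.Linear(2, 1)
    model.share_memory()  # move parameters into shared memory (CPU)
    batches = [(torch.randn(4, 2), torch.randn(4, 1)) for _ in range(5)]
    workers = [mp.Process(target=worker, args=(model, batches)) for _ in range(2)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
```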

import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets
from tqdm import tqdm as tq
import os, time, pdb
import _pickle as cPickle
import torch.multiprocessing as _mp
mp = _mp.get_context('spawn')
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"  # no spaces: entries with spaces may be ignored
with open('y_train.pkl', 'rb') as f: y_train = cPickle.load(f)
with open('y_test.pkl', 'rb') as f: y_test = cPickle.load(f)
y_train = torch.from_numpy(y_train)
y_test = torch.from_numpy(y_test)
with open('train_gaps.pkl', 'rb') as f: X_train = cPickle.load(f)
with open('test_gaps.pkl', 'rb') as f: X_test = cPickle.load(f)
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
test_dataset = torch.utils.data.TensorDataset(X_test, y_test)
train_loader = torch.utils.data.DataLoader(train_dataset,
    batch_size=1, shuffle=False, pin_memory=True)
test_loader = torch.utils.data.DataLoader(test_dataset,
    batch_size=1, shuffle=False, pin_memory=True)
No_Channels = 12

def calculate_reward(checkpoint, loader, dd, skip):
    model = nn.Linear(512 + No_Channels, No_Channels).cuda(dd).share_memory()
    
    for model_param, check_param in zip(model.parameters(), checkpoint.parameters()):
        model_param.data.copy_(check_param.data)
    #model.load_state_dict(checkpoint.state_dict())

    last_action = torch.ones(No_Channels).cuda(dd)
    last_action[:-1] /= float(No_Channels-1)
    total_reward = 0.0
    pb = tq(loader, position = dd)
    for i, (features, rewards) in enumerate(pb):
        if skip[i]: continue
        features = features.view(-1).cuda(dd, non_blocking=True)
        rewards = rewards.float().cuda(dd, non_blocking=True)
        state = torch.cat([features, last_action])
        action = model(state)
        weights = torch.tanh(action[:-1])
        certain = 0.5 + torch.sigmoid(action[-1]) / 2.0
        weights = weights / weights.abs().sum()
        reward = (weights - last_action[:-1]).abs().sum() * 1e-4
        reward -= (weights * rewards).sum() #- rewards.mean()
        total_reward = total_reward + (reward / certain)
        last_action[:-1] = weights
        last_action[-1] = certain
    #pb.set_postfix({'R': '{:.6f}'.format(total_reward)})
    '''
    total_reward.backward()
    for model_param, check_param in zip(model.parameters(), checkpoint.parameters()):
        check_param.grad.add_(model_param.grad)
    '''
    return total_reward/(i+1)

def train(model, index):
    skip = [(i // (len(train_loader)//4)) != index for i in range(len(train_loader))]
    train_reward = calculate_reward(model, train_loader, (index % 4), skip)
    train_reward.backward()
    print('train %f' % -train_reward.item())
    #print('test %f' % -calculate_reward(model, test_loader).item())

if __name__ == '__main__':
    model = nn.Linear(512 + No_Channels, No_Channels).cuda().share_memory()
    optimizer = optim.Adam(params = model.parameters(), lr = 1e-4)
    torch.backends.cudnn.benchmark = True
    for epoch in range(100):
        model.train(True)
        optimizer.zero_grad()
        processes = []
        for i in range(4):
            p = mp.Process(target=train, args=(model, i))
            p.start()
            processes.append(p)
        for p in processes: p.join()
        optimizer.step()
        model.eval()
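Since autograd state does not cross process boundaries, one possible shape for the multi-GPU version, sketched here on CPU with made-up names and a queue-based protocol that is not in the code above, is to have each worker compute gradients on a local copy and send them back for the parent to accumulate before optimizer.step():

```python
import torch
import torch.multiprocessing as mp
import torch.nn as nn

def grad_worker(state_dict, batch, queue):
    # Gradients computed here exist only in this process, so they are
    # shipped back explicitly instead of relying on shared autograd state.
    local = nn.Linear(2, 1)
    local.load_state_dict(state_dict)
    local(batch).sum().backward()
    queue.put([p.grad.clone() for p in local.parameters()])

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    model = nn.Linear(2, 1)
    queue = ctx.SimpleQueue()
    procs = [ctx.Process(target=grad_worker,
                         args=(model.state_dict(), torch.randn(3, 2), queue))
             for _ in range(2)]
    for p in procs:
        p.start()
    # Accumulate each worker's gradients into the parent model,
    # draining the queue before joining the workers.
    for _ in procs:
        for param, grad in zip(model.parameters(), queue.get()):
            param.grad = grad if param.grad is None else param.grad + grad
    for p in procs:
        p.join()
```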
