

DDP(分布式数据并行) 是PyTorch的分布式训练框架,用于在多个GPU/机器上并行训练模型,大幅提升训练速度
它的核心思想是:将大批次数据拆分到多个GPU上,每个GPU计算部分梯度,然后聚合梯度更新模型,确保每个模型的权重都是一样的,然后再进行下一个epoch的训练。
流程如下:
主进程: GPU0 (rank=0) GPU1 (rank=1) GPU2 (rank=2)
│ │ │ │
├─ 初始化 ──→ 相同模型副本 ←───── 相同模型副本 ←───── 相同模型副本
│ │ │ │
├─ 数据分发 ─→ 数据分片1 数据分片2 数据分片3
│ │ │ │
├─ 并行计算 ─→ 前向+反向 前向+反向 前向+反向
│ │ │ │
├─ 梯度同步 ──→ 梯度平均 ←───────── 梯度平均 ←───────── 梯度平均
│ │ │ │
└─ 参数更新 ─→ 更新参数 ←────────── 更新参数 ←────────── 更新参数bash定义一个简单的神经网络:
class Net(nn.Module): # 模型定义
def __init__(self):
super(Net,self).__init__()
self.flatten=nn.Flatten()
self.seq=nn.Sequential(
nn.Linear(28*28,128),
nn.ReLU(),
nn.Linear(128,64),
nn.ReLU(),
nn.Linear(64,10)
)
def forward(self,x):
x=self.flatten(x)
return self.seq(x)bash接下来需要建立多GPU/多机器间的通信网络,让所有训练进程能够互相识别和通信,这是是DDP分布式训练的核心初始化部分。
首先初始化进程组:
dist.init_process_group(backend='nccl')bash启动分布式环境,建立进程间通信。backend=‘nccl’:使用NVIDIA的NCCL通信库(GPU间高速通信),其他可选backend:‘gloo’(CPU)、‘mpi’(高性能计算)。
执行后的效果就是:
启动前:4个独立的Python进程(互不相识)
启动后:4个进程组成通信组,可以互相发送数据bash接下来获取进程排名:
rank = dist.get_rank()bash获取当前进程在组内的唯一标识符。rank=0:主进程(master),通常负责保存模型、日志等
然后获取进程总数:
world_size = dist.get_world_size()bash这个值是在启动python脚本时设置的,例如torchrun --nproc_per_node=4 train.py,则world_size=4
加载checkpoint:
checkpoint=None # 各自加载checkpoint
try:
checkpoint=torch.load('checkpoint.pth',map_location='cpu') # checkpoint是cuda:0保存的,加载默认会读到cuda:0,所以明确指定给cpu
except:
passbash安全地加载之前保存的训练状态,如果文件不存在或损坏,则继续训练。
加载模型及其权重参数到主进程rank0上:
model=Net().to(device_name)
if checkpoint and rank==0: # rank0恢复模型参数
model.load_state_dict(checkpoint['model'])bash然后rank0广播给其他的进程:
model=DDP(model) # 【集合通讯】rank0广播参数给其他进程bash优化器的初始化和状态恢复:
optimizer=torch.optim.Adam(model.parameters(),lr=0.001) #model参数一致,则optim会保证其初始状态一致
if checkpoint:
optimizer.load_state_dict(checkpoint['optimizer']) # 各自加载checkpointbash保证所有GPU上的优化器具有相同的初始状态。
加载训练集并分片发给其他进程:
train_dataset=MNIST(root='./data',download=True,transform=ToTensor(),train=True) # 各自加载dataset
sampler=DistributedSampler(train_dataset) # 指派子集给各进程
train_dataloader=DataLoader(train_dataset,batch_size=32,sampler=sampler,persistent_workers=True,num_workers=2)bash验证集只需要在主进程上进行一轮验证即可,不用分发:
val_dataset=MNIST(root='./data',download=True,transform=ToTensor(),train=False)
val_dataloader=DataLoader(val_dataset,batch_size=32,shuffle=True,persistent_workers=True,num_workers=2)bash接下来进行训练的循环:
for epoch in range(20):
sampler.set_epoch(epoch) # 【集合通讯】生成随机种子,rank0广播给其他进程bashsampler.set_epoch(epoch)能确保各个进程获取到的数据片不一样,如果没有这个代码,会导致模型在每个epoch看到完全相同的数据顺序 → 过拟合风险:
# 结果:
Epoch 0: GPU0处理[0,4,8,...], GPU1处理[1,5,9,...], GPU2处理[2,6,10,...], GPU3处理[3,7,11,...]
Epoch 1: GPU0处理[0,4,8,...], GPU1处理[1,5,9,...], GPU2处理[2,6,10,...], GPU3处理[3,7,11,...] # 相同!
Epoch 2: GPU0处理[0,4,8,...], GPU1处理[1,5,9,...], GPU2处理[2,6,10,...], GPU3处理[3,7,11,...] # 相同!bash接下来就是模型训练部分:
model.train()
for x,y in train_dataloader:
x,y=x.to(device_name),y.to(device_name)
pred_y=model(x) # 【集合通讯】rank0广播model buffer给其他进程
loss=F.cross_entropy(pred_y,y)
optimizer.zero_grad()
loss.backward() # 【集合通讯】每个参数的梯度做all reduce(每个进程会收到其他进程的梯度,并求平均)
optimizer.step()bash其中前向传播时pred_y=model(x),DDP会自动同步模型参数。反向传播时loss.backward(),DDP也会自动同步梯度。
然后执行:
ist.reduce(loss,dst=0) # 【集合通讯】rank0汇总其他进程的lossbash将所有GPU的损失值汇总到主进程(rank0),用于计算平均损失、记录日志等。举个例子:
进程0 (rank=0): loss=0.5
进程1 (rank=1): loss=0.3
进程2 (rank=2): loss=0.7
进程3 (rank=3): loss=0.2
执行 dist.reduce(loss, dst=0) 后:
进程0: loss = 0.5 + 0.3 + 0.7 + 0.2 = 1.7 (汇总结果)
进程1: loss=0.3 (保持不变)
进程2: loss=0.7 (保持不变)
进程3: loss=0.2 (保持不变)bash接下来在主线程rank0上进行验证和保存模型:
if rank==0:
train_avg_loss=loss.item()/world_size
# evaluate
raw_model=model.module
val_loss=0
with torch.no_grad():
for x,y in val_dataloader:
x,y=x.to(device_name),y.to(device_name)
pred_y=raw_model(x)
loss=F.cross_entropy(pred_y,y)
val_loss+=loss.item()
val_avg_loss=val_loss/len(val_dataloader)
print(f'train_loss:{train_avg_loss} val_loss:{val_avg_loss}')
# checkpoint
torch.save({'model':model.module.state_dict(),'optimizer':optimizer.state_dict()},'.checkpoint.pth')
os.replace('.checkpoint.pth','checkpoint.pth')bash特别注意需要执行:
dist.barrier() # 【集合通讯】等待rank0跑完evalbash它是分布式训练中的同步原语,用于确保所有进程在继续执行前达到同一个执行点。作用是创建一个同步屏障,所有进程必须在此处等待,直到所有进程都到达这个点才能继续执行。如果没有这行代码,其他进程会在rank0还在执行验证的时候,执行下一个epoch,导致权重参数不对齐,训练失败!
最后执行命令训练:
torchrun --nproc-per-node 8 singlenode_cpu.pybash这是在cpu上开了8个进程进行训练。打印输出如下:
