Help me met het implementeren van gedistribueerde training in PyTorch
description
Maakt efficiënte schaalvergroting van PyTorch-modeltraining over meerdere GPU's of machines mogelijk, waardoor trainingstijd wordt verkort en middelen beter worden benut. Helpt bij het oplossen van uitdagingen rond synchronisatie en parallelisme, met praktische codevoorbeelden en best practices die vaak complex zijn om zelf te implementeren.
prompt
Help me gedistribueerde training te implementeren voor mijn PyTorch-model om het trainen te versnellen over meerdere GPU's of machines. Mijn modelarchitectuur: <voer de architectuur van je PyTorch-model in> Beschrijving ...
try_prompt
generate_helper
disclaimerOnPageApi
tags
ai_answers
provider: openai
model: gpt-4.1-nano
Hier is een overzicht en codevoorbeeld om gedistribueerde training met PyTorch voor jouw ResNet-50-model en ImageNet-dataset te implementeren, gericht op 4 GPU's op één machine.
### 1. Voorbereiding: Imports en initialisatie
Gebruik `torch.distributed` voor communicatie en `torch.nn.parallel.DistributedDataParallel` (DDP) voor data-parallel training.
```python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torchvision import datasets, transforms, models
import os
```
### 2. Setup en initialisatie van distributed omgeving
Gebruik `torch.multiprocessing` of start via command line. Hier is een voorbeeld met `torch.multiprocessing`.
```python
import torch.multiprocessing as mp
def main(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
dist.init_process_group("nccl", rank=rank, world_size=world_size)
torch.cuda.set_device(rank)
# Laad model en verplaats naar GPU
model = models.resnet50(pretrained=False).to(rank)
model = DDP(model, device_ids=[rank])
```
### 3. Dataset en DataLoader met DistributedSampler
Gebruik `DistributedSampler` om data te verdelen over GPU's.
```python
transform = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
train_dataset = datasets.ImageNet('path/to/imagenet', split='train', transform=transform)
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, sampler=train_sampler, num_workers=4, pin_memory=True)
```
### 4. Optimizer en loss functie
Gebruik een optimizer die geschikt is voor grote datasets, zoals SGD met momentum.
```python
optimizer = optim.SGD(model.parameters(), lr=0.1 * world_size, momentum=0.9, weight_decay=1e-4)
criterion = nn.CrossEntropyLoss().to(rank)
```
### 5. Training loop
Synchronisatie wordt automatisch afgehandeld door DDP. Zorg dat je de sampler op elke epoch reset.
```python
for epoch in range(100):
train_sampler.set_epoch(epoch)
model.train()
for inputs, targets in train_loader:
inputs = inputs.to(rank, non_blocking=True)
targets = targets.to(rank, non_blocking=True)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
```
### 6. Best practices en optimalisaties
- **Gradient synchronisatie:** DDP synchroniseert automatisch bij `loss.backward()`.
- **Mixed precision training:** Gebruik `torch.cuda.amp` voor snellere training met minder geheugen.
- **Snelheid verbeteren:**
- Gebruik `torch.backends.cudnn.benchmark = True`.
- Verhoog `num_workers` in DataLoader.
- Gebruik `pin_memory=True`.
- **Checkpoints:** Sla het model op na elke epoch, alleen door de hoofdproces (rank 0).
```python
if rank == 0:
torch.save(model.state_dict(), f'checkpoint_epoch_{epoch}.pth')
```
### 7. Afsluiten
Na training sluit je de process groep af.
```python
dist.destroy_process_group()
```
### 8. Starten van training
Gebruik bijvoorbeeld:
```bash
python -m torch.distributed.launch --nproc_per_node=4 --nnodes=1 --node_rank=0 your_training_script.py
```
Of met `torch.multiprocessing.spawn` in je script:
```python
if __name__ == "__main__":
mp.spawn(main, args=(4,), nprocs=4)
```
---
**Samenvatting:**
- Gebruik `torch.distributed` en `DistributedDataParallel` voor synchronisatie en data-parallelisme.
- Gebruik `DistributedSampler` voor correcte datadeling.
- Optimaliseer met `cudnn.benchmark`, `pin_memory`, en mixed precision.
- Zorg dat alle GPU's hetzelfde model en optimizer gebruiken en dat de data correct verdeeld wordt.
Wil je een volledige, werkende scriptvoorbeelden of meer details over specifieke optimalisatietechnieken?
provider: deepseek
model: deepseek-chat
Zeker, ik help je met gedistribueerde training voor je ResNet-50 model op ImageNet. Hier is een complete implementatie met best practices:
## 1. Basis Distributed Data Parallel (DDP) Implementatie
```python
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
from torch.nn.parallel import DistributedDataParallel as DDP
import os
def setup(rank, world_size):
"""Initialiseer de distributed omgeving"""
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# Initialiseer het process group
dist.init_process_group("nccl", rank=rank, world_size=world_size)
torch.cuda.set_device(rank)
def cleanup():
"""Cleanup distributed training"""
dist.destroy_process_group()
def create_model():
"""Maak ResNet-50 model aan"""
model = torchvision.models.resnet50(pretrained=False)
return model
def main(rank, world_size):
# Setup distributed training
setup(rank, world_size)
# Model initialiseren
model = create_model().to(rank)
# Distributed Data Parallel wrapper
model = DDP(model, device_ids=[rank])
# Loss function en optimizer
criterion = nn.CrossEntropyLoss().to(rank)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-4)
# Data loading met DistributedSampler
train_dataset = create_imagenet_dataset()
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset, num_replicas=world_size, rank=rank
)
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=256, # Per GPU batch size
sampler=train_sampler,
num_workers=8, # Aanbevolen: 4-8 workers per GPU
pin_memory=True,
persistent_workers=True
)
# Training loop
for epoch in range(100):
train_sampler.set_epoch(epoch) # Belangrijk voor shuffling
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(rank, non_blocking=True), target.to(rank, non_blocking=True)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
if batch_idx % 100 == 0 and rank == 0:
print(f'Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}]')
cleanup()
def create_imagenet_dataset():
"""ImageNet dataset transformaties"""
transform = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Vervang met je ImageNet pad
dataset = torchvision.datasets.ImageFolder(
root='/path/to/imagenet/train',
transform=transform
)
return dataset
if __name__ == "__main__":
world_size = 4 # Aantal GPUs
mp.spawn(main, args=(world_size,), nprocs=world_size, join=True)
```
## 2. Geavanceerde Optimalisaties
```python
def create_optimized_model(rank):
"""Geoptimaliseerd model met mixed precision en gradient scaling"""
model = create_model().to(rank)
# Mixed precision training
scaler = torch.cuda.amp.GradScaler()
# Gradient accumulation (optioneel voor grotere effective batch size)
gradient_accumulation_steps = 2
return model, scaler, gradient_accumulation_steps
def optimized_training_step(model, data, target, optimizer, scaler, gradient_accumulation_steps):
"""Geoptimaliseerde training step met mixed precision"""
with torch.cuda.amp.autocast():
output = model(data)
loss = criterion(output, target)
# Scale loss en backward pass
scaler.scale(loss / gradient_accumulation_steps).backward()
if (batch_idx + 1) % gradient_accumulation_steps == 0:
# Gradient clipping
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# Optimizer step
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad()
```
## 3. Best Practices en Optimalisatietips
### Data Loading Optimalisatie:
```python
# Gebruik deze instellingen in je DataLoader
data_loader_config = {
'batch_size': 256, # Per GPU batch size
'num_workers': 8, # 2-4 workers per GPU
'pin_memory': True, # Snellere data transfer naar GPU
'persistent_workers': True, # Vermijdt worker herinitialisatie
'prefetch_factor': 2 # Prefetch batches
}
```
### Learning Rate Scheduling:
```python
def adjust_learning_rate(optimizer, epoch, warmup_epochs=5):
"""Learning rate scheduling met warmup"""
if epoch < warmup_epochs:
# Linear warmup
lr = 0.1 * (epoch + 1) / warmup_epochs
else:
# Cosine annealing
lr = 0.1 * 0.5 * (1 + math.cos(math.pi * (epoch - warmup_epochs) / (100 - warmup_epochs)))
for param_group in optimizer.param_groups:
param_group['lr'] = lr
```
### Synchronisatie en Checkpoints:
```python
def save_checkpoint(model, optimizer, epoch, rank):
"""Save checkpoint alleen van master process"""
if rank == 0:
checkpoint = {
'epoch': epoch,
'model_state_dict': model.module.state_dict(), # .module voor DDP
'optimizer_state_dict': optimizer.state_dict(),
}
torch.save(checkpoint, f'checkpoint_epoch_{epoch}.pth')
def load_checkpoint(model, optimizer, rank):
"""Laad checkpoint op alle processen"""
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
checkpoint = torch.load('checkpoint.pth', map_location=map_location)
model.module.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
return checkpoint['epoch']
```
## 4. Uitvoering en Monitoring
Start script met:
```bash
python -m torch.distributed.launch --nproc_per_node=4 --nnodes=1 --node_rank=0 train.py
```
Of voor nieuwere PyTorch versies:
```bash
torchrun --nproc_per_node=4 --nnodes=1 train.py
```
### Prestatie Monitoring:
```python
# Voeg deze toe aan je training loop
if rank == 0 and batch_idx % 100 == 0:
memory_allocated = torch.cuda.max_memory_allocated(rank) / 1024**3
memory_reserved = torch.cuda.max_memory_reserved(rank) / 1024**3
print(f"GPU Memory - Allocated: {memory_allocated:.2f}GB, Reserved: {memory_reserved:.2f}GB")
```
## Belangrijkste Best Practices:
1. **Gebruik DistributedSampler** voor correcte data verdeling
2. **Set epoch in sampler** voor goede shuffling
3. **Mixed precision** voor betere prestaties en geheugengebruik
4. **Non-blocking data transfer** met `non_blocking=True`
5. **Gradient accumulation** voor grotere effective batch sizes
6. **Learning rate warmup** voor stabiliteit
7. **Gradient clipping** om gradient explosion te voorkomen
8. **Memory monitoring** om bottlenecks te identificeren
Deze implementatie zou een near-linear scaling moeten bereiken over je 4 GPUs met ImageNet dataset.