多進程最佳實踐¶
torch.multiprocessing
是 Python multiprocessing
模組的替代品。 它支援完全相同的操作,但進行了擴展,因此透過 multiprocessing.Queue
傳送的所有張量,其資料將移至共享記憶體,並且只會將控制代碼傳送到另一個程序。
注意
當一個 Tensor
被傳送到另一個進程時,該 Tensor
的資料會被共享。如果 torch.Tensor.grad
不是 None
,它也會被共享。當一個沒有 torch.Tensor.grad
欄位的 Tensor
被傳送到另一個進程後,它會建立一個標準的、特定於該進程的 .grad
Tensor
,與 Tensor
的資料被共享的方式不同,這個新的 .grad
並不會在所有進程中自動共享。
這允許實現各種訓練方法,例如 Hogwild、A3C 或任何其他需要異步操作的方法。
多進程中的 CUDA¶
CUDA 執行時不支援 fork
啟動方法;要使用 CUDA 在子進程中,必須使用 spawn
或 forkserver
啟動方法。
注意
可以使用 multiprocessing.get_context(...)
建立上下文,或直接使用 multiprocessing.set_start_method(...)
來設定啟動方法。
與 CPU tensors 不同,只要接收進程保留 tensor 的副本,發送進程就需要保留原始 tensor。 這是底層的實作方式,但需要使用者遵循最佳實務才能正確執行程式。 例如,只要消費者進程有對 tensor 的參考,發送進程就必須保持活動狀態,而且如果消費者進程因致命訊號異常退出,refcounting 也無法拯救你。 請參閱本節。
另請參閱:使用 nn.parallel.DistributedDataParallel 而不是 multiprocessing 或 nn.DataParallel
最佳實務與提示¶
避免與對抗死鎖¶
在產生新進程時,有很多事情可能會出錯,而死鎖最常見的原因是背景執行緒。 如果有任何執行緒持有鎖定或匯入模組,並且呼叫了 fork
,則子進程很可能處於損毀狀態,並且會死鎖或以不同的方式失敗。 請注意,即使您沒有,Python 內建模組也會 - 只需查看 multiprocessing
即可。multiprocessing.Queue
實際上是一個非常複雜的類別,它會產生多個用於序列化、傳送和接收物件的執行緒,它們也可能導致上述問題。 如果您發現自己處於這種情況,請嘗試使用 SimpleQueue
,它不使用任何其他執行緒。
我們正在盡最大努力讓您輕鬆實現,並確保不會發生這些死鎖,但有些事情超出我們的控制範圍。 如果您有任何無法處理的問題,請嘗試在論壇上尋求協助,我們會看看是否有我們可以解決的問題。
重複使用透過 Queue 傳遞的緩衝區¶
請記住,每次將 Tensor
放入 multiprocessing.Queue
中時,都必須將其移動到共享記憶體中。 如果它已經共享,則不會執行任何操作,否則將會產生額外的記憶體複製,這會降低整個流程的速度。 即使您有一個進程池將資料傳送到單個進程,也請讓它將緩衝區傳送回去 - 這幾乎是免費的,並且可以讓您在傳送下一個批次時避免複製。
非同步多進程訓練(例如 Hogwild)¶
使用 torch.multiprocessing
,可以非同步地訓練模型,參數可以始終共享,也可以定期同步。 在第一種情況下,我們建議傳送整個模型物件,而在後一種情況下,我們建議僅傳送 state_dict()
。
我們建議使用 multiprocessing.Queue
在進程之間傳遞各種 PyTorch 物件。 例如,在使用 fork
啟動方法時,可以繼承已經在共享記憶體中的 tensors 和 storages,但是這非常容易出錯,應謹慎使用,並且僅由高級使用者使用。 佇列,即使它們有時是一個不太優雅的解決方案,但在所有情況下都可以正常工作。
警告
您應該小心使用沒有用 if __name__ == '__main__'
保護的全域語句。 如果使用與 fork
不同的啟動方法,它們將在所有子進程中執行。
Hogwild¶
一個具體的 Hogwild 實作可以在範例儲存庫中找到,但為了展示程式碼的整體結構,下面也有一個最小的範例
import torch.multiprocessing as mp
from model import MyModel
def train(model):
# Construct data_loader, optimizer, etc.
for data, labels in data_loader:
optimizer.zero_grad()
loss_fn(model(data), labels).backward()
optimizer.step() # This will update the shared parameters
if __name__ == '__main__':
num_processes = 4
model = MyModel()
# NOTE: this is required for the ``fork`` method to work
model.share_memory()
processes = []
for rank in range(num_processes):
p = mp.Process(target=train, args=(model,))
p.start()
processes.append(p)
for p in processes:
p.join()
多進程中的 CPU¶
不適當的多進程處理可能導致 CPU 過度分配,導致不同進程競爭 CPU 資源,進而導致效率低下。
本教學將解釋什麼是 CPU 過度分配以及如何避免它。
CPU 過度分配¶
CPU 過度分配是一個技術術語,指的是分配給系統的 vCPU 總數超過硬體上可用的 vCPU 總數的情況。
這會導致 CPU 資源的嚴重爭用。在這種情況下,進程之間會頻繁切換,這會增加進程切換的開銷並降低整體系統效率。
透過在範例儲存庫中找到的 Hogwild 實作中的程式碼範例,了解 CPU 過度分配。
當使用以下命令在 CPU 上使用 4 個進程執行訓練範例時
python main.py --num-processes 4
假設機器上有 N 個 vCPU 可用,執行上述命令將產生 4 個子進程。每個子進程將為自己分配 N 個 vCPU,導致需要 4*N 個 vCPU。但是,機器只有 N 個 vCPU 可用。因此,不同的進程將競爭資源,導致頻繁的進程切換。
以下觀察結果表明存在 CPU 過度分配
高 CPU 使用率:透過使用
htop
命令,您可以觀察到 CPU 使用率持續很高,通常達到或超過其最大容量。這表示對 CPU 資源的需求超過了可用的物理核心,導致進程之間爭用 CPU 時間。頻繁的上下文切換和低系統效率:在過度分配 CPU 的情況下,進程會競爭 CPU 時間,並且作業系統需要快速切換不同的進程以公平地分配資源。這種頻繁的上下文切換會增加開銷並降低整體系統效率。
避免 CPU 過度分配¶
避免 CPU 過度分配的一個好方法是正確的資源分配。確保並行執行的進程或線程數量不超過可用的 CPU 資源。
在這種情況下,一個解決方案是在子進程中指定適當的線程數。這可以透過在子進程中使用 torch.set_num_threads(int)
函數來設定每個進程的線程數來實現。
假設機器上有 N 個 vCPU,並且將產生 M 個進程,則每個進程使用的最大 num_threads
值將為 floor(N/M)
。為了避免 mnist_hogwild 範例中的 CPU 過度分配,需要對範例儲存庫中的 train.py
檔案進行以下變更。
def train(rank, args, model, device, dataset, dataloader_kwargs):
torch.manual_seed(args.seed + rank)
#### define the num threads used in current sub-processes
torch.set_num_threads(floor(N/M))
train_loader = torch.utils.data.DataLoader(dataset, **dataloader_kwargs)
optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
for epoch in range(1, args.epochs + 1):
train_epoch(epoch, args, model, device, train_loader, optimizer)
使用 torch.set_num_threads(floor(N/M))
為每個進程設定 num_thread
。您可以在其中將 N 替換為可用的 vCPU 數量,將 M 替換為選擇的進程數量。適當的 num_thread
值將根據手頭的具體任務而有所不同。但是,作為一般準則,num_thread
的最大值應為 floor(N/M)
,以避免 CPU 過度分配。在mnist_hogwild 訓練範例中,避免 CPU 過度分配後,您可以獲得 30 倍的效能提升。