快捷方式

torch.futures

此套件提供一個 Future 類型,它封裝了非同步執行和一組實用函數,以簡化 Future 物件的操作。目前,Future 類型主要由 分散式 RPC 框架 使用。

class torch.futures.Future(*, devices=None)

這是 torch._C.Future 的包裝器,用於封裝 callable 物件的非同步執行,例如 rpc_async()。它也公開了一組 API 來新增回呼函式和設定結果。

警告

GPU 支援是一個 Beta 功能,可能會有所變動。

add_done_callback(callback)[原始碼][原始碼]

將給定的回呼函式附加到這個 Future,它將在 Future 完成時執行。可以將多個回呼函式新增到同一個 Future,但無法保證它們的執行順序。回呼函式必須採用一個引數,即對這個 Future 的參考。回呼函式可以使用 value() 方法來取得值。請注意,如果這個 Future 已經完成,則給定的回呼函式將會內聯執行。

我們建議您使用 then() 方法,因為它提供了一種在您的回呼完成後進行同步的方法。如果您的回呼不傳回任何內容,add_done_callback 可能更便宜。但 then()add_done_callback 在底層都使用相同的回呼註冊 API。

對於 GPU tensors,此方法的行為方式與 then() 相同。

參數

callback (Future) – 一個 Callable,它接受一個引數,即對這個 Future 的參考。

注意

請注意,如果回呼函式拋出異常,無論是透過原始 future 因例外情況而完成並呼叫 fut.wait(),還是透過回呼中的其他程式碼,都必須仔細處理錯誤。例如,如果這個回呼稍後完成其他 futures,則這些 futures 不會被標記為因錯誤而完成,並且使用者有責任獨立處理這些 futures 的完成/等待。

範例:
>>> def callback(fut):
...     print("This will run after the future has finished.")
...     print(fut.wait())
>>> fut = torch.futures.Future()
>>> fut.add_done_callback(callback)
>>> fut.set_result(5)
This will run after the future has finished.
5
done()[原始碼][原始碼]

如果這個 Future 已完成,則傳回 True。如果 Future 有結果或例外,則表示已完成。

如果值包含位於 GPU 上的 tensors,即使正在填充這些 tensors 的非同步 kernels 尚未完成在裝置上執行,Future.done() 也會傳回 True,因為在此階段,結果已經可用,前提是執行適當的同步(請參閱 wait())。

傳回類型

bool

set_exception(result)[原始碼][原始碼]

為這個 Future 設定一個例外,這將標記這個 Future 為因錯誤而完成,並觸發所有附加的回呼。請注意,當在這個 Future 上呼叫 wait()/value() 時,此處設定的例外將會內聯引發。

參數

result (BaseException) – 這個 Future 的例外。

範例:
>>> fut = torch.futures.Future()
>>> fut.set_exception(ValueError("foo"))
>>> fut.wait()
Traceback (most recent call last):
...
ValueError: foo
set_result(result)[原始碼][原始碼]

為這個 Future 設定結果,這將標記這個 Future 為已完成,並觸發所有附加的回呼。請注意,Future 無法被標記為完成兩次。

如果結果包含位於 GPU 上的 tensors,則即使正在填充這些 tensors 的非同步 kernels 尚未完成在裝置上執行,也可以呼叫此方法,前提是在呼叫此方法時,將那些 kernels 排隊的 streams 設定為目前的 streams。簡單來說,只要在兩者之間沒有變更 streams,就可以在啟動這些 kernels 後立即呼叫此方法,而無需任何額外的同步。此方法將在所有相關的目前 streams 上記錄事件,並將使用它們來確保這個 Future 的所有消費者都能獲得適當的排程。

參數

result (object) – 這個 Future 的結果物件。

範例:
>>> import threading
>>> import time
>>> def slow_set_future(fut, value):
...     time.sleep(0.5)
...     fut.set_result(value)
>>> fut = torch.futures.Future()
>>> t = threading.Thread(
...     target=slow_set_future,
...     args=(fut, torch.ones(2) * 3)
... )
>>> t.start()
>>> print(fut.wait())
tensor([3., 3.])
>>> t.join()
then(callback)[原始碼][原始碼]

將給定的回呼函數附加到此 Future,當 Future 完成時,將會執行此回呼函數。可以將多個回呼函數添加到同一個 Future,但不能保證它們的執行順序 (若要強制執行特定順序,請考慮鏈式調用:fut.then(cb1).then(cb2))。回呼函數必須接受一個參數,即對此 Future 的引用。回呼函數可以使用 value() 方法來取得值。請注意,如果此 Future 已經完成,則給定的回呼函數將會立即內聯執行。

如果 Future 的值包含位於 GPU 上的 tensors,則在填充這些 tensors 的非同步核心程式尚未在裝置上執行完成時,可能會調用回呼函數。但是,將會使用一些專用串流(從全域池中獲取)設定為當前串流來調用回呼函數,這些串流將會與那些核心程式同步。因此,回呼函數對這些 tensors 執行的任何操作都將在核心程式完成後排程在裝置上。換句話說,只要回呼函數不切換串流,它就可以安全地操作結果,而無需任何額外的同步。這類似於 wait() 的非阻塞行為。

同樣地,如果回呼函數傳回的值包含位於 GPU 上的 tensors,即使產生這些 tensors 的核心程式仍在裝置上執行,它也可以這樣做,只要回呼函數在其執行期間沒有變更串流。如果想要變更串流,必須小心地將它們與原始串流重新同步,也就是說,那些在調用回呼函數時是當前串流的串流。

參數

callback (Callable) – 一個 Callable,它接受此 Future 作為唯一的參數。

傳回

一個新的 Future 物件,它保存 callback 的傳回值,並會在給定的 callback 完成時標記為已完成。

傳回類型

Future[S]

注意

請注意,如果回呼函數拋出異常,無論是由於原始 future 以異常完成並調用 fut.wait(),還是由於回呼函數中的其他程式碼,由 then 傳回的 future 都會適當地標記為遇到的錯誤。但是,如果此回呼稍後完成其他 futures,則這些 futures 不會標記為以錯誤完成,使用者有責任獨立處理這些 futures 的完成/等待。

範例:
>>> def callback(fut):
...     print(f"RPC return value is {fut.wait()}.")
>>> fut = torch.futures.Future()
>>> # The inserted callback will print the return value when
>>> # receiving the response from "worker1"
>>> cb_fut = fut.then(callback)
>>> chain_cb_fut = cb_fut.then(
...     lambda x : print(f"Chained cb done. {x.wait()}")
... )
>>> fut.set_result(5)
RPC return value is 5.
Chained cb done. None
value()[原始碼][原始碼]

取得已完成的 future 的值。

只有在呼叫 wait() 完成後,或在傳遞給 then() 的回呼函數內部,才應呼叫此方法。在其他情況下,此 Future 可能尚未保存一個值,並且呼叫 value() 可能會失敗。

如果該值包含位於 GPU 上的 tensors,則此方法將不會執行任何額外的同步。這應該事先單獨透過呼叫 wait() 來完成(除了在回呼中,因為 then() 已經處理好了)。

傳回

Future 保存的值。如果建立值的函數(回呼或 RPC)拋出錯誤,則此 value() 方法也將拋出錯誤。

傳回類型

T

wait()[原始碼][原始碼]

阻塞,直到此 Future 的值準備好。

如果該值包含位於 GPU 上的 tensors,則會與可能非同步填充這些 tensors 的核心程式(在裝置上執行)執行額外的同步。這種同步是非阻塞的,這表示 wait() 將在目前的串流中插入必要的指令,以確保在那些串流上排隊的進一步操作將在非同步核心程式之後正確排程,但是一旦完成,即使那些核心程式仍在執行,wait() 也會傳回。只要不變更串流,訪問和使用這些值時就不需要進一步的同步。

傳回

Future 保存的值。如果建立值的函數(回呼或 RPC)拋出錯誤,則此 wait 方法也將拋出錯誤。

傳回類型

T

torch.futures.collect_all(futures)[原始碼][原始碼]

將提供的 Future 物件收集到一個單一合併的 Future 物件中,該物件在所有子 Future 完成時完成。

參數

futures (list) – Future 物件的列表。

傳回

傳回一個 Future 物件,該物件指向傳入的 Futures 的列表。

傳回類型

Future[List[Future]]

範例:
>>> fut0 = torch.futures.Future()
>>> fut1 = torch.futures.Future()
>>> fut = torch.futures.collect_all([fut0, fut1])
>>> fut0.set_result(0)
>>> fut1.set_result(1)
>>> fut_list = fut.wait()
>>> print(f"fut0 result = {fut_list[0].wait()}")
fut0 result = 0
>>> print(f"fut1 result = {fut_list[1].wait()}")
fut1 result = 1
torch.futures.wait_all(futures)[原始碼][原始碼]

等待所有提供的 futures 完成,並傳回已完成值的列表。 如果任何 future 遇到錯誤,該方法將提早退出並報告錯誤,而不會等待其他 futures 完成。

參數

futures (list) – Future 物件的列表。

傳回

已完成的 Future 結果的列表。 如果在任何 Futurewait 拋出錯誤,此方法將拋出錯誤。

傳回類型

List

文件

存取 PyTorch 的完整開發人員文件

檢視文件

教學

取得適用於初學者和進階開發人員的深入教學

檢視教學

資源

尋找開發資源並獲得問題解答

檢視資源