Common Patterns
Pattern 1: Generic Join Context Manager# Created On: Jun 06, 2025 | Last Updated On: Jun 06, 2025 The generic join context manager facilitates distributed training on uneven inputs. This page outlines the API of the relevant classes: Join, Joinable, and JoinHook. For a tutorial, see Distributed Training with Uneven Inputs Using the Join Context Manager. class torch.distributed.algorithms.Join(joinables, enable=True, throw_on_early_termination=False, kwargs)[source]# This class defines the generic join context manager, which allows custom hooks to be called after a process joins. These hooks should shadow the collective communications of non-joined processes to prevent hanging and erroring and to ensure algorithmic correctness. Refer to JoinHook for details about the hook definition. Warning The context manager requires each participating Joinable to call the method notify_join_context() before its own per- iteration collective communications to ensure correctness. Warning The context manager requires that all process_group attributes in the JoinHook objects are the same. If there are multiple JoinHook objects, then the device of the first is used. The process group and device information is used for checking for non- joined processes and for notifying processes to throw an exception if throw_on_early_termination is enabled, both of which using an all- reduce. Parameters joinables (List[Joinable]) β a list of the participating Joinable s; their hooks are iterated over in the given order. enable (bool) β a flag enabling uneven input detection; setting to False disables the context managerβs functionality and should only be set when the user knows the inputs will not be uneven (default: True). throw_on_early_termination (bool) β a flag controlling whether to throw an exception upon detecting uneven inputs (default: False). Example: >>> import os >>> import torch >>> import torch.distributed as dist >>> import torch.multiprocessing as mp >>> import torch.nn.parallel.DistributedDataParallel as DDP >>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO >>> from torch.distributed.algorithms.join import Join >>> >>> # On each spawned worker >>> def worker(rank): >>> dist.init_process_group("nccl", rank=rank, world_size=2) >>> model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank]) >>> optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01) >>> # Rank 1 gets one more input than rank 0 >>> inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)] >>> with Join([model, optim]): >>> for input in inputs: >>> loss = model(input).sum() >>> loss.backward() >>> optim.step() >>> # All ranks reach here without hanging/erroring static notify_join_context(joinable)[source]# Notifies the join context manager that the calling process has not yet joined. Then, if throw_on_early_termination=True, checks if uneven inputs have been detected (i.e. if one process has already joined) and throws an exception if so. This method should be called from a Joinable object before its per-iteration collective communications. For example, this should be called at the beginning of the forward pass in DistributedDataParallel. Only the first Joinable object passed into the context manager performs the collective communications in this method, and for the others, this method is vacuous. Parameters joinable (Joinable) β the Joinable object calling this method. Returns An async work handle for the all-reduce meant to notify the context manager that the process has not yet joined if joinable is the first one passed into the context manager; None otherwise. class torch.distributed.algorithms.Joinable[source]# This defines an abstract base class for joinable classes. A joinable class (inheriting from Joinable) should implement join_hook(), which returns a JoinHook instance, in addition to join_device() and join_process_group() that return device and process group information, respectively. abstract property join_device: device# Return the device from which to perform collective communications needed by the join context manager. abstract join_hook(kwargs)[source]# Return a JoinHook instance for the given Joinable. Parameters kwargs (dict) β a dict containing any keyword arguments to modify the behavior of the join hook at run time; all Joinable instances sharing the same join context manager are forwarded the same value for kwargs. Return type JoinHook abstract property join_process_group: Any# Returns the process group for the collective communications needed by the join context manager itself. class torch.distributed.algorithms.JoinHook[source]# This defines a join hook, which provides two entry points in the join context manager. Entry points : a main hook, which is called repeatedly while there exists a non-joined process, and a post-hook, which is called once all processes have joined. To implement a join hook for the generic join context manager, define a class that inherits from JoinHook and override main_hook() and post_hook() as appropriate. main_hook()[source]# Call this hook while there exists a non-joined process to shadow collective communications in a training iteration. Training iteration i.e., in one forward pass, backward pass, and optimizer step. post_hook(is_last_joiner)[source]# Call hook after all processes have joined. It is passed an additional bool argument is_last_joiner, which indicates if the rank is one of the last to join. Parameters is_last_joiner (bool) β True if the rank is one of the last to join; False otherwise.
```
Join
```
Pattern 2: Distributed communication package - torch.distributed# Created On: Jul 12, 2017 | Last Updated On: Sep 04, 2025 Note Please refer to PyTorch Distributed Overview for a brief introduction to all features related to distributed training. Backends# torch.distributed supports four built-in backends, each with different capabilities. The table below shows which functions are available for use with a CPU or GPU for each backend. For NCCL, GPU refers to CUDA GPU while for XCCL to XPU GPU. MPI supports CUDA only if the implementation used to build PyTorch supports it. Backend gloo mpi nccl xccl Device CPU GPU CPU GPU CPU GPU CPU GPU send β β β ? β β β β recv β β β ? β β β β broadcast β β β ? β β β β all_reduce β β β ? β β β β reduce β β β ? β β β β all_gather β β β ? β β β β gather β β β ? β β β β scatter β β β ? β β β β reduce_scatter β β β β β β β β all_to_all β β β ? β β β β barrier β β β ? β β β β Backends that come with PyTorch# PyTorch distributed package supports Linux (stable), MacOS (stable), and Windows (prototype). By default for Linux, the Gloo and NCCL backends are built and included in PyTorch distributed (NCCL only when building with CUDA). MPI is an optional backend that can only be included if you build PyTorch from source. (e.g. building PyTorch on a host that has MPI installed.) Note As of PyTorch v1.8, Windows supports all collective communications backend but NCCL, If the init_method argument of init_process_group() points to a file it must adhere to the following schema: Local file system, init_method="file:///d:/tmp/some_file" Shared file system, init_method="file://////{machine_name}/{share_folder_name}/some_file" Same as on Linux platform, you can enable TcpStore by setting environment variables, MASTER_ADDR and MASTER_PORT. Which backend to use?# In the past, we were often asked: βwhich backend should I use?β. Rule of thumb Use the NCCL backend for distributed training with CUDA GPU. Use the XCCL backend for distributed training with XPU GPU. Use the Gloo backend for distributed training with CPU. GPU hosts with InfiniBand interconnect Use NCCL, since itβs the only backend that currently supports InfiniBand and GPUDirect. GPU hosts with Ethernet interconnect Use NCCL, since it currently provides the best distributed GPU training performance, especially for multiprocess single-node or multi-node distributed training. If you encounter any problem with NCCL, use Gloo as the fallback option. (Note that Gloo currently runs slower than NCCL for GPUs.) CPU hosts with InfiniBand interconnect If your InfiniBand has enabled IP over IB, use Gloo, otherwise, use MPI instead. We are planning on adding InfiniBand support for Gloo in the upcoming releases. CPU hosts with Ethernet interconnect Use Gloo, unless you have specific reasons to use MPI. Common environment variables# Choosing the network interface to use# By default, both the NCCL and Gloo backends will try to find the right network interface to use. If the automatically detected interface is not correct, you can override it using the following environment variables (applicable to the respective backend): NCCL_SOCKET_IFNAME, for example export NCCL_SOCKET_IFNAME=eth0 GLOO_SOCKET_IFNAME, for example export GLOO_SOCKET_IFNAME=eth0 If youβre using the Gloo backend, you can specify multiple interfaces by separating them by a comma, like this: export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3. The backend will dispatch operations in a round-robin fashion across these interfaces. It is imperative that all processes specify the same number of interfaces in this variable. Other NCCL environment variables# Debugging - in case of NCCL failure, you can set NCCL_DEBUG=INFO to print an explicit warning message as well as basic NCCL initialization information. You may also use NCCL_DEBUG_SUBSYS to get more details about a specific aspect of NCCL. For example, NCCL_DEBUG_SUBSYS=COLL would print logs of collective calls, which may be helpful when debugging hangs, especially those caused by collective type or message size mismatch. In case of topology detection failure, it would be helpful to set NCCL_DEBUG_SUBSYS=GRAPH to inspect the detailed detection result and save as reference if further help from NCCL team is needed. Performance tuning - NCCL performs automatic tuning based on its topology detection to save usersβ tuning effort. On some socket-based systems, users may still try tuning NCCL_SOCKET_NTHREADS and NCCL_NSOCKS_PERTHREAD to increase socket network bandwidth. These two environment variables have been pre-tuned by NCCL for some cloud providers, such as AWS or GCP. For a full list of NCCL environment variables, please refer to NVIDIA NCCLβs official documentation You can tune NCCL communicators even further using torch.distributed.ProcessGroupNCCL.NCCLConfig and torch.distributed.ProcessGroupNCCL.Options. Learn more about them using help (e.g. help(torch.distributed.ProcessGroupNCCL.NCCLConfig)) in the interpreter. Basics# The torch.distributed package provides PyTorch support and communication primitives for multiprocess parallelism across several computation nodes running on one or more machines. The class torch.nn.parallel.DistributedDataParallel() builds on this functionality to provide synchronous distributed training as a wrapper around any PyTorch model. This differs from the kinds of parallelism provided by Multiprocessing package - torch.multiprocessing and torch.nn.DataParallel() in that it supports multiple network-connected machines and in that the user must explicitly launch a separate copy of the main training script for each process. In the single-machine synchronous case, torch.distributed or the torch.nn.parallel.DistributedDataParallel() wrapper may still have advantages over other approaches to data-parallelism, including torch.nn.DataParallel(): Each process maintains its own optimizer and performs a complete optimization step with each iteration. While this may appear redundant, since the gradients have already been gathered together and averaged across processes and are thus the same for every process, this means that no parameter broadcast step is needed, reducing time spent transferring tensors between nodes. Each process contains an independent Python interpreter, eliminating the extra interpreter overhead and βGIL-thrashingβ that comes from driving several execution threads, model replicas, or GPUs from a single Python process. This is especially important for models that make heavy use of the Python runtime, including models with recurrent layers or many small components. Initialization# The package needs to be initialized using the torch.distributed.init_process_group() or torch.distributed.device_mesh.init_device_mesh() function before calling any other methods. Both block until all processes have joined. Warning Initialization is not thread-safe. Process group creation should be performed from a single thread, to prevent inconsistent βUUIDβ assignment across ranks, and to prevent races during initialization that can lead to hangs. torch.distributed.is_available()[source]# Return True if the distributed package is available. Otherwise, torch.distributed does not expose any other APIs. Currently, torch.distributed is available on Linux, MacOS and Windows. Set USE_DISTRIBUTED=1 to enable it when building PyTorch from source. Currently, the default value is USE_DISTRIBUTED=1 for Linux and Windows, USE_DISTRIBUTED=0 for MacOS. Return type bool torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source]# Initialize the default distributed process group. This will also initialize the distributed package. There are 2 main ways to initialize a process group: Specify store, rank, and world_size explicitly. Specify init_method (a URL string) which indicates where/how to discover peers. Optionally specify rank and world_size, or encode all required parameters in the URL and omit them. If neither is specified, init_method is assumed to be βenv://β. Parameters backend (str or Backend, optional) β The backend to use. Depending on build-time configurations, valid values include mpi, gloo, nccl, ucc, xccl or one that is registered by a third-party plugin. Since 2.6, if backend is not provided, c10d will use a backend registered for the device type indicated by the device_id kwarg (if provided). The known default registrations today are: nccl for cuda, gloo for cpu, xccl for xpu. If neither backend nor device_id is provided, c10d will detect the accelerator on the run-time machine and use a backend registered for that detected accelerator (or cpu). This field can be given as a lowercase string (e.g., "gloo"), which can also be accessed via Backend attributes (e.g., Backend.GLOO). If using multiple processes per machine with nccl backend, each process must have exclusive access to every GPU it uses, as sharing GPUs between processes can result in deadlock or NCCL invalid usage. ucc backend is experimental. Default backend for the device can be queried with get_default_backend_for_device(). init_method (str, optional) β URL specifying how to initialize the process group. Default is βenv://β if no init_method or store is specified. Mutually exclusive with store. world_size (int, optional) β Number of processes participating in the job. Required if store is specified. rank (int, optional) β Rank of the current process (it should be a number between 0 and world_size-1). Required if store is specified. store (Store, optional) β Key/value store accessible to all workers, used to exchange connection/address information. Mutually exclusive with init_method. timeout (timedelta, optional) β Timeout for operations executed against the process group. Default value is 10 minutes for NCCL and 30 minutes for other backends. This is the duration after which collectives will be aborted asynchronously and the process will crash. This is done since CUDA execution is async and it is no longer safe to continue executing user code since failed async NCCL operations might result in subsequent CUDA operations running on corrupted data. When TORCH_NCCL_BLOCKING_WAIT is set, the process will block and wait for this timeout. group_name (str, optional, deprecated) β Group name. This argument is ignored pg_options (ProcessGroupOptions, optional) β process group options specifying what additional options need to be passed in during the construction of specific process groups. As of now, the only options we support is ProcessGroupNCCL.Options for the nccl backend, is_high_priority_stream can be specified so that the nccl backend can pick up high priority cuda streams when thereβre compute kernels waiting. For other available options to config nccl, See https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t device_id (torch.device | int, optional) β a single, specific device this process will work on, allowing for backend-specific optimizations. Currently this has two effects, only under NCCL: the communicator is immediately formed (calling ncclCommInit immediately rather than the normal lazy call) and sub-groups will use ncclCommSplit when possible to avoid unnecessary overhead of group creation. If you want to know NCCL initialization error early, you can also use this field. If an int is provided, the API assumes that the accelerator type at compile time will be used. Note To enable backend == Backend.MPI, PyTorch needs to be built from source on a system that supports MPI. Note Support for multiple backends is experimental. Currently when no backend is specified, both gloo and nccl backends will be created. The gloo backend will be used for collectives with CPU tensors and the nccl backend will be used for collectives with CUDA tensors. A custom backend can be specified by passing in a string with format β:,:β, e.g. βcpu:gloo,cuda:custom_backendβ. torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, , mesh_dim_names=None, backend_override=None)[source]# Initializes a DeviceMesh based on device_type, mesh_shape, and mesh_dim_names parameters. This creates a DeviceMesh with an n-dimensional array layout, where n is the length of mesh_shape. If mesh_dim_names is provided, each dimension is labeled as mesh_dim_names[i]. Note init_device_mesh follows SPMD programming model, meaning the same PyTorch Python program runs on all processes/ranks in the cluster. Ensure mesh_shape (the dimensions of the nD array describing device layout) is identical across all ranks. Inconsistent mesh_shape may lead to hanging. Note If no process group is found, init_device_mesh will initialize distributed process group/groups required for distributed communications behind the scene. Parameters device_type (str) β The device type of the mesh. Currently supports: βcpuβ, βcuda/cuda-likeβ, βxpuβ. Passing in a device type with a GPU index, such as βcuda:0β, is not allowed. mesh_shape (Tuple[int]) β A tuple defining the dimensions of the multi-dimensional array describing the layout of devices. mesh_dim_names (Tuple[str], optional) β A tuple of mesh dimension names to assign to each dimension of the multi-dimensional array describing the layout of devices. Its length must match the length of mesh_shape. Each string in mesh_dim_names must be unique. backend_override (Dict[int | str, tuple[str, Options] | str | Options], optional) β Overrides for some or all of the ProcessGroups that will be created for each mesh dimension. Each key can be either the index of a dimension or its name (if mesh_dim_names is provided). Each value can be a tuple containing the name of the backend and its options, or just one of these two components (in which case the other will be set to its default value). Returns A DeviceMesh object representing the device layout. Return type DeviceMesh Example: >>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp")) torch.distributed.is_initialized()[source]# Check if the default process group has been initialized. Return type bool torch.distributed.is_mpi_available()[source]# Check if the MPI backend is available. Return type bool torch.distributed.is_nccl_available()[source]# Check if the NCCL backend is available. Return type bool torch.distributed.is_gloo_available()[source]# Check if the Gloo backend is available. Return type bool torch.distributed.distributed_c10d.is_xccl_available()[source]# Check if the XCCL backend is available. Return type bool torch.distributed.is_torchelastic_launched()[source]# Check whether this process was launched with torch.distributed.elastic (aka torchelastic). The existence of TORCHELASTIC_RUN_ID environment variable is used as a proxy to determine whether the current process was launched with torchelastic. This is a reasonable proxy since TORCHELASTIC_RUN_ID maps to the rendezvous id which is always a non-null value indicating the job id for peer discovery purposes.. Return type bool torch.distributed.get_default_backend_for_device(device)[source]# Return the default backend for the given device. Parameters device (Union[str, torch.device]) β The device to get the default backend for. Returns The default backend for the given device as a lower case string. Return type str Currently three initialization methods are supported: TCP initialization# There are two ways to initialize using TCP, both requiring a network address reachable from all processes and a desired world_size. The first way requires specifying an address that belongs to the rank 0 process. This initialization method requires that all processes have manually specified ranks. Note that multicast address is not supported anymore in the latest distributed package. group_name is deprecated as well. import torch.distributed as dist # Use address of one of the machines dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4) Shared file-system initialization# Another initialization method makes use of a file system that is shared and visible from all machines in a group, along with a desired world_size. The URL should start with file:// and contain a path to a non-existent file (in an existing directory) on a shared file system. File-system initialization will automatically create that file if it doesnβt exist, but will not delete the file. Therefore, it is your responsibility to make sure that the file is cleaned up before the next init_process_group() call on the same file path/name. Note that automatic rank assignment is not supported anymore in the latest distributed package and group_name is deprecated as well. Warning This method assumes that the file system supports locking using fcntl - most local systems and NFS support it. Warning This method will always create the file and try its best to clean up and remove the file at the end of the program. In other words, each initialization with the file init method will need a brand new empty file in order for the initialization to succeed. If the same file used by the previous initialization (which happens not to get cleaned up) is used again, this is unexpected behavior and can often cause deadlocks and failures. Therefore, even though this method will try its best to clean up the file, if the auto-delete happens to be unsuccessful, it is your responsibility to ensure that the file is removed at the end of the training to prevent the same file to be reused again during the next time. This is especially important if you plan to call init_process_group() multiple times on the same file name. In other words, if the file is not removed/cleaned up and you call init_process_group() again on that file, failures are expected. The rule of thumb here is that, make sure that the file is non-existent or empty every time init_process_group() is called. import torch.distributed as dist # rank should always be specified dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile', world_size=4, rank=args.rank) Environment variable initialization# This method will read the configuration from environment variables, allowing one to fully customize how the information is obtained. The variables to be set are: MASTER_PORT - required; has to be a free port on machine with rank 0 MASTER_ADDR - required (except for rank 0); address of rank 0 node WORLD_SIZE - required; can be set either here, or in a call to init function RANK - required; can be set either here, or in a call to init function The machine with rank 0 will be used to set up all connections. This is the default method, meaning that init_method does not have to be specified (or can be env://). Improving initialization time# TORCH_GLOO_LAZY_INIT - establishes connections on demand rather than using a full mesh which can greatly improve initialization time for non all2all operations. Post-Initialization# Once torch.distributed.init_process_group() was run, the following functions can be used. To check whether the process group has already been initialized use torch.distributed.is_initialized(). class torch.distributed.Backend(name)[source]# An enum-like class for backends. Available backends: GLOO, NCCL, UCC, MPI, XCCL, and other registered backends. The values of this class are lowercase strings, e.g., "gloo". They can be accessed as attributes, e.g., Backend.NCCL. This class can be directly called to parse the string, e.g., Backend(backend_str) will check if backend_str is valid, and return the parsed lowercase string if so. It also accepts uppercase strings, e.g., Backend("GLOO") returns "gloo". Note The entry Backend.UNDEFINED is present but only used as initial value of some fields. Users should neither use it directly nor assume its existence. classmethod register_backend(name, func, extended_api=False, devices=None)[source]# Register a new backend with the given name and instantiating function. This class method is used by 3rd party ProcessGroup extension to register new backends. Parameters name (str) β Backend name of the ProcessGroup extension. It should match the one in init_process_group(). func (function) β Function handler that instantiates the backend. The function should be implemented in the backend extension and takes four arguments, including store, rank, world_size, and timeout. extended_api (bool, optional) β Whether the backend supports extended argument structure. Default: False. If set to True, the backend will get an instance of c10d::DistributedBackendOptions, and a process group options object as defined by the backend implementation. device (str or list of str, optional) β device type this backend supports, e.g. βcpuβ, βcudaβ, etc. If None, assuming both βcpuβ and βcudaβ Note This support of 3rd party backend is experimental and subject to change. torch.distributed.get_backend(group=None)[source]# Return the backend of the given process group. Parameters group (ProcessGroup, optional) β The process group to work on. The default is the general main process group. If another specific group is specified, the calling process must be part of group. Returns The backend of the given process group as a lower case string. Return type Backend torch.distributed.get_rank(group=None)[source]# Return the rank of the current process in the provided group, default otherwise. Rank is a unique identifier assigned to each process within a distributed process group. They are always consecutive integers ranging from 0 to world_size. Parameters group (ProcessGroup, optional) β The process group to work on. If None, the default process group will be used. Returns The rank of the process group -1, if not part of the group Return type int torch.distributed.get_world_size(group=None)[source]# Return the number of processes in the current process group. Parameters group (ProcessGroup, optional) β The process group to work on. If None, the default process group will be used. Returns The world size of the process group -1, if not part of the group Return type int Shutdown# It is important to clean up resources on exit by calling destroy_process_group(). The simplest pattern to follow is to destroy every process group and backend by calling destroy_process_group() with the default value of None for the group argument, at a point in the training script where communications are no longer needed, usually near the end of main(). The call should be made once per trainer-process, not at the outer process-launcher level. if destroy_process_group() is not called by all ranks in a pg within the timeout duration, especially when there are multiple process-groups in the application e.g. for N-D parallelism, hangs on exit are possible. This is because the destructor for ProcessGroupNCCL calls ncclCommAbort, which must be called collectively, but the order of calling ProcessGroupNCCLβs destructor if called by pythonβs GC is not deterministic. Calling destroy_process_group() helps by ensuring ncclCommAbort is called in a consistent order across ranks, and avoids calling ncclCommAbort during ProcessGroupNCCLβs destructor. Reinitialization# destroy_process_group can also be used to destroy individual process groups. One use case could be fault tolerant training, where a process group may be destroyed and then a new one initialized during runtime. In this case, itβs critical to synchronize the trainer processes using some means other than torch.distributed primitives _after_ calling destroy and before subsequently initializing. This behavior is currently unsupported/untested, due to the difficulty of achieving this synchronization, and is considered a known issue. Please file a github issue or RFC if this is a use case thatβs blocking you. Groups# By default collectives operate on the default group (also called the world) and require all processes to enter the distributed function call. However, some workloads can benefit from more fine-grained communication. This is where distributed groups come into play. new_group() function can be used to create new groups, with arbitrary subsets of all processes. It returns an opaque group handle that can be given as a group argument to all collectives (collectives are distributed functions to exchange information in certain well-known programming patterns). torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[source]# Create a new distributed group. This function requires that all processes in the main group (i.e. all processes that are part of the distributed job) enter this function, even if they are not going to be members of the group. Additionally, groups should be created in the same order in all processes. Warning Safe concurrent usage: When using multiple process groups with the NCCL backend, the user must ensure a globally consistent execution order of collectives across ranks. If multiple threads within a process issue collectives, explicit synchronization is necessary to ensure consistent ordering. When using async variants of torch.distributed communication APIs, a work object is returned and the communication kernel is enqueued on a separate CUDA stream, allowing overlap of communication and computation. Once one or more async ops have been issued on one process group, they must be synchronized with other cuda streams by calling work.wait() before using another process group. See Using multiple NCCL communicators concurrently for more details. Parameters ranks (list[int]) β List of ranks of group members. If None, will be set to all ranks. Default is None. timeout (timedelta, optional) β see init_process_group for details and default value. backend (str or Backend, optional) β The backend to use. Depending on build-time configurations, valid values are gloo and nccl. By default uses the same backend as the global group. This field should be given as a lowercase string (e.g., "gloo"), which can also be accessed via Backend attributes (e.g., Backend.GLOO). If None is passed in, the backend corresponding to the default process group will be used. Default is None. pg_options (ProcessGroupOptions, optional) β process group options specifying what additional options need to be passed in during the construction of specific process groups. i.e. for the nccl backend, is_high_priority_stream can be specified so that process group can pick up high priority cuda streams. For other available options to config nccl, See https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tuse_local_synchronization (bool, optional): perform a group-local barrier at the end of the process group creation. This is different in that non-member ranks donβt need to call into API and donβt join the barrier. group_desc (str, optional) β a string to describe the process group. device_id (torch.device, optional) β a single, specific device to βbindβ this process to, The new_group call will try to initialize a communication backend immediately for the device if this field is given. Returns A handle of distributed group that can be given to collective calls or GroupMember.NON_GROUP_MEMBER if the rank is not part of ranks. N.B. use_local_synchronization doesnβt work with MPI. N.B. While use_local_synchronization=True can be significantly faster with larger clusters and small process groups, care must be taken since it changes cluster behavior as non-member ranks donβt join the group barrier(). N.B. use_local_synchronization=True can lead to deadlocks when each rank creates multiple overlapping process groups. To avoid that, make sure all ranks follow the same global creation order. torch.distributed.get_group_rank(group, global_rank)[source]# Translate a global rank into a group rank. global_rank must be part of group otherwise this raises RuntimeError. Parameters group (ProcessGroup) β ProcessGroup to find the relative rank. global_rank (int) β Global rank to query. Returns Group rank of global_rank relative to group Return type int N.B. calling this function on the default process group returns identity torch.distributed.get_global_rank(group, group_rank)[source]# Translate a group rank into a global rank. group_rank must be part of group otherwise this raises RuntimeError. Parameters group (ProcessGroup) β ProcessGroup to find the global rank from. group_rank (int) β Group rank to query. Returns Global rank of group_rank relative to group Return type int N.B. calling this function on the default process group returns identity torch.distributed.get_process_group_ranks(group)[source]# Get all ranks associated with group. Parameters group (Optional[ProcessGroup]) β ProcessGroup to get all ranks from. If None, the default process group will be used. Returns List of global ranks ordered by group rank. Return type list[int] DeviceMesh# DeviceMesh is a higher level abstraction that manages process groups (or NCCL communicators). It allows user to easily create inter node and intra node process groups without worrying about how to set up the ranks correctly for different sub process groups, and it helps manage those distributed process group easily. init_device_mesh() function can be used to create new DeviceMesh, with a mesh shape describing the device topology. class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, , mesh_dim_names=None, backend_override=None, _init_backend=True)[source]# DeviceMesh represents a mesh of devices, where layout of devices could be represented as a n-d dimension array, and each value of the n-d dimensional array is the global id of the default process group ranks. DeviceMesh could be used to setup the N dimensional device connections across the cluster, and manage the ProcessGroups for N dimensional parallelisms. Communications could happen on each dimension of the DeviceMesh separately. DeviceMesh respects the device that user selects already (i.e. if user call torch.cuda.set_device before the DeviceMesh initialization), and will select/set the device for the current process if user does not set the device beforehand. Note that manual device selection should happen BEFORE the DeviceMesh initialization. DeviceMesh can also be used as a context manager when using together with DTensor APIs. Note DeviceMesh follows SPMD programming model, which means the same PyTorch Python program is running on all processes/ranks in the cluster. Therefore, users need to make sure the mesh array (which describes the layout of devices) should be identical across all ranks. Inconsistent mesh will lead to silent hang. Parameters device_type (str) β The device type of the mesh. Currently supports: βcpuβ, βcuda/cuda-likeβ. mesh (ndarray) β A multi-dimensional array or an integer tensor describing the layout of devices, where the IDs are global IDs of the default process group. Returns A DeviceMesh object representing the device layout. Return type DeviceMesh The following program runs on each process/rank in an SPMD manner. In this example, we have 2 hosts with 4 GPUs each. A reduction over the first dimension of mesh will reduce across columns (0, 4), .. and (3, 7), a reduction over the second dimension of mesh reduces across rows (0, 1, 2, 3) and (4, 5, 6, 7). Example: >>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]]) static from_group(group, device_type, mesh=None, , mesh_dim_names=None)[source]# Constructs a DeviceMesh with device_type from an existing ProcessGroup or a list of existing ProcessGroup. The constructed device mesh has number of dimensions equal to the number of groups passed. For example, if a single process group is passed in, the resulted DeviceMesh is a 1D mesh. If a list of 2 process groups is passed in, the resulted DeviceMesh is a 2D mesh. If more than one group is passed, then the mesh and mesh_dim_names arguments are required. The order of the process groups passed in determines the topology of the mesh. For example, the first process group will be the 0th dimension of the DeviceMesh. The mesh tensor passed in must have the same number of dimensions as the number of process groups passed in, and the order of the dimensions in the mesh tensor must match the order in the process groups passed in. Parameters group (ProcessGroup or list[ProcessGroup]) β the existing ProcessGroup or a list of existing ProcessGroups. device_type (str) β The device type of the mesh. Currently supports: βcpuβ, βcuda/cuda-likeβ. Passing in a device type with a GPU index, such as βcuda:0β, is not allowed. mesh (torch.Tensor or ArrayLike, optional) β A multi-dimensional array or an integer tensor describing the layout of devices, where the IDs are global IDs of the default process group. Default is None. mesh_dim_names (tuple[str], optional) β A tuple of mesh dimension names to assign to each dimension of the multi-dimensional array describing the layout of devices. Its length must match the length of mesh_shape. Each string in mesh_dim_names must be unique. Default is None. Returns A DeviceMesh object representing the device layout. Return type DeviceMesh get_all_groups()[source]# Returns a list of ProcessGroups for all mesh dimensions. Returns A list of ProcessGroup object. Return type list[torch.distributed.distributed_c10d.ProcessGroup] get_coordinate()[source]# Return the relative indices of this rank relative to all dimensions of the mesh. If this rank is not part of the mesh, return None. Return type Optional[list[int]] get_group(mesh_dim=None)[source]# Returns the single ProcessGroup specified by mesh_dim, or, if mesh_dim is not specified and the DeviceMesh is 1-dimensional, returns the only ProcessGroup in the mesh. Parameters mesh_dim (str/python:int, optional) β it can be the name of the mesh dimension or the index None. (of the mesh dimension. Default is) β Returns A ProcessGroup object. Return type ProcessGroup get_local_rank(mesh_dim=None)[source]# Returns the local rank of the given mesh_dim of the DeviceMesh. Parameters mesh_dim (str/python:int, optional) β it can be the name of the mesh dimension or the index None. (of the mesh dimension. Default is) β Returns An integer denotes the local rank. Return type int The following program runs on each process/rank in an SPMD manner. In this example, we have 2 hosts with 4 GPUs each. Calling mesh_2d.get_local_rank(mesh_dim=0) on rank 0, 1, 2, 3 would return 0. Calling mesh_2d.get_local_rank(mesh_dim=0) on rank 4, 5, 6, 7 would return 1. Calling mesh_2d.get_local_rank(mesh_dim=1) on rank 0, 4 would return 0. Calling mesh_2d.get_local_rank(mesh_dim=1) on rank 1, 5 would return 1. Calling mesh_2d.get_local_rank(mesh_dim=1) on rank 2, 6 would return 2. Calling mesh_2d.get_local_rank(mesh_dim=1) on rank 3, 7 would return 3. Example: >>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]]) get_rank()[source]# Returns the current global rank. Return type int Point-to-point communication# torch.distributed.send(tensor, dst=None, group=None, tag=0, group_dst=None)[source]# Send a tensor synchronously. Warning tag is not supported with the NCCL backend. Parameters tensor (Tensor) β Tensor to send. dst (int) β Destination rank on global process group (regardless of group argument). Destination rank should not be the same as the rank of the current process. group (ProcessGroup, optional) β The process group to work on. If None, the default process group will be used. tag (int, optional) β Tag to match send with remote recv group_dst (int, optional) β Destination rank on group. Invalid to specify both dst and group_dst. torch.distributed.recv(tensor, src=None, group=None, tag=0, group_src=None)[source]# Receives a tensor synchronously. Warning tag is not supported with the NCCL backend. Parameters tensor (Tensor) β Tensor to fill with received data. src (int, optional) β Source rank on global process group (regardless of group argument). Will receive from any process if unspecified. group (ProcessGroup, optional) β The process group to work on. If None, the default process group will be used. tag (int, optional) β Tag to match recv with remote send group_src (int, optional) β Destination rank on group. Invalid to specify both src and group_src. Returns Sender rank -1, if not part of the group Return type int isend() and irecv() return distributed request objects when used. In general, the type of this object is unspecified as they should never be created manually, but they are guaranteed to support two methods: is_completed() - returns True if the operation has finished wait() - will block the process until the operation is finished. is_completed() is guaranteed to return True once it returns. torch.distributed.isend(tensor, dst=None, group=None, tag=0, group_dst=None)[source]# Send a tensor asynchronously. Warning Modifying tensor before the request completes causes undefined behavior. Warning tag is not supported with the NCCL backend. Unlike send, which is blocking, isend allows src == dst rank, i.e. send to self. Parameters tensor (Tensor) β Tensor to send. dst (int) β Destination rank on global process group (regardless of group argument) group (ProcessGroup, optional) β The process group to work on. If None, the default process group will be used. tag (int, optional) β Tag to match send with remote recv group_dst (int, optional) β Destination rank on group. Invalid to specify both dst and group_dst Returns A distributed request object. None, if not part of the group Return type Optional[Work] torch.distributed.irecv(tensor, src=None, group=None, tag=0, group_src=None)[source]# Receives a tensor asynchronously. Warning tag is not supported with the NCCL backend. Unlike recv, which is blocking, irecv allows src == dst rank, i.e. recv from self. Parameters tensor (Tensor) β Tensor to fill with received data. src (int, optional) β Source rank on global process group (regardless of group argument). Will receive from any process if unspecified. group (ProcessGroup, optional) β The process group to work on. If None, the default process group will be used. tag (int, optional) β Tag to match recv with remote send group_src (int, optional) β Destination rank on group. Invalid to specify both src and group_src. Returns A distributed request object. None, if not part of the group Return type Optional[Work] torch.distributed.send_object_list(object_list, dst=None, group=None, device=None, group_dst=None, use_batch=False)[source]# Sends picklable objects in object_list synchronously. Similar to send(), but Python objects can be passed in. Note that all objects in object_list must be picklable in order to be sent. Parameters object_list (List[Any]) β List of input objects to sent. Each object must be picklable. Receiver must provide lists of equal sizes. dst (int) β Destination rank to send object_list to. Destination rank is based on global process group (regardless of group argument) group (Optional[ProcessGroup]) β (ProcessGroup, optional): The process group to work on. If None, the default process group will be used. Default is None. device (torch.device, optional) β If not None, the objects are serialized and converted to tensors which are moved to the device before sending. Default is None. group_dst (int, optional) β Destination rank on group. Must specify one of dst and group_dst but not both use_batch (bool, optional) β If True, use batch p2p operations instead of regular send operations. This avoids initializing 2-rank communicators and uses existing entire group communicators. See batch_isend_irecv for usage and assumptions. Default is False. Returns None. Note For NCCL-based process groups, internal tensor representations of objects must be moved to the GPU device before communication takes place. In this case, the device used is given by torch.cuda.current_device() and it is the userβs responsibility to ensure that this is set so that each rank has an individual GPU, via torch.cuda.set_device(). Warning Object collectives have a number of serious performance and scalability limitations. See Object collectives for details. Warning send_object_list() uses pickle module implicitly, which is known to be insecure. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling. Only call this function with data you trust. Warning Calling send_object_list() with GPU tensors is not well supported and inefficient as it incurs GPU -> CPU transfer since tensors would be pickled. Please consider using send() instead. Example::>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] torch.distributed.recv_object_list(object_list, src=None, group=None, device=None, group_src=None, use_batch=False)[source]# Receives picklable objects in object_list synchronously. Similar to recv(), but can receive Python objects. Parameters object_list (List[Any]) β List of objects to receive into. Must provide a list of sizes equal to the size of the list being sent. src (int, optional) β Source rank from which to recv object_list. Source rank is based on global process group (regardless of group argument) Will receive from any rank if set to None. Default is None. group (Optional[ProcessGroup]) β (ProcessGroup, optional): The process group to work on. If None, the default process group will be used. Default is None. device (torch.device, optional) β If not None, receives on this device. Default is None. group_src (int, optional) β Destination rank on group. Invalid to specify both src and group_src. use_batch (bool, optional) β If True, use batch p2p operations instead of regular send operations. This avoids initializing 2-rank communicators and uses existing entire group communicators. See batch_isend_irecv for usage and assumptions. Default is False. Returns Sender rank. -1 if rank is not part of the group. If rank is part of the group, object_list will contain the sent objects from src rank. Note For NCCL-based process groups, internal tensor representations of objects must be moved to the GPU device before communication takes place. In this case, the device used is given by torch.cuda.current_device() and it is the userβs responsibility to ensure that this is set so that each rank has an individual GPU, via torch.cuda.set_device(). Warning Object collectives have a number of serious performance and scalability limitations. See Object collectives for details. Warning recv_object_list() uses pickle module implicitly, which is known to be insecure. It is possible to construct malicious pickle data which will execute arbitrary