Skip to content

Distributed TensorFlow

Albert Zeyer edited this page Jun 11, 2020 · 76 revisions

Distributed TensorFlow is the official way how multiple machines can remotely communicate with each other, and how multiple devices (GPUs) can be used. (We cover mostly only data parallelism here, not model parallelism.)

Related is multi GPU training in RETURNN via Horovod, but Horovod is an orthogonal implementation to what TF itself provides. Related are also the Tensorpack TF ZMQ ops, which also provide a way for remote communication (via ZeroMQ). TensorFlow itself uses gRPC (for remote communication) and NCCL (for Nvidia direct GPU transfers). (What operators are there for remote communication?)

An overview of all possible variations of distributed training (for different hardware, cluster environment, software (Horovod, distributed TF), strategies) is collected under distributed training experience.

This wiki page here is fully about the TF builtin features. This is less about RETURNN, but mostly an extension for the official TF documentation, or an overview. (The support of distributed TensorFlow in RETURNN is discussed in issue #296.)

Important insight: There are two things which can be synchronized in distributed TensorFlow:

  • TF resources (e.g. variables, datasets, tensor-arrays, queues, ...) (via their unique name across all sessions): The resource lives only in one of the session (or server?) (pinned via tf.device), and other sessions/servers access it via gRPC. In between-graph replication, this resource sharing can be the only type of synchronization.
  • TF ops: Defined by a graph, which is then executed via a session on a server, or potentially multiple servers. While the session connects to one single server, the server is potentially connected to other servers (in the cluster). Every op is pinned to one worker/server/device (via tf.device), and will be executed there (potentially via gRPC).

Distributed TensorFlow covers multiple levels of functionality:

  • Low level: TF server and client (TF session) connecting to the server via gRPC. ClusterSpec describes the collection of all servers.
  • High level via strategies (tf.distribute.Strategy). This can be used by the Keras API, by the Estimator API, and by a custom training loop.
  • Concepts and terminology.

It is totally ok to only use the low level functionality (TF servers with gRPC) but not the Strategy concept.

Resources

Terminology

There is a lot of new terminology around multi-GPU training, or distributed TensorFlow. It also can differ between framework (e.g. MPI or PyTorch have similar concepts but with partly different terminology).

Multi-GPU training (or also multi-TPU) can be on a single machine (computer, host), or across multiple, where each machine can have multiple GPUs (devices). In the case of multiple machines, this would usually be managed via some cluster engine. A cluster engine manages a collection of machines, each having some computing resources (CPUs, GPUs, memory), and specifically it manages the queueing of jobs (processes - instance of a program) on them. The jobs has certain restrictions (use N CPUs, M GPUs, some specific amount of memory, run up to T hours). In our case, we mostly use Sun Grid Engine (SGE) (later renamed to Oracle Grid Engine). TensorFlow has official support for Google Compute Engine (GCE), Slurm, and some others but not SGE. But this is not so problematic. In SGE, every computer is called a node. We adopt this terminology. A node in our cluster has usually 4 GPUs. Multiple jobs can run on a single node, if not all resources are used by a single job. (SGE specific: A job can also run across multiple nodes, via the -pe option for qsub, which will effectively run one process per node (in parallel). This is the parallel environments feature. See here for an example. You need this if you want to do multi-node-multi-GPU training.)

You must run at least one instance of your program per node. In practice, we will likely run one instance per GPU device per node, so each instance will have access to a single GPU. The instances will communicate somehow with each other (e.g. using Horovod or distributed TensorFlow; this is what this wiki page will cover; internally via NCCL or MPI or GRPC or so). There might be one master (or chief) instance, which takes care of everything (initializing the computation graph, loading data, saving checkpoints, etc), and all the other instances will just idle around and wait for commands by the master. Or the other instances might also do everything (or most things) on their own but then synchronize somehow with each other.

See also the glossary of tf.distribute which partly covers this as well. And the distributed TensorFlow deploy example, which also covers the terminology and an overview.

  • Device: A (single, specific) GPU (or also TPU).
  • Data parallelism: Computation is parallelized over the data. This is more common, and relatively easy, as there is huge amount of data. This is mostly what we talk about here.
  • Model parallelism: Computation is parallelized over the model. This only makes sense if the model is very big (maybe so big that it would not be possible otherwise, because it does not fit into the memory of a single device). We don't really cover that here. This can be orthogonal to data parallelism.
  • Task: Basically this is equivalent to one instance of the program, i.e. one process. Or more specifically one instance of a TF server (i.e. tf.distribute.Server). This is in the context of ClusterSpec and when running a TF server. This specifies one specific instance, i.e. one specific TF server, running on some host, which is reachable under "<host>:<port>". A task has a task type, sometimes also referred to as job, such as "worker" or "chief". Within a job (e.g. the workers), there are a number of tasks, and each one has a specific index, determined by the list of all tasks in this job by ClusterSpec.
  • Job: Synonym for task type (or also job name). A job comprises a list of tasks, which typically serve a common purpose. For example "ps" or "worker". Do not confuse this with the cluster job. A single cluster job (e.g. via SGE -pe) would cover all the TF tasks which belongs together.
  • Worker and chief (master) (and maybe parameter server): These are task types. These are the roles of one instance of a TF server. worker is an instance of a task, or task type, or sometimes also job name, or host.
  • Worker: This task will execute a single train step (on one copy of the model).
  • Chief (master): This task is a worker with some extra responsibilities such as saving the checkpoint.
  • Parameter server: These are machines that hold a single copy of parameters/variables. They don't have much to do, except for managing all reads/writes to the variables. This is usually used for async training, as all other workers would access the parameter server(s) in an async way. This is the default setup for async training in TF. But this is not really necessary in general for async training. In RETURNN, we would do async training, but sync directly the models. (Maybe it depends on the specific definition of async...)
  • Host: Can mean a machine, or a specific process. Mostly refers to the TF server, specified via the address it is reachable under ("<host>:<port>").
  • (TensorFlow) Server: Instance of tf.distribute.Server (earlier tf.train.Server). This is basically one host, or one task. Part of a cluster. A TF session (client) can connect to a server and then execute commands (sub graphs).
  • Client: Instance of tf.compat.v1.Session (earlier tf.Session) which connects to a server. A client also constructs the computation graph.
  • Cluster: A collection of tasks, i.e. collection of machines (more specifically running TF servers, reachable under "<host>:<port>", that participate in the distributed execution of a TensorFlow graph. This is described via ClusterSpec (see below).
  • Cluster resolver: E.g. for GCE, it will automatically figure out the environment, how much nodes there are, and derive the number of workers or machines, and their addresses, for the ClusterSpec (see below).
  • Distributed strategy: In general terms, this covers all the aspects of data-parallelism, like in-graph replication, between-graph replication, asynchronous training and synchronous training, for all kinds of machine and device setups, and it also covers the data input pipeline. More specifically, TF provides an official API including several implementations for this: This is an instance or subclass / implementation of tf.distribute.Strategy. This is the TensorFlow API to distribute training across multiple GPUs, multiple machines or TPUs. Using this API, you can distribute your existing models and training code with minimal code changes.
  • Replica: One copy of the model (parameters and computation graph), running on one slice of the input data. Right now each replica is executed on its own worker device. A replica may span multiple worker devices (for model parallelism).
  • In-graph replication: A single client builds a single tf.Graph that contains one set of parameters; and multiple copies of the compute-intensive part of the model, each pinned to a different task in "/job:worker" (via tf.device).
  • Between-graph replication: There is a separate client for each worker task, typically in the same process as the worker task. Each client builds a similar graph containing the parameters and a single copy of the compute-intensive part of the model, pinned to the local task in "/job:worker". This can be also mixed with in-graph replication, e.g. covering multiple (machine local) devices in one graph (i.e. in-graph replication per machine), but each machine has its own graph (i.e. between-graph replication across machines).
  • Replica context (or replica mode) vs cross-replica context (or cross-replica mode): Replica code operates on replica-specific tensors. Cross-replica code considers the whole cluster, and does all-reduce or other logic. (Also see this StackOverflow question.)
  • Asynchronous training: Each replica of the graph has an independent training loop that executes without coordination. It is compatible with both forms of replication above. This usually uses a parameter server but there are other options as well.
  • Synchronous training: All of the replicas read the same values for the current parameters, compute gradients in parallel, and then apply them together. It is compatible with in-graph replication and between-graph replication. This would usually use some all-reduce algorithm, and use mirrored variables.
  • Mirrored variables (or in general mirrored values): A copy of the variable lives on every device (or replica) but it is the same everywhere (kind of the opposite of a parameter server, although both concepts still could be used together). This would usually use some all-reduce algorithm to keep the variables synchronized, e.g. by calculating the average gradient across all workers, and applying always the same update everywhere. The opposite of mirrored values are per-replica values, which are different per replica.
  • Per-replica values: These are values which are different per replica, such as inputs and activations.
  • All-reduce: From every device, to every device. Reduce mean, sum, max, or so. E.g. to calculate accumulated gradient for synchronous training.

ClusterDef or ClusterSpec

This describes a cluster. This can be an instance of tf.train.ClusterSpec. Or tf.train.ClusterDef. Or just a dict.

In all cases, this represents a dict, containing:

  • worker: list of strings ("<host>:<port>") of TF servers
  • All other task types (see below) are possible keys as well ('ps', 'chief', ...).
  • local: Not sure...

The same cluster definition would be shared for all tasks. This is used by the TF server.

ServerDef

An instance of tf.train.ServerDef, or just a dict. This can be used by the TF server for initialization.

This represents a dict, having these entries:

  • cluster: ClusterSpec, to define the cluster (see above)
  • task: Dict containing:
    • type: 'worker', 'ps' (parameter server), 'evaluator', 'chief' or 'master'. Usually it will be a worker. A chief is a worker which does a bit more, like saving the checkpoint (like rank 0 in MPI).
    • index: int. This task index corresponds to the rank in MPI terminology. Although it is counted per-type. I.e. there is both a chief with index 0 and a worker with index 0. This must correspond to one entry of the ClusterSpec.

TF_CONFIG

Environment variable, used by TensorFlow (always? or in what cases?) to setup distributed sessions (?).

Example (via):

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

It defines the ServerDef.

TF server

Represents one task, or host, part of a cluster.

Used for TF sessions to connect to (remotely, via gRPC). Example for a single local server (where the cluster only consists of this single local server) (via):

server = tf.train.Server.create_local_server()
sess = tf.Session(server.target)

(Difference to standard session.)

When starting the server, you can choose any port you want. But all the ports (and hostnames) must be known in advance, to be able to specify the ClusterSpec.

(Is it possible to add a worker on-the-fly? Why does a TF server needs to know about all other tasks/workers in the cluster?)

Usually a single tf.Session (client) will connect to one server, although it is possible that multiple clients connect to a single server.

To start a server, you would create an instance of tf.train.Server. For that, you must know the definition of the cluster, and specify the ClusterSpec. And the server must know which of the tasks of the cluster it represents itself (either you specify a ServerDef, or you pass job_name and task_index).

Example (via, extended):

# For all tasks:
cluster_spec = tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222",
        "worker1.example.com:2222",
        "worker2.example.com:2222"
    ],
    "ps": [
        "ps0.example.com:2222",
        "ps1.example.com:2222"
    ]})

# For each task, for example job_name="worker", task_index=0 (from [0, 1, 2]).
# Every task runs in its own process, on potentially different machines (hosts).
server = tf.train.Server(cluster_spec, job_name="worker", task_index=0)

# If in-graph replication, and not the chief, now just do:
server.join()

# Otherwise, for between-graph replication,
# or in-graph replication and we are the chief:

# Build task-specific computation graph here... (between-graph replication)
# Build whole computation graph ... (in-graph replication)

# Now start the client.
with tf.compat.v1.Session(server.target) as session:
  while True:  # training loop
    # Perform task-specific computation.
    # This might include synchronization. Or uses the parameter server.
    session.run(...)

  # If we are the chief, save checkpoint.

It might be useful to restrict access in a session to other servers (except needed servers). It might be typical for between-graph configuration, that the individual workers do not communicate. You can tell TensorFlow to ignore the absence of another worker at session-creation time, using tf.ConfigProto (via Derek Murray (mrry)):

# Each worker only needs to contact the PS task(s) and the local worker task.
config = tf.ConfigProto(device_filters=[
    '/job:ps', '/job:worker/task:%d' % arguments.task_index])

with tf.train.MonitoredTrainingSession(
    master=server.target,
    config=config,
    is_chief=(arguments.task_index == 0 and (
              arguments.job_name == 'worker'))) as sess:
  # ...

shared_name argument for TF ops

(on many operations/tensors, e.g. initializable_iterator)

make_initializable_iterator SO question, my SO question.

remote_call TF op

Not sure... (Used by MultiDeviceIterator.) (StackOverflow question: remote_call vs tf.device)

tf.distribute.Strategy

Higher-level API (API doc). This encapsulates the distributed strategy (see terminology above) (for data parallelism) and all its aspects (input pipeline, distributed computing, synchronization, in-graph replication vs between-graph replication, ...).

It is not required to use this high-level API to use any of these aspects. It is supposed to make the usage very simple, and keep the modifications on existing code very minimal.

In RETURNN, we likely would not use the input pipeline provided by the Strategy API and have some own (more flexible) solution for that. Our own processing would by default not use sharding (because that is inefficient in general; requires extra work to make efficient). We rather would have a single dedicated worker for the dataset, and its output would get distributed to the other train workers. I.e. we would have the training and data preprocessing decoupled. There is no TF Strategy implementation which covers this case. (TF feature request: Decoupling preprocessing and training.) (RETURNN would not be restricted to this though; it would also support the TF dataset pipeline with sharding.)

In RETURNN, we would want to have asynchronous training not in the TF definition/way: In the default case, we would not have parameter servers (but rather have mirrored variables). We synchronize every N steps and train completely independent in between. There is no TF Strategy implementation which covers this case. (RETURNN would not be restricted to this though; it would also support the common TF strategies, e.g. with parameter servers.)

In RETURNN, in general, we would want a solution which is generic enough to allow all these possible variations.

Other relevant functions

  • tf.train.replica_device_setter