Distributed Training

Horovod, TensorFlow Distributed Training, PyTorch Distributed Training

Distributed Training Goals

A distributed training framework aims to support distributed training with only small changes to existing ML training code. It should make it possible to train very large ML models (Model Parallelism) and to shorten training time (Data Parallelism).

Model Parallelism

Data Parallelism

Distributed Training Mechanism

Synchronous training

๋™๊ธฐ์‹ ํ•™์Šต์€ ์ „์ฒด ๋ฐ์ดํ„ฐ๋ฅผ ์ชผ๊ฐœ์„œ Worker๋“ค์ด Gradient๋ฅผ ๊ณ„์‚ฐํ•˜๊ณ , ๊ณ„์‚ฐํ•œ Gradient๋ฅผ ์ง‘๊ณ„ํ•œ ํ›„ ์ƒˆ๋กœ์šด ML๋ชจ๋ธ์„ ์ƒ์„ฑํ•œ๋‹ค. ์ƒ์„ฑํ•œ ML๋ชจ๋ธ์„ Worker์— ์ „์†กํ•˜๊ณ  Gradient๋ฅผ ๊ณ„์‚ฐํ•˜๋Š” ๊ณผ์ •์„ ๋ฐ˜๋ณตํ•˜์—ฌ ํ•™์Šต์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.

Asynchronous training

๋น„๋™์‹ ํ•™์Šต์€ Worker๋“ค์ด ๊ฐ๊ฐ ์ „์ฒด ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•ด Gradient๋ฅผ ๊ณ„์‚ฐํ•˜๊ณ , ๊ฐ๊ฐ ๋น„๋™๊ธฐ์ ์œผ๋กœ Gradient๋ฅผ ์—…๋ฐ์ดํŠธํ•˜๋Š” ๋ฐฉ์‹์ด๋‹ค. ์ผ๋ฐ˜์ ์œผ๋กœ ๋™๊ธฐ์‹ ํ•™์Šต์€ all-reduce ๋ฐฉ์‹์œผ๋กœ ๊ตฌํ˜„ํ•˜๊ณ , ๋น„๋™๊ธฐ์‹ ํ•™์Šต์€ Parameter Server ๋ฐฉ์‹์„ ์‚ฌ์šฉํ•œ๋‹ค.

Parameter Server Training

Worker๊ฐ€ ๋ฐ์ดํ„ฐ๋ฅผ ํ•™์Šตํ•˜์—ฌ Gradient๋ฅผ ๊ณ„์‚ฐํ•œ ํ›„, Parameter Server๋กœ ์ „์†กํ•˜๊ณ  ํ‰๊ท ์„ ๊ณ„์‚ฐํ•ด ๋‹ค์‹œ Worker๋กœ ์ „์†กํ•˜๋Š” ๋ฐฉ์‹์ด๋‹ค. Worker๋Š” ๋Œ€์—ญํญ ์ „์ฒด๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ์•Š์ง€๋งŒ, Parameter Server๋Š” ๋Œ€์—ญํญ ๋ณ‘๋ชฉํ˜„์ƒ์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋‹ค. Parameter Server๋ฅผ ์—ฌ๋Ÿฌ ๊ฐœ ๋‘˜ ๊ฒฝ์šฐ, ๋„คํŠธ์›Œํฌ ์ƒํ˜ธ ์—ฐ๊ฒฐ์ด ๋ณต์žกํ•ด ์งˆ ์ˆ˜๋„ ์žˆ๋‹ค.

All Reduce-based Distributed Training

The Workers exchange gradients with each other and reduce them directly, with no central server. Reduction proceeds in the order Local ReduceScatter, Remote AllReduce, then Local Gather.
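
As a minimal sketch, gradient averaging with an all-reduce collective looks like this using PyTorch's torch.distributed (assuming the process group has already been initialized on every worker):

import torch
import torch.distributed as dist

# Assumes dist.init_process_group(...) has already been called on every worker.
grad = torch.randn(10)                       # this worker's local gradient

# Every worker exchanges and sums gradients directly; no parameter server is involved.
dist.all_reduce(grad, op=dist.ReduceOp.SUM)

# Divide by the number of workers so every worker holds the same averaged gradient.
grad /= dist.get_world_size()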

TensorFlow Distributed Training

TensorFlow ํด๋Ÿฌ์Šคํ„ฐ ๋‚ด์—์„œ ๊ฐ€์งˆ ์ˆ˜ ์žˆ๋Š” ์—ญํ• ์€ Chief, PS, Worker, Evaluator ์ค‘ ํ•˜๋‚˜์ด๋ฉฐ, PS ์—ญํ• ์€ Parameter Server Training์—์„œ๋งŒ ์‚ฌ์šฉํ•œ๋‹ค. Chief๋Š” ๋ชจ๋ธ ์ฒดํฌํฌ์ธํŠธ์„ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•œ๋‹ค. PS๋Š” ๋ชจ๋ธ ํŒŒ๋ผ๋ฏธํ„ฐ ์„œ๋ฒ„ ์—ญํ• ์„ ์ˆ˜ํ–‰ํ•œ๋‹ค. Worker๋Š” Gradient ๊ตฌํ•˜๋Š” ์—ญํ• ์„ ํ•˜๋ฉฐ, Chief ์„ค์ •์„ ํ•˜์ง€ ์•Š์•˜๋‹ค๋ฉด 0๋ฒˆ Worker๊ฐ€ Chief๊ฐ€ ๋œ๋‹ค. Evaluator๋Š” ํ‰๊ฐ€ ์ง€ํ‘œ ๊ณ„์‚ฐํ•˜๋Š” ์—ญํ• ์„ ํ•œ๋‹ค.

TensorFlow Distributed Cluster Configuration

TensorFlow can perform distributed training with the tf.distribute.Strategy package. The cluster that runs the distributed training can be configured either directly with tf.train.ClusterSpec or through the TF_CONFIG environment variable.

tf.train.ClusterSpec

import tensorflow as tf

# A cluster with three workers and two parameter servers, addressed as host:port.
cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
                                           "worker1.example.com:2222",
                                           "worker2.example.com:2222"],
                                "ps": ["ps0.example.com:2222",
                                       "ps1.example.com:2222"]})

TF_CONFIG Environment variable

The TF_CONFIG environment variable is a JSON document that specifies the hosts that make up the cluster and the role of the current task.
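
For example, the first worker of a three-worker cluster could set TF_CONFIG as in the sketch below before starting training; the hostnames are placeholders, and each task uses the same "cluster" section but its own "task" entry.

import json
import os

# TF_CONFIG for worker 0 in a three-worker cluster (hostnames are placeholders).
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["worker0.example.com:2222",
                   "worker1.example.com:2222",
                   "worker2.example.com:2222"]
    },
    "task": {"type": "worker", "index": 0}
})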

tf.distribute.Strategy Distribution Package

tf.distribute.Strategy enables distributed training with only minor changes to existing model training code; which strategy to use depends on whether training runs on multiple GPUs in one machine (Multi-GPU) or across multiple nodes (Multi-Node).

MirroredStrategy Synchronous distributed training using multiple GPUs (Multi-GPU) on a single machine.

MultiWorkerMirroredStrategy Synchronous distributed training using multiple workers (Multi-Worker), where each Worker can itself use multiple GPUs. All Reduce is used between the Workers.

ParameterServerStrategy Asynchronous distributed training in the Parameter Server style.
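
A minimal sketch of how a strategy is applied to existing Keras code is shown below; creating the model inside strategy.scope() is what makes its variables distributed, and the tiny model here is only a placeholder.

import tensorflow as tf

# MultiWorkerMirroredStrategy reads the cluster layout from TF_CONFIG;
# for single-machine multi-GPU training, MirroredStrategy can be used instead.
strategy = tf.distribute.MultiWorkerMirroredStrategy()

# Variables created inside the scope are mirrored and kept in sync across workers.
with strategy.scope():
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(10,))])
    model.compile(optimizer="adam", loss="mse")

# model.fit(...) then runs the usual Keras training loop, now distributed.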

PyTorch Distributed Training

PyTorch provides distributed training through the torch.distributed package: DataParallel follows the Parameter Server style of distribution, while DistributedDataParallel follows the All Reduce style.

DataParallel (DP)

DP uses a Master-Worker structure: the Master acts as the cluster coordinator, and the Workers train on the data.

Master๋Š” ๋ชจ๋ธ Weight๋ฅผ ๋ณต์ œํ•˜๊ณ  Worker์— Broadcastํ•œ๋‹ค. Master๋Š” ํ•™์Šต๋ฐ์ดํ„ฐ๋ฅผ ์ชผ๊ฐœ์„œ ๊ฐ๊ฐ Worker์— ์ „์†กํ•œ๋‹ค. Worker์—์„œ Gradient ์„ ๊ณ„์‚ฐํ•œ ํ›„, ๊ฐ GPU์—์„œ Local Gradient๋ฅผ ์ˆ˜์ง‘ํ•œ๋‹ค. Master๋Š” ์ˆ˜์ง‘ํ•œ Gradient๋ฅผ ์ง‘๊ณ„ํ•˜๊ณ  ๋ชจ๋ธ ์—…๋ฐ์ดํŠธ๋ฅผ ์ˆ˜ํ–‰ํ•œ๋‹ค.

DistributedDataParallel (DDP)

DDP performs distributed training with Workers only, using the All Reduce approach.

PyTorch DDP Example

https://pytorch.org/docs/stable/notes/ddp.html

import os

import torch
import torch.distributed
import torch.nn as nn
import torch.utils.data.distributed

# Initialize the process group so the processes can communicate with each other.
torch.distributed.init_process_group(backend='mpi')

# Once initialization is complete, map a GPU device to each process.
local_rank = int(os.environ['LOCAL_RANK'])
device = torch.device("cuda:{}".format(local_rank))
torch.cuda.set_device(local_rank)

# Create a DistributedSampler so the dataset is sharded across the processes.
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)

# Build the DataLoader that supplies the data used for training or evaluation.
train_loader = torch.utils.data.DataLoader(trainset,
                        batch_size=batch_size,
                        shuffle=(train_sampler is None),
                        num_workers=workers,
                        pin_memory=False,
                        sampler=train_sampler)

# Define the distributed training method and map the model to the GPU device.
model = Net().to(device)
Distributor = nn.parallel.DistributedDataParallel
model = Distributor(model)

# Move the data to the GPU device.
for data, target in train_loader:
    data, target = data.to(device), target.to(device)

PyTorch Environment variable

WORLD_SIZE is the total number of nodes in the cluster, and RANK is the unique identifier of each node. RANK values range from 0 to WORLD_SIZE - 1, and each node is assigned one index. The node with RANK 0 is the Master. MASTER_ADDR and MASTER_PORT describe the Master and must be set on every node.
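
For example, the second node (RANK 1) of a two-node cluster could be configured as sketched below; the address and port are placeholders, and with these variables set, init_process_group can build the process group through the env:// init method.

import os
import torch.distributed as dist

# Placeholder values for the second node (RANK 1) of a two-node cluster.
os.environ["WORLD_SIZE"] = "2"
os.environ["RANK"] = "1"
os.environ["MASTER_ADDR"] = "node0.example.com"
os.environ["MASTER_PORT"] = "29500"

# The env:// init method reads the four variables above to form the process group.
dist.init_process_group(backend="gloo", init_method="env://")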

Horovod

Horovod is a framework that supports Distributed Training with Multi-GPU on top of MPI for TensorFlow, Keras, PyTorch, and MXNet. It uses the All Reduce style of distributed training, which was devised to remove the network bandwidth bottleneck of Parameter Server style distributed training.

With Horovod, Distributed Training can be implemented easily by adding only a small amount of code to an existing training script.

Horovod์— ์‚ฌ์šฉํ•˜๋Š” ์šฉ์–ด

Horovod๋ฅผ ์ดํ•ดํ•˜๊ธฐ ์œ„ํ•ด์„œ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์€ MPI์™€ ๊ด€๋ จ๋œ ๋ช‡๊ฐ€์ง€ ์šฉ์–ด๋ฅผ ๋จผ์ € ์•Œ์•„์•ผ ํ•œ๋‹ค. MPI (Message Passing Interface)๋Š” ๋ณ‘๋ ฌํ™” ๊ธฐ์ˆ ๋กœ ์—ฌ๋Ÿฌ๋Œ€์˜ CPU/GPU๋ฅผ ๋ณ‘๋ ฌํ™”ํ•  ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค.

size: the total number of processes
slots: the number of processes per processing unit (usually defined as the number of processes per Worker)
rank: a unique process ID within the cluster, between 0 and size-1
local_rank: a unique process ID within a single host

๋‹ค์Œ์€ ๊ฐ๊ฐ GPU์žฅ์น˜๊ฐ€ 2๊ฐœ์”ฉ ์žฅ์ฐฉ๋œ ํ˜ธ์ŠคํŠธ 2๋Œ€๋ฅผ ๊ฐ€์ง„ ์‹œ์Šคํ…œ ํ™˜๊ฒฝ์—์„œ GPU ์žฅ์น˜๋ฅผ ์–ด๋–ป๊ฒŒ ๊ตฌ๋ณ„ํ•˜๋Š” ๋ณด์—ฌ์ค€๋‹ค.

Horovod-based Distributed Training Example

1. Run hvd.init().
2. Pin the GPU each process will use based on hvd.local_rank().
3. Scale the learning rate by the number of workers.
4. Wrap the existing optimizer with the Horovod optimizer.
5. Set up a hook to broadcast the initial state of rank 0 to the other ranks.
6. Configure checkpointing so that only rank 0 writes checkpoints.

https://github.com/kubeflow/mpi-operator/blob/master/examples/horovod/tensorflow_mnist.py
import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod.
hvd.init()

# Pin the GPU this process will use based on its local rank.
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build the model, scaling the learning rate by the number of workers.
loss = ...
opt = tf.train.AdamOptimizer(learning_rate=0.01 * hvd.size())

# Wrap the existing optimizer with the Horovod distributed optimizer.
opt = hvd.DistributedOptimizer(opt)

# Add a hook that broadcasts the initial state of rank 0 to the other ranks.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Save checkpoints only on rank 0.
ckpt_dir = "/tmp/train_logs" if hvd.rank() == 0 else None
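
Such a script is typically launched with the horovodrun launcher; for example, on the two-host, two-GPU setup described above (hostnames are placeholders), a command along these lines starts four processes:

horovodrun -np 4 -H host0:2,host1:2 python tensorflow_mnist.py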
