LLMs on Aurora: ezpz
Sam Foreman
2025-05-07
ALCF INCITE GPU Hackathon, May 20–22, 2025
- ezpz
- transformers
- Megatron-DeepSpeed
ezpz
Write once, run anywhere
Submit interactive job:
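For example, on Aurora (a sketch; the queue, node count, and project below are placeholders to adapt):

qsub -I -q debug -l select=2 -l walltime=01:00:00 -l filesystems=home:flare -A <your_project>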
Source the ezpz/bin/utils.sh script (using curl to download it):
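A minimal sketch, assuming the script lives at src/ezpz/bin/utils.sh on the main branch of the saforem2/ezpz GitHub repository:

source <(curl -fsSL https://raw.githubusercontent.com/saforem2/ezpz/main/src/ezpz/bin/utils.sh)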
Setup environment:
ezpz_setup_env (equivalent to: ezpz_setup_job && ezpz_setup_python)
- ezpz_setup_job: determine the specifics of our active (PBS, SLURM) job
- ezpz_setup_python: activate the base conda environment and create (or activate) a virtual environment on top of it, at venvs/$(basename ${CONDA_PREFIX})
ezpz integrates directly with the ALCF job scheduler, automatically determining job variables (${NHOSTS}, ${NGPU_PER_HOST}, ${NGPUS}, …)
Experiment with custom hostfile(s), e.g. to compare torch==2.X vs torch==2.Y (see the sketch below):
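A hypothetical sketch of such an experiment, assuming a 2-node PBS job and an mpiexec that accepts --hostfile (adapt the flags to your launcher, and activate the matching venv before each run):

# Hypothetical: split the allocated nodes into two single-node hostfiles
head -n 1 "${PBS_NODEFILE}" > /tmp/hostfile-torch2X
tail -n 1 "${PBS_NODEFILE}" > /tmp/hostfile-torch2Y
# Run the same distributed test under each torch install
mpiexec --hostfile /tmp/hostfile-torch2X -n "${NGPU_PER_HOST}" python3 -m ezpz.test
mpiexec --hostfile /tmp/hostfile-torch2Y -n "${NGPU_PER_HOST}" python3 -m ezpz.test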
ezpz
Install:
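A sketch, assuming installation straight from the saforem2/ezpz GitHub repository:

python3 -m pip install "git+https://github.com/saforem2/ezpz"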
Run distributed test:
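Using the ezpz-test entry point described later in this deck:

ezpz-test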
Launch any python (module or string) from python:
Launch a module:
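For example, launching the distributed test module (ezpz/test.py) with ezpz-launch:

ezpz-launch -m ezpz.test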
Launch a python string:
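A sketch, assuming ezpz-launch mirrors python's -c flag (an assumption, not confirmed here):

ezpz-launch -c 'import ezpz; ezpz.setup_torch()'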
- Initializing PyTorch across multiple processes
- Automatic device detection (xpu, cuda, mps, cpu, …)
- Automatic (single-process) logging
- Distributed debugger
import ezpz

rank = ezpz.setup_torch()
logger = ezpz.get_logger(__name__)

if rank == 0:  # -- [1.] --
    try:
        _ = ezpz.setup_wandb(
            "ezpz.examples.minimal"
        )
    except Exception:
        logger.exception(
            "Failed to initialize wandb, continuing without it"
        )

# ...build {model, optimizer}, etc...
history = ezpz.History()

for i in range(train_iters):
    metrics = train_step(...)
    logger.info(  # -- [2.] --
        history.update(metrics)  # -- [3.] --
    )

if rank == 0:
    history.finalize()
1. Initialize wandb on rank 0 (if WANDB_DISABLED is not set)
2. Log the formatted metrics to stdout
3. history.update(...) accumulates the metrics in history.history

import os
import time

import ezpz
import torch

logger = ezpz.get_logger(__name__)


class Network(torch.nn.Module):
    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        sizes: list[int] | None,
    ):
        super(Network, self).__init__()
        nh = output_dim if sizes is None else sizes[0]
        layers = [torch.nn.Linear(input_dim, nh), torch.nn.ReLU()]
        if sizes is not None and len(sizes) > 1:
            for idx, size in enumerate(sizes[1:]):
                layers.extend(
                    [torch.nn.Linear(sizes[idx], size), torch.nn.ReLU()]
                )
            layers.append(torch.nn.Linear(sizes[-1], output_dim))
        self.layers = torch.nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.layers(x)


@ezpz.timeitlogit(rank=ezpz.get_rank())
def train(
    model: torch.nn.Module, optimizer: torch.optim.Optimizer
) -> ezpz.History:
    unwrapped_model = (
        model.module
        if isinstance(model, torch.nn.parallel.DistributedDataParallel)
        else model
    )
    history = ezpz.History()
    device_type = ezpz.get_torch_device_type()
    dtype = unwrapped_model.layers[0].weight.dtype
    bsize = int(os.environ.get("BATCH_SIZE", 64))
    isize = unwrapped_model.layers[0].in_features
    warmup = int(os.environ.get("WARMUP_ITERS", 10))
    log_freq = int(os.environ.get("LOG_FREQ", 1))
    model.train()
    for step in range(int(os.environ.get("TRAIN_ITERS", 500))):
        with torch.autocast(device_type=device_type, dtype=dtype):
            t0 = time.perf_counter()
            x = torch.rand((bsize, isize), dtype=dtype).to(device_type)
            y = model(x)
            loss = ((y - x) ** 2).sum()
        dtf = (t1 := time.perf_counter()) - t0
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        dtb = time.perf_counter() - t1
        if step % log_freq == 0 and step > warmup:
            logger.info(
                history.update(
                    {
                        "iter": step,
                        "loss": loss.item(),
                        "dt": dtf + dtb,
                        "dtf": dtf,
                        "dtb": dtb,
                    }
                )
            )
    return history


@ezpz.timeitlogit(rank=ezpz.get_rank())
def setup():
    rank = ezpz.setup_torch()
    if os.environ.get("WANDB_DISABLED", False):
        logger.info("WANDB_DISABLED is set, not initializing wandb")
    elif rank == 0:
        try:
            _ = ezpz.setup_wandb(
                project_name=os.environ.get(
                    "PROJECT_NAME", "ezpz.examples.minimal"
                )
            )
        except Exception:
            logger.exception(
                "Failed to initialize wandb, continuing without it"
            )
    device_type = ezpz.get_torch_device_type()
    model = Network(
        input_dim=int(os.environ.get("INPUT_SIZE", 128)),
        output_dim=int(os.environ.get("OUTPUT_SIZE", 128)),
        sizes=[
            int(x)
            for x in os.environ.get(
                "LAYER_SIZES", "1024,512,256,128"
            ).split(",")
        ],
    )
    model.to(device_type)
    # NOTE: DTYPE is a string (e.g. "bfloat16"); convert it to a torch.dtype
    model.to(getattr(torch, os.environ.get("DTYPE", "bfloat16")))
    logger.info(f"{model=}")
    optimizer = torch.optim.Adam(model.parameters())
    if ezpz.get_world_size() > 1:
        from torch.nn.parallel import DistributedDataParallel as DDP

        model = DDP(model, device_ids=[ezpz.get_local_rank()])
    return model, optimizer


def main():
    model, optimizer = setup()
    history = train(model, optimizer)
    if ezpz.get_rank() == 0:
        dataset = history.finalize()
        logger.info(f"{dataset=}")


if __name__ == "__main__":
    main()
To run the previous example, we:
1. Source the ezpz utils script
2. Set up our environment
3. Run the example
as shown below:
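Concretely (the raw.githubusercontent.com URL and the ezpz.examples.minimal module path are assumptions based on the names used above):

source <(curl -fsSL https://raw.githubusercontent.com/saforem2/ezpz/main/src/ezpz/bin/utils.sh)
ezpz_setup_env
ezpz-launch -m ezpz.examples.minimal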
ezpz-test
ezpz-test is a simple test script that trains a small model using DDP across all available GPUs.
It automatically builds the appropriate mpiexec command to run the training script across all GPUs.
See: ezpz/test.py
Command:
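ezpz-test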
Generate text (see: ezpz/generate.py)
Command:
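A sketch; the --model_name flag is a hypothetical illustration, so check ezpz/generate.py for the exact arguments:

ezpz-launch -m ezpz.generate --model_name=meta-llama/Llama-3.2-1B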
Train a model with the Hugging Face transformers Trainer, via ezpz.hf_trainer. Command:
ezpz-launch -m ezpz.hf_trainer \
--dataset_name=eliplutchok/fineweb-small-sample \
--streaming \
--model_name_or_path=meta-llama/Llama-3.2-1B \
--bf16=true \
--do_train=true \
--do_eval=true \
--report-to=wandb \
--logging-steps=1 \
--include-tokens-per-second=true \
--block-size=128 \
--max-steps=10 \
--include-num-input-tokens-seen=true \
--auto_find_batch_size=true \
--gradient_checkpointing=true \
--optim=adamw_torch \
--overwrite-output-dir=true \
--logging-first-step \
--include-for-metrics='inputs,loss' \
--max-eval-samples=50 \
--ddp-backend=ccl
This research used resources of the Argonne Leadership Computing Facility, which is a DOE Office of Science User Facility supported under Contract DE-AC02-06CH11357.