@pacman100
Created May 2, 2024 10:53

debug_text_gen_dpo.py
    """
1. First clone the repo and check out the `debug-dpo` branch:
git clone https://github.com/huggingface/trl.git
cd trl
git checkout debug-dpo
2. Install the dev dependencies with:
make dev
Then install the following versions of transformers / accelerate / deepspeed:
pip install transformers==4.39.1 accelerate==0.28.0 deepspeed==0.14.0
See examples/scripts/requirements.txt for the exact versions.
3. Run with:
TRANSFORMERS_VERBOSITY=info ACCELERATE_LOG_LEVEL=info accelerate launch --config_file=examples/accelerate_configs/deepspeed_zero3.yaml examples/scripts/debug_text_gen_dpo.py

If you set `gradient_accumulation_steps=1` in both the `TrainingArguments` and the `examples/accelerate_configs/deepspeed_zero3.yaml` config, the script runs fine. With `gradient_accumulation_steps=2` it fails with the following error:
    Traceback (most recent call last):
    File "/fsx/lewis/git/hf/trl/examples/scripts/debug_text_gen_dpo.py", line 141, in <module>
    main()
    File "/fsx/lewis/git/hf/trl/examples/scripts/debug_text_gen_dpo.py", line 137, in main
    trainer.train()
    File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/transformers/trainer.py", line 1624, in train
    return inner_training_loop(
    File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/transformers/trainer.py", line 1961, in _inner_training_loop
    tr_loss_step = self.training_step(model, inputs)
    File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/transformers/trainer.py", line 2902, in training_step
    loss = self.compute_loss(model, inputs)
    File "/fsx/lewis/git/hf/trl/examples/scripts/debug_text_gen_dpo.py", line 43, in compute_loss
    with unwrap_model_for_generation(model, self.accelerator) as unwrapped_model:
    File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/contextlib.py", line 142, in __exit__
    next(self.gen)
    File "/fsx/lewis/git/hf/trl/trl/models/utils.py", line 146, in unwrap_model_for_generation
    with deepspeed.zero.GatheredParameters(model.parameters()):
    File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 2177, in __exit__
    self.params[0].partition(param_list=self.params, has_been_updated=False)
    File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 1325, in partition
    self._partition(param_list, has_been_updated=has_been_updated)
    File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 1474, in _partition
    self._partition_param(param, has_been_updated=has_been_updated)
    File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/utils/nvtx.py", line 15, in wrapped_fn
    ret_val = func(*args, **kwargs)
    File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 1507, in _partition_param
    free_param(param)
    File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/utils/nvtx.py", line 15, in wrapped_fn
    ret_val = func(*args, **kwargs)
    File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 279, in free_param
    assert not param.ds_active_sub_modules, param.ds_summary()
    AssertionError: {'id': 0, 'status': 'AVAILABLE', 'numel': 25755648, 'ds_numel': 25755648, 'shape': (50304, 512), 'ds_shape': (50304, 512), 'requires_grad': True, 'grad_shape': None, 'persist': False, 'active_sub_modules': {182}, 'ds_tensor.shape': torch.Size([3219456])}
    """
    import warnings
    from contextlib import nullcontext
    from typing import Any, Dict, Tuple, Union

    import torch
    import torch.nn as nn
    from datasets import Dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    PreTrainedModel,
    TrainingArguments,
)

    from trl import DPOTrainer
    from trl.models.utils import unwrap_model_for_generation


class MyDPOTrainer(DPOTrainer):
    def compute_loss(
        self,
        model: Union[PreTrainedModel, nn.Module],
        inputs: Dict[str, Union[torch.Tensor, Any]],
        return_outputs=False,
    ) -> Union[torch.Tensor, Tuple[torch.Tensor, Dict[str, torch.Tensor]]]:
        if not self.use_dpo_data_collator:
            warnings.warn(
                "compute_loss is only implemented for DPODataCollatorWithPadding, and you passed a datacollator that is different than "
                "DPODataCollatorWithPadding - you might see unexpected behavior. Alternatively, you can implement your own prediction_step method if you are using a custom data collator"
            )

        with unwrap_model_for_generation(model, self.accelerator) as unwrapped_model:
            import time
            start_time = time.time()
            # prefix = self.tokenizer(["<|user|> "] * inputs["prompt_input_ids"].shape[0], return_tensors="pt").input_ids.cuda()
            # suffix = self.tokenizer([" <|assistant|>"] * inputs["prompt_input_ids"].shape[0], return_tensors="pt").input_ids.cuda()
            # print(f"{prefix.shape=} {suffix.shape=} {inputs['prompt_input_ids'].shape=}")
            # torch.hstack((prefix, inputs["prompt_input_ids"], suffix))
            generations = unwrapped_model.generate(
                inputs["prompt_input_ids"],
                max_new_tokens=30,
                do_sample=True,
                temperature=0.2,
                top_k=50,
                top_p=0.95,
                repetition_penalty=1.2,
                eos_token_id=self.tokenizer.eos_token_id,
            )
            print(self.tokenizer.batch_decode(generations))
            generation_time = torch.tensor([time.time() - start_time]).to(self.accelerator.device)

            # Gather the generation times across processes and compute the mean
            generation_time_gather = self.accelerator.gather(generation_time)
            print(f"{self.accelerator.process_index=} Win rate generation time: {generation_time_gather.mean().item():.2f}")
            if self.accelerator.is_main_process:
                print(
                    f"Win rate generation time: {generation_time_gather.mean().item():.2f} seconds for {len(generations)} generations"
                )

        compute_loss_context_manager = torch.cuda.amp.autocast if self._peft_has_been_casted_to_bf16 else nullcontext

        with compute_loss_context_manager():
            loss, metrics = self.get_batch_loss_metrics(model, inputs, train_eval="train")

        # Make sure to move the loss to the device the original accumulating loss is at back in the `Trainer` class:
        loss = loss.to(self.args.device)
        # force log the metrics
        self.store_metrics(metrics, train_eval="train")
        print(f"{loss=}")

        if return_outputs:
            return (loss, metrics)
        return loss


def main():
    training_args = TrainingArguments(
        output_dir="scratch/dummy-model",
        per_device_train_batch_size=2,
        max_steps=50,
        remove_unused_columns=False,
        gradient_accumulation_steps=2,  # Runs fine with gradient_accumulation_steps=1
        learning_rate=5e-5,
        evaluation_strategy="steps",
        bf16=True,
    )

    # fmt: off
    dummy_dataset_dict = {
        "prompt": [
            "<|user|> hello, nice to meet you.<|endoftext|> <|assistant|> ",
            "<|user|> how are you<|endoftext|> <|assistant|> ",
            "<|user|> What is your name?<|endoftext|> <|assistant|> ",
            "<|user|> What is your name?<|endoftext|> <|assistant|> ",
            "<|user|> Which is the best programming language?<|endoftext|> <|assistant|> ",
            "<|user|> Which is the best programming language?<|endoftext|> <|assistant|> ",
            "<|user|> How is the stock price?<|endoftext|> <|assistant|> ",
            "<|user|> How is the stock price?<|endoftext|> <|assistant|> ",
        ],
        "chosen": [
            "hi nice to meet you<|endoftext|>",
            "I am fine<|endoftext|>",
            "My name is Mary<|endoftext|>",
            "My name is Mary<|endoftext|>",
            "Python<|endoftext|>",
            "Python<|endoftext|>",
            "$46 as of 10am EST<|endoftext|>",
            "46 as of 10am EST<|endoftext|>",
        ],
        "rejected": [
            "leave me alone<|endoftext|>",
            "I am not fine<|endoftext|>",
            "Whats it to you?<|endoftext|>",
            "I dont have a name<|endoftext|>",
            "Javascript<|endoftext|>",
            "C++<|endoftext|>",
            "what stock price?<|endoftext|>",
            "I don't understand what you mean by \"stock price\"<|endoftext|>",
        ],
    }
    dummy_dataset = Dataset.from_dict(dummy_dataset_dict)

    model_id = "HuggingFaceH4/pythia-70m-sft"
    model_revision = "v0.0"
    model = AutoModelForCausalLM.from_pretrained(model_id, revision=model_revision)
    ref_model = AutoModelForCausalLM.from_pretrained(model_id, revision=model_revision)
    tokenizer = AutoTokenizer.from_pretrained(model_id, revision=model_revision)
    tokenizer.pad_token_id = 1

    trainer = MyDPOTrainer(
        model=model,
        ref_model=ref_model,
        beta=0.1,
        loss_type="sigmoid",
        args=training_args,
        tokenizer=tokenizer,
        train_dataset=dummy_dataset,
        eval_dataset=dummy_dataset,
        precompute_ref_log_probs=False,
    )

    trainer.train()


if __name__ == "__main__":
    main()
utils.py
import itertools
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union

from accelerate.utils import is_deepspeed_available
from transformers import PreTrainedModel, PreTrainedTokenizer

from .modeling_value_head import AutoModelForCausalLMWithValueHead, AutoModelForSeq2SeqLMWithValueHead

SUPPORTED_ARCHITECTURES = (
    AutoModelForCausalLMWithValueHead,
    AutoModelForSeq2SeqLMWithValueHead,
)


if is_deepspeed_available():
    import deepspeed

if TYPE_CHECKING:
    from accelerate import Accelerator
    from deepspeed.runtime.engine import DeepSpeedEngine
    from torch.nn.parallel.distributed import DistributedDataParallel

    from .modeling_base import PreTrainedModelWrapper


    # TODO: Add Abstract Base Class if more formats are added
@dataclass
class ChatMlSpecialTokens:
    """Dataclass for special tokens used in ChatML, including system, user, assistant, bos, eos, and pad tokens."""

    bos_token: str = "<|im_start|>"
    eos_token: str = "<|im_end|>"
    pad_token: str = "<|im_end|>"

    @property
    def system(self):
        return f"{self.bos_token}system"

    @property
    def user(self):
        return f"{self.bos_token}user"

    @property
    def assistant(self):
        return f"{self.bos_token}assistant"

    @property
    def chat_template(self):
        return (
            "{% for message in messages %}"
            f"{{{{'{self.bos_token}' + message['role'] + '\n' + message['content'] + '{self.eos_token}' + '\n'}}}}"
            "{% endfor %}"
            "{% if add_generation_prompt %}"
            f"{{{{ '{self.assistant}\n' }}}}"
            "{% endif %}"
        )
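
    # For illustration (not part of the original file): with the defaults above, applying this
    # template to [{"role": "user", "content": "Hello!"}] with add_generation_prompt=True renders:
    #
    #   <|im_start|>user
    #   Hello!<|im_end|>
    #   <|im_start|>assistant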


    FORMAT_MAPPING = {"chatml": ChatMlSpecialTokens}


def setup_chat_format(
    model: PreTrainedModel,
    tokenizer: PreTrainedTokenizer,
    format: Optional[Literal["chatml"]] = "chatml",
    resize_to_multiple_of: Optional[int] = None,
) -> Tuple[PreTrainedModel, PreTrainedTokenizer]:
    """
    Set up the chat format by adding special tokens to the tokenizer, setting the correct chat template, and extending the embedding layer of the model based on the new special tokens.

    Args:
        model (`~transformers.PreTrainedModel`): The model to be modified.
        tokenizer (`~transformers.PreTrainedTokenizer`): The tokenizer to be modified.
        format (`Optional[Literal["chatml"]]`): The format to be set. Defaults to "chatml".
        resize_to_multiple_of (`Optional[int]`): Pad the embedding layer size to a multiple of this number. Defaults to None.

    Returns:
        model (`~transformers.PreTrainedModel`): The modified model.
        tokenizer (`~transformers.PreTrainedTokenizer`): The modified tokenizer.
    """
    # check that the requested format is available and retrieve it
    if format not in FORMAT_MAPPING:
        raise ValueError(f"Format {format} not available. Please use one of {FORMAT_MAPPING.keys()}")

    chat_format = FORMAT_MAPPING[format]()

    # set the special tokens and add them to the tokenizer
    tokenizer.eos_token = chat_format.eos_token
    tokenizer.pad_token = chat_format.pad_token
    tokenizer.bos_token = chat_format.bos_token
    tokenizer.add_special_tokens({"additional_special_tokens": [chat_format.bos_token, chat_format.eos_token]})
    # set the chat template for the tokenizer
    tokenizer.chat_template = chat_format.chat_template

    # resize the embedding layer, optionally padding to a multiple of 64, https://x.com/karpathy/status/1621578354024677377
    model.resize_token_embeddings(
        len(tokenizer), pad_to_multiple_of=resize_to_multiple_of if resize_to_multiple_of is not None else None
    )
    # Make sure to update the generation config to use the new eos & bos token
    if getattr(model, "generation_config", None) is not None:
        model.generation_config.bos_token_id = tokenizer.bos_token_id
        model.generation_config.eos_token_id = tokenizer.eos_token_id
        model.generation_config.pad_token_id = tokenizer.pad_token_id

    return model, tokenizer
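
# Usage sketch (illustrative, not part of the original file; the model id is just an arbitrary example):
#   from transformers import AutoModelForCausalLM, AutoTokenizer
#   from trl.models.utils import setup_chat_format
#
#   model = AutoModelForCausalLM.from_pretrained("EleutherAI/pythia-70m")
#   tokenizer = AutoTokenizer.from_pretrained("EleutherAI/pythia-70m")
#   model, tokenizer = setup_chat_format(model, tokenizer, format="chatml", resize_to_multiple_of=64)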


def remove_hooks(model: "DeepSpeedEngine") -> None:
    """Removes the optimizer hooks from a DeepSpeed ZeRO-3 model."""
    if model.optimizer is not None and hasattr(model.optimizer, "parameter_offload"):
        optimizer_offload = model.optimizer.parameter_offload
    elif model.optimizer is not None:
        optimizer_offload = model.optimizer

    for param in iter_params(optimizer_offload.module, recurse=True):
        param.ds_active_sub_modules.clear()

    for hook in optimizer_offload.forward_hooks:
        hook.remove()
    for hook in optimizer_offload.backward_hooks:
        hook.remove()

    optimizer_offload.forward_hooks = []
    optimizer_offload.backward_hooks = []


def get_all_parameters(sub_module, recurse=False):
    return itertools.chain(sub_module.named_parameters(recurse=recurse), sub_module.ds_external_parameters())


def iter_params(module, recurse=False):
    return map(lambda pair: pair[1], get_all_parameters(module, recurse))


def add_hooks(model: "DeepSpeedEngine") -> None:
    """Adds the optimizer hooks back to a DeepSpeed ZeRO-3 model."""
    if model.optimizer is not None and hasattr(model.optimizer, "parameter_offload"):
        optimizer_offload = model.optimizer.parameter_offload
    elif model.optimizer is not None:
        optimizer_offload = model.optimizer
    optimizer_offload._register_hooks_recursively(optimizer_offload.module)


@contextmanager
def unwrap_model_for_generation(
    model: Union["DistributedDataParallel", "DeepSpeedEngine"], accelerator: "Accelerator", is_peft_model: bool = False
) -> Union["PreTrainedModelWrapper", "DeepSpeedEngine"]:
    """Context manager to unwrap a model for generation.

    For ZeRO-3 models, we gather the weights once to speed up generation.
    """
    unwrapped_model = accelerator.unwrap_model(model)
    if is_peft_model:
        unwrapped_model.pretrained_model.disable_adapter()
    if accelerator.state.deepspeed_plugin is not None and accelerator.state.deepspeed_plugin.zero_stage == 3:
        with deepspeed.zero.GatheredParameters(model.parameters()):
            remove_hooks(model)
            yield model
            add_hooks(model)
    else:
        yield unwrapped_model
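
# Typical usage inside a custom Trainer method (sketch only; `self.accelerator` and `inputs`
# mirror the debug script above and are assumptions here, not part of this file):
#   with unwrap_model_for_generation(model, self.accelerator) as unwrapped_model:
#       generations = unwrapped_model.generate(inputs["prompt_input_ids"], max_new_tokens=30)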