使用LoRA进行大模型数据微调二

TwoAdmin 2025-10-24 4 10/24

gpu版

import os
import torch
from datasets import load_dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    Trainer,
    DataCollatorForSeq2Seq
)
from peft import (
    LoraConfig,
    get_peft_model,
    TaskType
)
import json

# ==================== Configuration ====================
# Local directory of the base model (an FP8-quantized checkpoint, per the directory name)
MODEL_NAME = "./models/Qwen3-4B-Instruct-2507-FP8"

# Training data file (JSON Lines) and the directory that receives checkpoints and the adapter
DATA_PATH = "data/train.jsonl"
OUTPUT_DIR = "out/qwen3-finetuned-lora"

# Keyword arguments for peft.LoraConfig (author's note: tuned for a 4B model)
LORA_CONFIG = {
    # Low rank; the author considers a larger rank unnecessary for a 4B model
    "r": 8,
    # Scaling factor, lowered together with the rank
    "lora_alpha": 16,
    # Small dropout on the LoRA layers
    "lora_dropout": 0.05,
    "bias": "none",
    # Only two projection modules are adapted, to save memory
    "target_modules": ["q_proj", "v_proj"],
    "task_type": TaskType.CAUSAL_LM,
}

# Keyword arguments for transformers.TrainingArguments
# (author's note: chosen for macOS and a 4B model)
TRAINING_ARGS = {
    "num_train_epochs": 3,
    # Batch size of 1 because memory is limited
    "per_device_train_batch_size": 1,
    # More accumulation steps to compensate for the small batch
    "gradient_accumulation_steps": 8,
    # Deliberately lowered learning rate
    "learning_rate": 1e-4,
    # Mixed-precision fp16 is switched on only when CUDA is available
    "fp16": torch.cuda.is_available(),
    # Author's note: True may be worth trying on Apple M-series machines
    "bf16": False,
    # Plain PyTorch AdamW optimizer
    "optim": "adamw_torch",
    "logging_steps": 10,
    "save_steps": 50,
    "output_dir": OUTPUT_DIR,
    "save_total_limit": 2,
    "report_to": "tensorboard",
    "remove_unused_columns": False,
    # Author's note: keep False on macOS
    "dataloader_pin_memory": False,
    # Gradient checkpointing is enabled to save memory
    "gradient_checkpointing": True,
}


# ==================== 数据预处理 ====================
def _none_to_empty(value):
    """Return an empty string for a missing (None) field, else the value unchanged."""
    return "" if value is None else value


def format_conversation(example):
    """Convert one raw dataset record into a single training string.

    Three record layouts are recognised, checked in this order:

    1. ``messages``: a list of ``{"role", "content"}`` dicts. A default system
       turn is prepended when the list is empty or does not start with one.
       Turns whose role is not system/user/assistant are skipped.
    2. ``instruction`` (with optional ``input`` and ``output``).
    3. Anything else is treated as a ``text``/``label`` classification pair.

    A field that is present but ``None`` is treated as missing, so the literal
    string "None" never ends up in the training text and a ``None`` value for
    ``messages`` no longer raises.

    Args:
        example: Mapping holding one dataset row.

    Returns:
        A dict with a single ``"text"`` key containing the formatted string.
    """
    default_system = "你是一个有帮助的AI助手。"
    known_roles = ("system", "user", "assistant")

    messages = example.get("messages")
    if messages is not None:
        if not messages or messages[0]["role"] != "system":
            messages = [{"role": "system", "content": default_system}] + list(messages)

        turns = [
            f"<|im_start|>{msg['role']}\n{_none_to_empty(msg['content'])}<|im_end|>\n"
            for msg in messages
            if msg["role"] in known_roles
        ]
        return {"text": "".join(turns)}

    instruction = example.get("instruction")
    if instruction is not None:
        input_text = example.get("input")
        output = _none_to_empty(example.get("output"))

        # The optional input goes on its own line after the instruction
        user_content = f"{instruction}\n{input_text}" if input_text else f"{instruction}"
        parts = [
            f"<|im_start|>system\n{default_system}<|im_end|>\n",
            f"<|im_start|>user\n{user_content}<|im_end|>\n",
            # No trailing newline after the final assistant turn in this layout
            f"<|im_start|>assistant\n{output}<|im_end|>",
        ]
        return {"text": "".join(parts)}

    # Fallback: plain classification sample. Only None is replaced, so a
    # falsy-but-valid label such as 0 is kept.
    text = _none_to_empty(example.get("text"))
    label = _none_to_empty(example.get("label"))
    return {"text": f"文本: {text}\n分类: {label}"}


# ==================== 主微调函数 ====================
def finetune_qwen():
    """Fine-tune the model at ``MODEL_NAME`` with LoRA on the data in ``DATA_PATH``.

    Stages: check the model path, load the tokenizer, load/format/split the
    dataset (80/20, seed 42), load the base model, wrap it with LoRA adapters,
    tokenize, train with ``Trainer``, then save the adapter, tokenizer and
    training metrics under ``OUTPUT_DIR``.

    Every stage reports its own failure on stdout; exceptions are not
    propagated to the caller.

    Returns:
        ``(model, tokenizer)`` on success, ``(None, None)`` if any stage failed.
    """
    print("=" * 50)
    print("开始微调 Qwen3 FP8模型")
    print(f"模型: {MODEL_NAME}")
    print(f"数据: {DATA_PATH}")
    print("=" * 50)

    # 0. Make sure the local model directory exists before doing any work
    if not os.path.exists(MODEL_NAME):
        print(f"错误: 模型路径不存在: {MODEL_NAME}")
        print("请确认模型路径是否正确")
        return None, None

    # 1. Load the tokenizer
    print("加载tokenizer...")
    try:
        tokenizer = AutoTokenizer.from_pretrained(
            MODEL_NAME,
            trust_remote_code=True,
            padding_side="right"
        )

        # Fall back to the EOS token when the tokenizer defines no pad token
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        print("Tokenizer加载成功")

    except Exception as e:
        print(f"Tokenizer加载失败: {e}")
        return None, None

    # 2. Load the dataset, convert every row to a single "text" column, and split it
    print("加载和预处理数据集...")
    try:
        dataset = load_dataset("json", data_files=DATA_PATH, split="train")

        dataset = dataset.map(
            format_conversation,
            remove_columns=dataset.column_names,
            desc="格式化数据"
        )

        dataset = dataset.train_test_split(test_size=0.2, seed=42)
        train_dataset = dataset["train"]
        eval_dataset = dataset["test"]

        print(f"数据加载成功 | 训练集: {len(train_dataset)} | 验证集: {len(eval_dataset)}")

    except Exception as e:
        print(f"数据加载失败: {e}")
        return None, None

    # 3. Load the base model. No quantization_config is passed on purpose.
    #    NOTE(review): presumably the checkpoint's own config carries the FP8
    #    settings — confirm.
    print("加载模型(FP8量化)...")
    try:
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            device_map="auto",  # let the library place the weights on the available devices
            trust_remote_code=True,
            torch_dtype=torch.float16,
        )
        print("模型加载成功")

    except Exception as e:
        print(f"模型加载失败: {e}")
        print("\n尝试替代方案...")

        # Fallback: load without device_map and move the model manually
        try:
            model = AutoModelForCausalLM.from_pretrained(
                MODEL_NAME,
                trust_remote_code=True,
                torch_dtype=torch.float16
            )
            if torch.cuda.is_available():
                model = model.cuda()
            else:
                model = model.cpu()
            print("模型加载成功(替代方案)")
        except Exception as e2:
            print(f"所有加载方式都失败: {e2}")
            return None, None

    # 4. Wrap the model with LoRA adapters (prepare_model_for_kbit_training is not used)
    print("应用LoRA配置...")
    try:
        # With gradient checkpointing and a frozen base model, the embedding
        # outputs must require grad or the backward pass has no graph to
        # follow; enable that before the adapters are attached.
        if TRAINING_ARGS.get("gradient_checkpointing") and hasattr(model, "enable_input_require_grads"):
            model.enable_input_require_grads()

        lora_config = LoraConfig(**LORA_CONFIG)
        model = get_peft_model(model, lora_config)

        # Show how many parameters are actually trainable
        model.print_trainable_parameters()
        print("LoRA配置应用成功")

    except Exception as e:
        print(f"LoRA配置失败: {e}")
        return None, None

    # 5. Tokenization
    def tokenize_function(examples):
        """Tokenize a batch of texts to fixed length and attach causal-LM labels."""
        encoded = tokenizer(
            examples["text"],
            truncation=True,
            padding="max_length",
            max_length=256,  # short sequences to save memory
            return_tensors=None
        )
        # The Trainer needs a "labels" column to obtain a loss. Labels are a
        # copy of input_ids, with padding positions (attention_mask == 0) set
        # to -100 so they are ignored by the loss.
        encoded["labels"] = [
            [token_id if keep else -100 for token_id, keep in zip(ids, mask)]
            for ids, mask in zip(encoded["input_ids"], encoded["attention_mask"])
        ]
        return encoded

    print("分词处理...")
    try:
        tokenized_train = train_dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=["text"],
            desc="分词训练集"
        )

        tokenized_eval = eval_dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=["text"],
            desc="分词验证集"
        )
        print("分词处理完成")

    except Exception as e:
        print(f"分词处理失败: {e}")
        return None, None

    # 6. Build the training arguments from the module-level configuration
    training_args = TrainingArguments(**TRAINING_ARGS)

    # 7. Create the Trainer and run training
    try:
        # NOTE(review): TRAINING_ARGS sets no evaluation strategy, so the eval
        # split is presumably not used during training — confirm.
        # NOTE(review): newer transformers releases prefer `processing_class`
        # over `tokenizer` — confirm against the installed version.
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized_train,
            eval_dataset=tokenized_eval,
            tokenizer=tokenizer,
            data_collator=DataCollatorForSeq2Seq(
                tokenizer=tokenizer,
                padding=True
            ),
        )

        print("开始训练...")
        train_result = trainer.train()

        # 8. Save the LoRA adapter and the tokenizer
        print("保存模型...")
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        model.save_pretrained(OUTPUT_DIR)
        tokenizer.save_pretrained(OUTPUT_DIR)

        # Persist the training metrics
        metrics = train_result.metrics
        trainer.save_metrics("train", metrics)

        print(f"训练完成!模型已保存到: {OUTPUT_DIR}")

        return model, tokenizer

    except Exception as e:
        print(f"训练失败: {e}")
        return None, None


# ==================== 推理测试 ====================
def test_model(model_path, prompt):
    """Generate a reply to ``prompt`` with the LoRA adapter stored at ``model_path``.

    The base model is loaded from ``MODEL_NAME`` in float16, the adapter and
    tokenizer are loaded from ``model_path``, and up to 200 new tokens are
    sampled (temperature 0.7, top-p 0.9).

    Args:
        model_path: Directory containing the saved adapter and tokenizer.
        prompt: Text passed to the tokenizer as-is.

    Returns:
        The decoded output sequence with special tokens skipped, or a string
        starting with "测试失败: " when loading or generation raises.
    """
    from peft import PeftModel

    sampling_options = {
        "max_new_tokens": 200,
        "temperature": 0.7,
        "do_sample": True,
        "top_p": 0.9,
    }

    try:
        # Base weights first, then the LoRA adapter on top of them
        backbone = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            trust_remote_code=True,
            torch_dtype=torch.float16,
        )
        lora_model = PeftModel.from_pretrained(backbone, model_path)
        tok = AutoTokenizer.from_pretrained(model_path)

        # Use the GPU when one is available, otherwise the CPU
        if torch.cuda.is_available():
            target_device = "cuda"
        else:
            target_device = "cpu"
        lora_model = lora_model.to(target_device)

        encoded_prompt = tok(prompt, return_tensors="pt").to(target_device)

        # Inference only: no gradients are needed
        with torch.no_grad():
            generated = lora_model.generate(**encoded_prompt, **sampling_options)

        return tok.decode(generated[0], skip_special_tokens=True)

    except Exception as err:
        return f"测试失败: {err}"


# ==================== 主程序 ====================
if __name__ == "__main__":
    # Report the runtime environment before starting
    print(f"PyTorch版本: {torch.__version__}")
    print(f"CUDA可用: {torch.cuda.is_available()}")
    if torch.cuda.is_available():
        print(f"GPU: {torch.cuda.get_device_name(0)}")

    # Run fine-tuning; finetune_qwen returns (None, None) when any stage failed
    model, tokenizer = finetune_qwen()

    # Compare against None explicitly: failure is signalled with None, and the
    # truthiness of an arbitrary object may depend on its __len__/__bool__.
    if model is not None and tokenizer is not None:
        # Quick smoke test: reload the saved adapter and generate one reply
        test_prompt = "<|im_start|>user\n介绍一下你自己<|im_end|>\n<|im_start|>assistant\n"
        print("\n测试微调后的模型:")
        print(test_model(OUTPUT_DIR, test_prompt))
    else:
        print("\n微调失败,请检查以上错误信息")
- THE END -
Tag:

TwoAdmin

5月19日16:34

最后修改:2026年5月19日
0

非特殊说明,本博所有文章均为博主原创。