使用 LoRA 进行大模型数据微调(一)

TwoAdmin 2025-10-23 4 10/23

CPU 版

import os
import torch
import time
import json
from datasets import load_dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling
)
from peft import LoraConfig, get_peft_model, TaskType

# Locations used throughout the script: the local base-model directory,
# the JSONL training file, and the directory that receives the results.
MODEL_NAME = "./models/Qwen3-0.6B"
DATA_PATH = "data/train.jsonl"
OUTPUT_DIR = "out/qwen3-0.6B-finetuned-final"

# Start-up banner: a 70-character rule above and below the title.
print("=" * 70, "Qwen3 CPU微调 - 最终版", "=" * 70, sep="\n")

# Echo the effective configuration, one setting per line.
print(
    f"模型: {MODEL_NAME}",
    f"数据: {DATA_PATH}",
    f"输出: {OUTPUT_DIR}",
    sep="\n",
)


# ==================== 数据格式化 ====================
def format_for_qwen(example):
    """Render one dataset record as a single Qwen-style chat string.

    Each message becomes ``<|im_start|>{role}\\n{content}<|im_end|>\\n``.
    A default system message is prepended when the conversation is empty
    or does not start with a system message.

    Args:
        example: Mapping for one record. Its optional ``"messages"`` entry
            is a sequence of dicts that each carry ``"role"`` and
            ``"content"`` keys.

    Returns:
        ``{"text": <formatted conversation>}``; the text is ``""`` when the
        record has no ``"messages"`` entry or the entry is ``None``.
    """
    # A "messages" key holding None (e.g. a column that is absent in this
    # particular row) is treated the same as a missing key instead of
    # failing on list concatenation below.
    if "messages" not in example or example["messages"] is None:
        return {"text": ""}

    # Copy so the caller's record is never modified.
    messages = list(example["messages"])

    # Guarantee the conversation opens with a system message.
    if not messages or messages[0]["role"] != "system":
        messages = [{"role": "system", "content": "你是一个分析师。"}, *messages]

    # Build the conversation in one pass rather than repeated "+=".
    text = "".join(
        f"<|im_start|>{msg['role']}\n{msg['content']}<|im_end|>\n"
        for msg in messages
    )
    return {"text": text}


# ==================== 修复的JSON序列化函数 ====================
def _to_jsonable(value):
    """Recursively replace sets and tuples with lists so ``json`` can encode *value*."""
    if isinstance(value, (set, frozenset)):
        items = [_to_jsonable(item) for item in value]
        try:
            # Sort so the written file does not depend on set iteration order.
            return sorted(items)
        except TypeError:
            # Elements that cannot be compared keep their iteration order.
            return items
    if isinstance(value, (list, tuple)):
        return [_to_jsonable(item) for item in value]
    if isinstance(value, dict):
        return {key: _to_jsonable(item) for key, item in value.items()}
    return value


def save_lora_config_safe(config, path):
    """Write a LoRA config to *path* as indented JSON.

    The dict returned by ``config.to_dict()`` may contain ``set`` values,
    which ``json`` cannot encode. Sets are converted to lists at every
    nesting level (inside lists, tuples and dicts), not only at the top
    level.

    Args:
        config: Object exposing ``to_dict()`` that returns a dict.
        path: Destination file path; an existing file is overwritten.

    Raises:
        TypeError: If the dict still holds a value ``json`` cannot encode.
        OSError: If *path* cannot be opened for writing.
    """
    config_dict = _to_jsonable(config.to_dict())

    with open(path, "w", encoding="utf-8") as f:
        json.dump(config_dict, f, indent=2)


# ==================== 主程序 ====================
def main():
    """Fine-tune the base model with LoRA on CPU, save it, and sample answers.

    Runs these steps in order: load the tokenizer, load and format the
    dataset, load the base model, wrap it with LoRA adapters, tokenize,
    train, save the adapter plus configuration files to ``OUTPUT_DIR``,
    then reload the adapter on a fresh base model and generate answers to
    a few test questions.

    Returns:
        bool: ``True`` when every step completed; ``False`` when any
        exception was raised (the error and traceback are printed).
    """
    try:
        # 1. Load the tokenizer
        print(f"\n🔧 步骤1: 加载tokenizer...")
        start_time = time.time()

        tokenizer = AutoTokenizer.from_pretrained(
            MODEL_NAME,
            trust_remote_code=True,
            padding_side="left"
        )

        # Padding to a fixed length below requires a pad token.
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        print(f" 完成 ({time.time() - start_time:.1f}s)")

        # 2. Load the dataset
        print(f"\n🔧 步骤2: 加载数据集...")
        start_time = time.time()

        dataset = load_dataset("json", data_files=DATA_PATH, split="train")
        dataset = dataset.map(format_for_qwen, remove_columns=dataset.column_names)

        # Records without usable messages are formatted to an empty string;
        # drop them so they are not tokenized into padding-only samples.
        dataset = dataset.filter(lambda row: bool(row["text"]))
        if len(dataset) == 0:
            raise ValueError(f"数据集中没有可用的训练样本: {DATA_PATH}")

        # Hold out 20% for evaluation; tiny datasets reuse the training
        # rows for evaluation instead of being split.
        if len(dataset) > 4:
            split_data = dataset.train_test_split(test_size=0.2, seed=42)
            train_dataset = split_data["train"]
            eval_dataset = split_data["test"]
        else:
            train_dataset = dataset
            eval_dataset = dataset

        print(f"  完成 ({time.time() - start_time:.1f}s)")
        print(f"  训练集: {len(train_dataset)} 条")
        print(f"  验证集: {len(eval_dataset)} 条")

        # 3. Load the base model
        print(f"\n🔧 步骤3: 加载模型...")
        start_time = time.time()

        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            trust_remote_code=True,
            dtype=torch.float32,
        )

        model = model.to("cpu")
        # The generation KV cache is not needed while training.
        model.config.use_cache = False

        total_params = sum(p.numel() for p in model.parameters())
        print(f"  完成 ({time.time() - start_time:.1f}s)")
        print(f"  模型参数量: {total_params:,}")

        # 4. Apply LoRA
        print(f"\n🔧 步骤4: 配置LoRA...")
        start_time = time.time()

        lora_config = LoraConfig(
            r=4,
            lora_alpha=8,
            lora_dropout=0.05,
            bias="none",
            target_modules=["q_proj", "v_proj"],
            task_type=TaskType.CAUSAL_LM,
        )

        model = get_peft_model(model, lora_config)

        trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        print(f"  完成 ({time.time() - start_time:.1f}s)")
        print(f"  可训练参数: {trainable_params:,} ({trainable_params / total_params * 100:.2f}%)")

        # 5. Tokenize the data
        print(f"\n🔧 步骤5: 数据分词...")
        start_time = time.time()

        def tokenize_function(examples):
            """Tokenize a batch of texts to fixed-length ids with labels."""
            tokenized = tokenizer(
                examples["text"],
                truncation=True,
                padding="max_length",
                max_length=128,
                return_tensors=None
            )
            # Causal-LM objective: labels start as a copy of the inputs.
            tokenized["labels"] = tokenized["input_ids"].copy()
            return tokenized

        tokenized_train = train_dataset.map(
            tokenize_function,
            batched=True,
            batch_size=10,
            remove_columns=["text"]
        )

        tokenized_eval = eval_dataset.map(
            tokenize_function,
            batched=True,
            batch_size=10,
            remove_columns=["text"]
        )

        print(f"  完成 ({time.time() - start_time:.1f}s)")

        # 6. Training configuration
        print(f"\n步骤6: 配置训练...")

        training_args = TrainingArguments(
            output_dir=OUTPUT_DIR,
            num_train_epochs=1,
            per_device_train_batch_size=1,
            per_device_eval_batch_size=1,
            gradient_accumulation_steps=2,
            learning_rate=1e-4,
            fp16=False,
            logging_steps=1,
            save_steps=5,
            eval_steps=3,
            eval_strategy="steps",
            save_strategy="steps",
            save_total_limit=1,
            remove_unused_columns=True,
            dataloader_num_workers=0,
            use_cpu=True,
            load_best_model_at_end=False,
            report_to="none",
            disable_tqdm=False,
            optim="adamw_torch",
        )

        # 7. Build the trainer
        data_collator = DataCollatorForLanguageModeling(tokenizer, mlm=False)

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized_train,
            eval_dataset=tokenized_eval,
            processing_class=tokenizer,
            data_collator=data_collator,
        )

        # 8. Train
        print("\n" + "=" * 70)
        print("开始训练...")
        print("=" * 70)

        train_start = time.time()
        train_result = trainer.train()
        train_time = time.time() - train_start

        print(f"\n训练成功!")
        print(f"   总耗时: {train_time:.1f}秒")
        print(f"   最终loss: {train_result.training_loss:.4f}")

        # 9. Save the model
        print(f"\n保存模型...")
        os.makedirs(OUTPUT_DIR, exist_ok=True)

        # Adapter weights and tokenizer files
        model.save_pretrained(OUTPUT_DIR)
        tokenizer.save_pretrained(OUTPUT_DIR)

        # LoRA configuration, written via the set-tolerant helper
        config_save_path = os.path.join(OUTPUT_DIR, "lora_config.json")
        save_lora_config_safe(lora_config, config_save_path)

        # Training arguments
        args_save_path = os.path.join(OUTPUT_DIR, "training_args.json")
        with open(args_save_path, "w", encoding="utf-8") as f:
            json.dump(training_args.to_dict(), f, indent=2)

        print(f"模型已保存到: {OUTPUT_DIR}")

        # 10. Test generation
        print("\n测试微调效果...")

        # Reload from disk so the test exercises the files just written.
        print("加载微调后的模型进行测试...")
        from peft import PeftModel

        # Fresh copy of the base model
        base_model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            trust_remote_code=True,
            dtype=torch.float32,
        )

        # Attach the saved LoRA weights
        tuned_model = PeftModel.from_pretrained(base_model, OUTPUT_DIR)
        tuned_model = tuned_model.to("cpu")
        tuned_model.eval()

        # Test questions
        test_questions = [
            "个人养老金怎么参加?",
            "社保断缴有什么影响?",
            "失业金领取条件是什么?"
        ]

        for question in test_questions:
            # Build the prompt, ending with an open assistant turn
            prompt = f"""<|im_start|>system
你是一个分析师,擅长解答社保、就业、劳动关系等问题。<|im_end|>
<|im_start|>user
{question}<|im_end|>
<|im_start|>assistant
"""

            print(f"\n{'=' * 50}")
            print(f"问题: {question}")
            print(f"{'=' * 50}")

            inputs = tokenizer(prompt, return_tensors="pt")

            with torch.no_grad():
                outputs = tuned_model.generate(
                    **inputs,
                    max_new_tokens=150,
                    temperature=0.7,
                    do_sample=True,
                    top_p=0.9,
                    pad_token_id=tokenizer.pad_token_id,
                    eos_token_id=tokenizer.eos_token_id,
                    repetition_penalty=1.1,
                )

            # The returned sequence starts with the prompt tokens. Slice
            # them off and decode only the newly generated part: decoding
            # with skip_special_tokens=True removes the chat markers, so
            # the answer cannot be located by searching the decoded text
            # for "<|im_start|>assistant".
            prompt_length = inputs["input_ids"].shape[1]
            answer_ids = outputs[0][prompt_length:]
            answer = tokenizer.decode(answer_ids, skip_special_tokens=True).strip()
            print(f"回答: {answer}")

        print("\n" + "=" * 70)
        print("微调成功完成!")
        print("=" * 70)

        return True

    except Exception as e:
        # Top-level boundary: report the failure and signal it to the caller.
        print(f"\n错误: {e}")
        import traceback
        traceback.print_exc()
        return False


if __name__ == "__main__":
    # Script entry point: run the pipeline, then print either the
    # follow-up hints or a failure notice.
    success = main()

    if not success:
        print("\n微调失败,请检查错误信息")
    else:
        next_steps = (
            "\n恭喜!Qwen3模型微调成功!",
            "\n后续步骤:",
            "1. 使用微调后的模型:",
            "   from peft import PeftModel",
            f"   model = PeftModel.from_pretrained(base_model, '{OUTPUT_DIR}')",
            "",
            "2. 部署为API服务:",
            f"   参考之前的FastAPI代码,修改模型路径为: '{OUTPUT_DIR}'",
            "",
            "3. 继续训练(如果需要):",
            "   修改num_train_epochs参数,重新运行本脚本",
        )
        # One entry per output line; an empty entry is a blank line.
        print("\n".join(next_steps))
- THE END -

TwoAdmin

5月19日16:35

最后修改:2026年5月19日
0

非特殊说明,本博所有文章均为博主原创。