123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467 |
#!/usr/bin/env python3
"""
Automatic translation tool for the DragonOS documentation.

Usage:
    Run this script from the root of the DragonOS source tree.

    First enter the docs directory and install the dependencies:
        pip install -r requirements.txt

    Then export the following variables:
        export OPENAI_API_KEY=your_api_key
        export OPENAI_MODEL=your_model_name (a qwen3 model of 4b or larger is recommended)
        export OPENAI_BASE_URL=your_openai_base_url
        export MAX_WORKERS=your_max_workers (2-20 is recommended)

    Then run:
        python3 tools/doc_translator.py
"""
import datetime
import hashlib
import json
import os
import re
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import openai
from tqdm import tqdm
# Configuration
def get_env_var(name, required=False, default=None):
    """Look up a configuration value in the process environment.

    Args:
        name: Name of the environment variable.
        required: When true, a missing or empty value is an error.
        default: Value used when the variable is not set at all.

    Returns:
        The variable's value, or ``default`` when the variable is unset.

    Raises:
        ValueError: If ``required`` is true and the resolved value is falsy
            (unset with no usable default, or set to an empty string).
    """
    value = os.environ.get(name, default)
    if not required or value:
        return value
    raise ValueError(f"环境变量 {name} 未设置")
# Notice prepended to every translated reStructuredText file.
# Placeholders are filled in with str.format() when the file is written.
_RST_META_TEMPLATE = (
    ".. note:: AI Translation Notice\n\n"
    " This document was automatically translated by `{model}` model, for reference only.\n\n"
    " - Source document: {original_path}\n\n"
    " - Translation time: {timestamp}\n\n"
    " - Translation model: `{model}`\n\n"
    "\n Please report issues via `Community Channel <https://github.com/DragonOS-Community/DragonOS/issues>`_\n\n"
)

# Notice prepended to every translated Markdown file. The literal "{note}"
# in the first line is itself a format field, so the formatter must be given
# note="{note}" to keep the directive name intact.
_MD_META_TEMPLATE = (
    ":::{note}\n"
    "**AI Translation Notice**\n\n"
    "This document was automatically translated by `{model}` model, for reference only.\n\n"
    "- Source document: {original_path}\n\n"
    "- Translation time: {timestamp}\n\n"
    "- Translation model: `{model}`\n\n"
    "Please report issues via [Community Channel](https://github.com/DragonOS-Community/DragonOS/issues)\n\n"
    ":::\n\n"
)

# Global settings for the translator. The API key is intentionally absent
# from this dict; it is read separately where the client is created.
CONFIG = {
    # Directory holding the source documents.
    "source_dir": "docs",
    # Target languages: language code -> language name used in the prompt.
    "target_languages": {
        "en": "English",
    },
    # Directory names skipped while collecting source files.
    "dirs_exclude": ["_build", "locales"],
    # Model name (overridable through OPENAI_MODEL).
    "model": get_env_var("OPENAI_MODEL", default="qwen3:4b"),
    # API endpoint (overridable through OPENAI_BASE_URL).
    "base_url": get_env_var("OPENAI_BASE_URL", default="http://localhost:11434/v1"),
    # Upper bound on the size of one translation chunk.
    "chunk_size": 1000,
    # File in which per-document content hashes are cached.
    "cache_file": "docs/.translation_cache.json",
    # Number of files translated in parallel (overridable through MAX_WORKERS).
    "max_workers": int(get_env_var("MAX_WORKERS", default="1")),
    # Notice templates, keyed by source-file extension.
    "meta_templates": {
        ".rst": _RST_META_TEMPLATE,
        ".md": _MD_META_TEMPLATE,
    },
}
class LabelManager:
    """Keep a mapping from original document labels to per-language labels."""

    def __init__(self, lang: str):
        """Create an empty mapping for the target language code ``lang``."""
        self.lang = lang
        self.prefix = "_translated_label_"
        self.label_map = {}

    def register_label(self, original_label: str) -> str:
        """Return the translated label for ``original_label``.

        The first call for a given label creates the mapping; later calls
        return the label that was stored the first time.
        """
        candidate = f"{self.prefix}_{original_label}_{self.lang}"
        # setdefault stores the candidate only when the label is new.
        return self.label_map.setdefault(original_label, candidate)

    def get_all_labels(self) -> Dict[str, str]:
        """Return the mapping of original labels to translated labels."""
        return self.label_map
class DocumentTranslator:
    """Translate the documentation tree into every configured language.

    Source files are collected from ``CONFIG["source_dir"]``, translated
    through an OpenAI-compatible chat endpoint and written below
    ``<source_dir>/locales/<lang>/``. An MD5 hash of each source file is kept
    in a JSON cache so unchanged files are skipped on later runs.

    Attributes:
        fail_count: Number of files whose translation failed in this run.
        client: The ``openai.OpenAI`` client used for all requests.
    """

    def __init__(self):
        """Load the hash cache and create the API client.

        Raises:
            RuntimeError: If the OpenAI client cannot be constructed.
        """
        # Guards self._cache, the cache file and self.fail_count, all of
        # which are touched from the worker threads started by run().
        self._cache_lock = threading.Lock()
        self._cache = self._load_cache()
        self.fail_count = 0
        try:
            self.client = openai.OpenAI(
                base_url=CONFIG["base_url"],
                # The key is deliberately read here instead of being stored
                # in CONFIG, so that printing CONFIG can never leak it.
                api_key=get_env_var("OPENAI_API_KEY", default="ollama"),
            )
        except Exception as e:
            raise RuntimeError(f"OpenAI客户端初始化失败: {str(e)}") from e

    def _load_cache(self) -> Dict:
        """Load the translation cache from ``CONFIG["cache_file"]``.

        Returns:
            The parsed JSON content, or an empty dict when the file does not
            exist or does not contain valid JSON.
        """
        try:
            with open(CONFIG["cache_file"], "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            # First run: nothing has been cached yet.
            return {}
        except json.JSONDecodeError:
            # A corrupt cache only costs a re-translation, so start afresh.
            return {}

    def _save_cache(self):
        """Write the in-memory cache to ``CONFIG["cache_file"]``.

        The method takes ``self._cache_lock`` itself. ``threading.Lock`` is
        not reentrant, so callers must not hold the lock when calling this.
        """
        with self._cache_lock:
            with open(CONFIG["cache_file"], "w", encoding="utf-8") as f:
                json.dump(self._cache, f, ensure_ascii=False, indent=2)

    def _get_cache_key(self, filepath: str, lang: str) -> str:
        """Build the cache key ``"<lang>:<path relative to source_dir>"``."""
        rel_path = os.path.relpath(filepath, CONFIG["source_dir"])
        return f"{lang}:{rel_path}"

    def _split_into_chunks(self, text: str) -> List[str]:
        """Group the paragraphs of ``text`` into chunks.

        Paragraphs (separated by blank lines) are accumulated until adding
        the next one would exceed ``CONFIG["chunk_size"]``. A paragraph is
        never split, so a single oversized paragraph forms its own chunk.

        NOTE(review): size is measured in whitespace-separated words, so text
        written without spaces between words (e.g. Chinese) counts as very
        few words per paragraph and chunks may grow far beyond the limit.

        Returns:
            The chunks, each being its paragraphs joined by a blank line.
        """
        paragraphs = re.split(r"\n\s*\n", text)
        chunks = []
        current_chunk = []
        current_size = 0
        for para in paragraphs:
            para_size = len(para.split())
            # Flush before adding, but never emit an empty chunk.
            if current_chunk and current_size + para_size > CONFIG["chunk_size"]:
                chunks.append("\n\n".join(current_chunk))
                current_chunk = []
                current_size = 0
            current_chunk.append(para)
            current_size += para_size
        if current_chunk:
            chunks.append("\n\n".join(current_chunk))
        return chunks

    def _process_rst_labels(self, text: str, label_manager: "LabelManager") -> str:
        """Rewrite reStructuredText labels through ``label_manager``.

        Label definitions (``.. _name:``) and backtick-quoted spans are
        replaced by the label that ``register_label`` returns for them.
        """
        def replace_label(match):
            new_label = label_manager.register_label(match.group(1))
            return f'.. {new_label}:'

        # Label definitions.
        text = re.sub(r'\.\.\s+_([^:]+):', replace_label, text)
        # Label references.
        text = re.sub(r'(?<!\w)`([^`]+)`(?!\w)',
                      lambda m: f'`{label_manager.register_label(m.group(1))}`',
                      text)
        return text

    def _process_md_labels(self, text: str, label_manager: "LabelManager") -> str:
        """Rewrite Markdown labels through ``label_manager``.

        Handles, in this order, ``{#label}`` and ``(label)=`` definitions,
        ``[text](#label)`` links and bare ``#label`` references.
        """
        # Explicit label definition: {#label}
        text = re.sub(r'\{#([^}]+)\}',
                      lambda m: f'{{#{label_manager.register_label(m.group(1))}}}',
                      text)
        # Explicit label definition: (label)=
        text = re.sub(r'\(([^)]+)\)=',
                      lambda m: f'({label_manager.register_label(m.group(1))})=',
                      text)
        # Label reference: [text](#label)
        text = re.sub(r'\[([^\]]+)\]\(#([^)]+)\)',
                      lambda m: f'[{m.group(1)}](#{label_manager.register_label(m.group(2))})',
                      text)
        # Bare label reference: #label
        text = re.sub(r'(?<!\w)#([\w-]+)(?!\w)',
                      lambda m: f'#{label_manager.register_label(m.group(1))}',
                      text)
        return text

    def _generate_unique_label_for_lang(self, text: str, lang: str) -> str:
        """Rewrite all labels in ``text`` using a fresh manager for ``lang``.

        Both the reStructuredText and the Markdown rules are applied,
        whatever the type of the source file.
        """
        label_manager = LabelManager(lang)
        text = self._process_rst_labels(text, label_manager)
        text = self._process_md_labels(text, label_manager)
        return text

    def _preserve_special_format(self, text: str) -> Tuple[str, Dict]:
        """Replace content that must not be translated with placeholders.

        Blocks between the ``__EXCLUDE_IN_TRANSLATED_START`` and
        ``__EXCLUDE_IN_TRANSLATED_END`` note markers are removed entirely.
        Fenced code blocks and inline code spans are swapped for
        ``__CODE_BLOCK_<i>__`` / ``__INLINE_CODE_<i>__`` placeholders.

        Returns:
            A tuple ``(text, preserved)`` where ``preserved`` maps each
            placeholder to the original text it stands for.
        """
        preserved = {}
        # Drop the blocks that are excluded from translated documents.
        exclude_blocks = re.findall(
            r'\.\. Note: __EXCLUDE_IN_TRANSLATED_START.*?\.\. Note: __EXCLUDE_IN_TRANSLATED_END',
            text, re.DOTALL)
        for block in exclude_blocks:
            text = text.replace(block, '')
        # Fenced code blocks go first, so the inline pattern below never sees
        # the backticks of a fence.
        code_blocks = re.findall(r"```.*?\n.*?```", text, re.DOTALL)
        for i, block in enumerate(code_blocks):
            placeholder = f"__CODE_BLOCK_{i}__"
            preserved[placeholder] = block
            text = text.replace(block, placeholder)
        # Inline code spans.
        inline_code = re.findall(r"`[^`]+`", text)
        for i, code in enumerate(inline_code):
            placeholder = f"__INLINE_CODE_{i}__"
            preserved[placeholder] = code
            text = text.replace(code, placeholder)
        return text, preserved

    def _restore_special_format(self, text: str, preserved: Dict) -> str:
        """Put the preserved content back in place of its placeholders.

        Inline code placeholders are restored first, then code blocks.
        """
        for prefix in ("__INLINE_CODE_", "__CODE_BLOCK_"):
            for placeholder, content in preserved.items():
                if placeholder.startswith(prefix):
                    text = text.replace(placeholder, content)
        return text

    def _remove_thinking(self, text: str) -> str:
        """Strip every ``<think>...</think>`` section from ``text``."""
        return re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)

    def _translate_chunk(self, args: Tuple[str, str]) -> Optional[str]:
        """Translate one chunk, making up to three attempts.

        Args:
            args: A ``(chunk, lang)`` tuple of the text and the target
                language code.

        Returns:
            The translated text with thinking sections removed, or ``None``
            when all attempts failed.
        """
        chunk, lang = args
        lang_name = CONFIG["target_languages"].get(lang, "English")
        prompt = f"你是一个专业的文档翻译助手,请将以下中文技术文档准确翻译成{lang_name},保持技术术语的正确性和格式不变。"
        # Disable qwen3's thinking mode. This is done once, before the retry
        # loop, so a retry does not append the marker to the chunk again.
        if "qwen3" in CONFIG["model"].lower():
            prompt += "\n/no_think\n"
            chunk += "\n/no_think\n"
        messages = [
            {"role": "system", "content": prompt},
            {"role": "user", "content": chunk},
        ]
        for retries_left in (2, 1, 0):
            try:
                response = self.client.chat.completions.create(
                    extra_body={"enable_thinking": False},
                    model=CONFIG["model"],
                    messages=messages,
                    temperature=0.3,
                )
                content = response.choices[0].message.content
                return self._remove_thinking(content)
            except Exception as e:
                # Any failure (transport, API error, unusable response) is
                # retried; the last one is reported and gives up.
                if retries_left == 0:
                    print(f"翻译失败: {e},放弃重试。")
                    return None
                print(f"翻译出错: {e}, retrying... ({retries_left})")
                time.sleep(2)
        return None

    def translate_text(self, text: str, lang: str) -> str:
        """Translate ``text`` chunk by chunk through the chat endpoint.

        Args:
            text: The text to translate.
            lang: Target language code.

        Returns:
            The translated chunks joined by blank lines. An empty string is
            returned as soon as one chunk cannot be translated, so that the
            caller treats the document as failed instead of writing (and
            caching) a translation with a part missing.
        """
        translated_chunks = []
        for chunk in self._split_into_chunks(text):
            translated_chunk = self._translate_chunk((chunk, lang))
            if translated_chunk is None:
                return ""
            # An empty reply contributes nothing to the document.
            if translated_chunk:
                translated_chunks.append(translated_chunk)
        return "\n\n".join(translated_chunks)

    def process_file(self, filepath: str, lang: str = "en"):
        """Translate one source file and write the result.

        The file is skipped when its MD5 hash matches the cached one and the
        target file already exists. On success the translated file is written
        to ``<source_dir>/locales/<lang>/<relative path>`` and the cache is
        updated and saved; on failure ``fail_count`` is incremented.

        Args:
            filepath: Path of the source file.
            lang: Target language code (default ``'en'``).
        """
        rel_path = os.path.relpath(filepath, CONFIG["source_dir"])
        target_path = os.path.join(
            CONFIG["source_dir"], "locales", lang, rel_path)

        # Skip files that are unchanged and already translated.
        cache_key = self._get_cache_key(filepath, lang)
        with open(filepath, "rb") as f:
            file_hash = hashlib.md5(f.read()).hexdigest()
        target_file_exists = os.path.exists(target_path)
        with self._cache_lock:
            if cache_key in self._cache and self._cache[cache_key]["hash"] == file_hash and target_file_exists:
                print(f"文件未修改,跳过: {rel_path} (语言: {lang})")
                return

        print(f"正在处理: {rel_path}")
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()

        # Hide code from the model, then make the labels unique per language.
        content, preserved = self._preserve_special_format(content)
        content = self._generate_unique_label_for_lang(content, lang)

        translated_content = self.translate_text(content, lang)
        if not translated_content:
            print(f"翻译失败!{filepath}")
            # process_file runs in several worker threads at once.
            with self._cache_lock:
                self.fail_count += 1
            return

        translated_content = self._restore_special_format(
            translated_content, preserved)

        # Prepend the translation notice for this file type, if there is one.
        file_ext = os.path.splitext(filepath)[1]
        template = CONFIG["meta_templates"].get(file_ext, "")
        if template:
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            meta_content = template.format(
                # Keeps the literal "{note}" of the Markdown directive.
                note="{note}",
                model=CONFIG["model"],
                timestamp=timestamp,
                original_name=os.path.basename(filepath),
                original_path=rel_path,
            )
            translated_content = meta_content + translated_content

        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        with open(target_path, "w", encoding="utf-8") as f:
            f.write(translated_content)
            f.write("\n")

        # Record the hash. _save_cache() takes the lock itself, so it has to
        # be called after the lock is released.
        with self._cache_lock:
            self._cache[cache_key] = {
                "hash": file_hash,
            }
        self._save_cache()
        print(f"文件 {rel_path} 已成功翻译为 {lang} 并保存到 {target_path}")

    def add_language_root_title(self, lang):
        """Prepend the language name as a title to the translated index.

        The file ``<source_dir>/locales/<lang>/index.rst`` is left untouched
        when it already starts with the language name.

        Raises:
            FileNotFoundError: If the translated ``index.rst`` is missing.
        """
        lang_root_doc_path = os.path.join(
            CONFIG["source_dir"], "locales", lang, "index.rst")
        if not os.path.exists(lang_root_doc_path):
            raise FileNotFoundError(f"未找到 {lang} 的标题文件: {lang_root_doc_path}")
        lang_v = CONFIG["target_languages"][lang]
        print(f"正在为 {lang_v} 添加标题...")
        # Read the existing content first; opening for writing truncates.
        with open(lang_root_doc_path, "r", encoding="utf-8") as f:
            content = f.read()
        if content.startswith(lang_v):
            print(f"{lang_v} 的标题已存在,跳过...")
            return
        with open(lang_root_doc_path, "w", encoding="utf-8") as f:
            f.write(
                f"{lang_v}\n==========================================\n{content}")
        print(f"标题已添加到 {lang_root_doc_path}")

    def run(self):
        """Translate every ``.rst`` / ``.md`` file into every language.

        Files are processed by a thread pool of ``CONFIG["max_workers"]``
        workers, one language after the other. An exception raised while
        processing a file is re-raised here.
        """
        # Imported once per call rather than on every pass of the loop.
        from concurrent.futures import ThreadPoolExecutor, as_completed

        print("Collecting all files...")
        all_files = []
        for root, dirs, files in os.walk(CONFIG["source_dir"], topdown=True):
            # The exclusion list applies to the top-level directory only.
            if root == CONFIG["source_dir"]:
                dirs[:] = [d for d in dirs if d not in CONFIG["dirs_exclude"]]
            all_files.extend(
                os.path.join(root, file)
                for file in files
                if file.endswith((".rst", ".md"))
            )
        total_files = len(all_files)
        print(
            f"Total {total_files} files to translate in {len(CONFIG['target_languages'])} languages.")
        total_tasks = total_files * len(CONFIG["target_languages"])

        # Outer progress bar: languages.
        lang_pbar = tqdm(CONFIG["target_languages"].items(),
                         desc="Overall progress",
                         unit="lang",
                         position=0)
        for lang_k, lang_v in lang_pbar:
            lang_pbar.set_description(f"Translating to {lang_v}")
            with ThreadPoolExecutor(max_workers=CONFIG["max_workers"]) as executor:
                futures = [executor.submit(self.process_file, path, lang_k)
                           for path in all_files]
                # Inner progress bar: files of the current language.
                file_pbar = tqdm(total=len(all_files),
                                 desc=f"Files in {lang_v}",
                                 unit="file",
                                 position=1,
                                 leave=False)
                try:
                    for future in as_completed(futures):
                        file_pbar.update(1)
                        # Re-raises any exception from the worker.
                        future.result()
                finally:
                    file_pbar.close()
            self.add_language_root_title(lang_k)
        lang_pbar.close()
        print(
            f"\n翻译完成! Succ: {total_tasks-self.fail_count}, Fail: {self.fail_count}")
if __name__ == "__main__":
    # Show the effective settings, then run the whole translation pipeline.
    print("Starting translation process...")
    banner = (
        ("WORKERS: ", "max_workers"),
        ("LANGUAGES: ", "target_languages"),
        ("SOURCE_DIR: ", "source_dir"),
        ("MODEL: ", "model"),
    )
    for caption, config_key in banner:
        print(caption, CONFIG[config_key])
    translator = DocumentTranslator()
    translator.run()
|