doc_translator.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. #!/usr/bin/env python3
  2. """
  3. DragonOS文档自动翻译工具
  4. Usage:
  5. 在DragonOS源码根目录下运行此脚本。
  6. 需要先进入docs目录,执行命令安装依赖包。
  7. pip install -r requirements.txt
  8. 接着先声明以下变量:
  9. export OPENAI_API_KEY=your_api_key
  10. export OPENAI_MODEL=your_model_name (推荐qwen3的4b以上的)
  11. export OPENAI_BASE_URL=your_openai_base_url
  12. export MAX_WORKERS=your_max_workers (推荐2-20)
  13. 然后运行:
  14. python3 tools/doc_translator.py
  15. """
import datetime
import hashlib
import json
import os
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import openai
from tqdm import tqdm
  27. # 配置
  28. def get_env_var(name, required=False, default=None):
  29. """从环境变量获取配置"""
  30. value = os.getenv(name, default)
  31. if required and not value:
  32. raise ValueError(f"环境变量 {name} 未设置")
  33. return value
# Global settings for the translator. Model, endpoint and worker count are read
# from the environment once, when this module is imported.
CONFIG = {
    "source_dir": "docs",  # root directory of the source documentation
    # Target languages: language code -> language name inserted into the prompt.
    "target_languages": {
        "en": "English",
    },
    # Directory names skipped while collecting files (the collector applies
    # this filter only to directories directly under source_dir).
    "dirs_exclude": ["_build", "locales"],
    "model": get_env_var("OPENAI_MODEL", default="qwen3:4b"),  # model name
    # Base URL of the OpenAI-compatible API
    "base_url": get_env_var("OPENAI_BASE_URL", default="http://localhost:11434/v1"),
    # Chunk size budget. The splitter measures it in whitespace-separated
    # words per paragraph, not in model tokens.
    "chunk_size": 1000,
    "cache_file": "docs/.translation_cache.json",  # translation cache file
    "max_workers": int(get_env_var("MAX_WORKERS", default="1")),  # number of parallel workers
    # Notice templates prepended to each translated file, keyed by the source
    # file's extension. Filled in with str.format(); the ".md" template
    # contains a literal "{note}" that the caller maps back to itself.
    "meta_templates": {
        ".rst": (
            ".. note:: AI Translation Notice\n\n"
            " This document was automatically translated by `{model}` model, for reference only.\n\n"
            " - Source document: {original_path}\n\n"
            " - Translation time: {timestamp}\n\n"
            " - Translation model: `{model}`\n\n"
            "\n Please report issues via `Community Channel <https://github.com/DragonOS-Community/DragonOS/issues>`_\n\n"
        ),
        ".md": (
            ":::{note}\n"
            "**AI Translation Notice**\n\n"
            "This document was automatically translated by `{model}` model, for reference only.\n\n"
            "- Source document: {original_path}\n\n"
            "- Translation time: {timestamp}\n\n"
            "- Translation model: `{model}`\n\n"
            "Please report issues via [Community Channel](https://github.com/DragonOS-Community/DragonOS/issues)\n\n"
            ":::\n\n"
        )
    }
}
  68. class LabelManager:
  69. """管理文档标签和引用"""
  70. def __init__(self, lang: str):
  71. self.label_map = {}
  72. self.prefix = "_translated_label_"
  73. self.lang = lang
  74. def register_label(self, original_label: str) -> str:
  75. """注册新标签并返回映射后的标签"""
  76. if original_label not in self.label_map:
  77. new_label = f"{self.prefix}_{original_label}_{self.lang}"
  78. self.label_map[original_label] = new_label
  79. return self.label_map[original_label]
  80. def get_all_labels(self) -> Dict[str, str]:
  81. """获取所有标签映射"""
  82. return self.label_map
  83. class DocumentTranslator:
  84. def __init__(self):
  85. self._cache_lock = threading.Lock()
  86. self._cache = self._load_cache()
  87. self.fail_count = 0
  88. try:
  89. self.client = openai.OpenAI(
  90. base_url=CONFIG["base_url"],
  91. # 这是故意把key的获取写在这里的。防止哪个二货直接print CONFIG导致key泄露。
  92. api_key=get_env_var("OPENAI_API_KEY", default="ollama"),
  93. )
  94. except Exception as e:
  95. raise RuntimeError(f"OpenAI客户端初始化失败: {str(e)}")
  96. def _load_cache(self) -> Dict:
  97. """加载翻译缓存"""
  98. if os.path.exists(CONFIG["cache_file"]):
  99. with open(CONFIG["cache_file"], "r", encoding="utf-8") as f:
  100. try:
  101. return json.load(f)
  102. except json.JSONDecodeError:
  103. pass
  104. return {}
  105. def _save_cache(self):
  106. """保存翻译缓存"""
  107. with self._cache_lock:
  108. with open(CONFIG["cache_file"], "w", encoding="utf-8") as f:
  109. json.dump(self._cache, f, ensure_ascii=False, indent=2)
  110. def _get_cache_key(self, filepath: str, lang: str) -> str:
  111. """生成缓存键(包含语言代码)"""
  112. rel_path = os.path.relpath(filepath, CONFIG["source_dir"])
  113. return f"{lang}:{rel_path}"
  114. def _split_into_chunks(self, text: str) -> List[str]:
  115. """将文本分块"""
  116. # 按段落分割
  117. paragraphs = re.split(r"\n\s*\n", text)
  118. chunks = []
  119. current_chunk = []
  120. current_size = 0
  121. for para in paragraphs:
  122. para_size = len(para.split())
  123. if current_size + para_size > CONFIG["chunk_size"] and current_chunk:
  124. chunks.append("\n\n".join(current_chunk))
  125. current_chunk = []
  126. current_size = 0
  127. current_chunk.append(para)
  128. current_size += para_size
  129. if current_chunk:
  130. chunks.append("\n\n".join(current_chunk))
  131. return chunks
  132. def _process_rst_labels(self, text: str, label_manager: LabelManager) -> str:
  133. """处理reStructuredText标签"""
  134. def replace_label(match):
  135. original_label = match.group(1)
  136. new_label = label_manager.register_label(original_label)
  137. return f'.. {new_label}:'
  138. # 处理标签定义
  139. text = re.sub(r'\.\.\s+_([^:]+):', replace_label, text)
  140. # 处理标签引用
  141. text = re.sub(r'(?<!\w)`([^`]+)`(?!\w)',
  142. lambda m: f'`{label_manager.register_label(m.group(1))}`',
  143. text)
  144. return text
  145. def _process_md_labels(self, text: str, label_manager: LabelManager) -> str:
  146. """处理Markdown标签"""
  147. # 处理显式标签定义 {#label}
  148. text = re.sub(r'\{#([^}]+)\}',
  149. lambda m: f'{{#{label_manager.register_label(m.group(1))}}}',
  150. text)
  151. # 处理显式标签定义 (label)=
  152. text = re.sub(r'\(([^)]+)\)=',
  153. lambda m: f'({label_manager.register_label(m.group(1))})=',
  154. text)
  155. # 处理标签引用 [text](#label)
  156. text = re.sub(r'\[([^\]]+)\]\(#([^)]+)\)',
  157. lambda m: f'[{m.group(1)}](#{label_manager.register_label(m.group(2))})',
  158. text)
  159. # 处理裸标签引用 #label
  160. text = re.sub(r'(?<!\w)#([\w-]+)(?!\w)',
  161. lambda m: f'#{label_manager.register_label(m.group(1))}',
  162. text)
  163. return text
  164. def _generate_unique_label_for_lang(self, text: str, lang: str) -> str:
  165. # 处理标签
  166. label_manager = LabelManager(lang)
  167. text = self._process_rst_labels(text, label_manager)
  168. text = self._process_md_labels(text, label_manager)
  169. return text
  170. def _preserve_special_format(self, text: str) -> Tuple[str, Dict]:
  171. """保留特殊格式"""
  172. preserved = {}
  173. # 排除不需要翻译的块
  174. exclude_blocks = re.findall(
  175. r'\.\. Note: __EXCLUDE_IN_TRANSLATED_START.*?\.\. Note: __EXCLUDE_IN_TRANSLATED_END',
  176. text, re.DOTALL)
  177. for block in exclude_blocks:
  178. text = text.replace(block, '')
  179. # 处理多行代码块
  180. code_blocks = re.findall(r"```.*?\n.*?```", text, re.DOTALL)
  181. for i, block in enumerate(code_blocks):
  182. placeholder = f"__CODE_BLOCK_{i}__"
  183. preserved[placeholder] = block
  184. text = text.replace(block, placeholder)
  185. # 处理内联代码块
  186. inline_code = re.findall(r"`[^`]+`", text)
  187. for i, code in enumerate(inline_code):
  188. placeholder = f"__INLINE_CODE_{i}__"
  189. preserved[placeholder] = code
  190. text = text.replace(code, placeholder)
  191. return text, preserved
  192. def _restore_special_format(self, text: str, preserved: Dict) -> str:
  193. """恢复特殊格式"""
  194. # 先恢复内联代码块
  195. for placeholder, content in preserved.items():
  196. if placeholder.startswith("__INLINE_CODE_"):
  197. text = text.replace(placeholder, content)
  198. # 然后恢复多行代码块
  199. for placeholder, content in preserved.items():
  200. if placeholder.startswith("__CODE_BLOCK_"):
  201. text = text.replace(placeholder, content)
  202. return text
  203. def _remove_thinking(self, text: str) -> str:
  204. """Remove <think> tags from text"""
  205. return re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
  206. def _translate_chunk(self, args: Tuple[str, str]) -> str:
  207. """翻译单个文本块(内部方法,用于并行处理)"""
  208. chunk, lang = args
  209. retry = 3
  210. while retry > 0:
  211. try:
  212. lang_name = CONFIG["target_languages"].get(lang, "English")
  213. prompt = f"你是一个专业的文档翻译助手,请将以下中文技术文档准确翻译成{lang_name},保持技术术语的正确性和格式不变。"
  214. # disable qwen3's thinking mode
  215. if "qwen3" in CONFIG["model"].lower():
  216. prompt += "\n/no_think\n"
  217. chunk += "\n/no_think\n"
  218. response = self.client.chat.completions.create(
  219. extra_body={"enable_thinking": False},
  220. model=CONFIG["model"],
  221. messages=[
  222. {"role": "system", "content": prompt},
  223. {"role": "user", "content": chunk}
  224. ],
  225. temperature=0.3,
  226. )
  227. content = response.choices[0].message.content
  228. return self._remove_thinking(content)
  229. except Exception as e:
  230. retry -= 1
  231. if retry == 0:
  232. print("翻译失败: {e},放弃重试。")
  233. return None
  234. print(f"翻译出错: {e}, retrying... ({retry})")
  235. time.sleep(2)
  236. def translate_text(self, text: str, lang: str) -> str:
  237. """使用openai接口翻译文本
  238. Args:
  239. text: 要翻译的文本
  240. lang: 目标语言代码
  241. """
  242. chunks = self._split_into_chunks(text)
  243. translated_chunks = []
  244. for chunk in chunks:
  245. translated_chunk = self._translate_chunk((chunk, lang))
  246. if translated_chunk:
  247. translated_chunks.append(translated_chunk)
  248. return "\n\n".join(translated_chunks)
  249. def process_file(self, filepath: str, lang: str = "en"):
  250. """处理单个文件
  251. Args:
  252. filepath: 源文件路径
  253. lang: 目标语言代码 (默认'en')
  254. """
  255. rel_path = os.path.relpath(filepath, CONFIG["source_dir"])
  256. target_path = os.path.join(
  257. CONFIG["source_dir"], "locales", lang, rel_path)
  258. # 检查文件是否已存在且未修改
  259. cache_key = self._get_cache_key(filepath, lang)
  260. file_hash = hashlib.md5(open(filepath, "rb").read()).hexdigest()
  261. target_file_exists = os.path.exists(target_path)
  262. with self._cache_lock:
  263. if cache_key in self._cache and self._cache[cache_key]["hash"] == file_hash and target_file_exists:
  264. print(f"文件未修改,跳过: {rel_path} (语言: {lang})")
  265. return
  266. print(f"正在处理: {rel_path}")
  267. # 读取文件内容
  268. with open(filepath, "r", encoding="utf-8") as f:
  269. content = f.read()
  270. # 保留特殊格式
  271. content, preserved = self._preserve_special_format(content)
  272. content = self._generate_unique_label_for_lang(content, lang)
  273. # 分块翻译
  274. translated_content = self.translate_text(
  275. content, lang)
  276. if not translated_content:
  277. print(f"翻译失败!{filepath}")
  278. self.fail_count += 1
  279. return
  280. # 恢复特殊格式
  281. translated_content = self._restore_special_format(
  282. translated_content, preserved)
  283. # 创建目标目录
  284. os.makedirs(os.path.dirname(target_path), exist_ok=True)
  285. # 写入翻译结果
  286. with open(target_path, "w", encoding="utf-8") as f:
  287. # 添加翻译元数据
  288. file_ext = os.path.splitext(filepath)[1]
  289. template = CONFIG["meta_templates"].get(file_ext, "")
  290. if template:
  291. timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  292. original_path = os.path.relpath(filepath, CONFIG["source_dir"])
  293. meta_content = template.format(
  294. note="{note}",
  295. model=CONFIG["model"],
  296. timestamp=timestamp,
  297. original_name=os.path.basename(filepath),
  298. original_path=original_path
  299. )
  300. translated_content = meta_content + translated_content
  301. f.write(translated_content)
  302. f.write("\n")
  303. # 更新缓存
  304. with self._cache_lock:
  305. self._cache[cache_key] = {
  306. "hash": file_hash,
  307. }
  308. self._save_cache()
  309. print(f"文件 {rel_path} 已成功翻译为 {lang} 并保存到 {target_path}")
  310. def add_language_root_title(self, lang):
  311. """为每个语言的文档添加标题"""
  312. lang_root_doc_path = os.path.join(
  313. CONFIG["source_dir"], "locales", lang, "index.rst")
  314. if not os.path.exists(lang_root_doc_path):
  315. raise FileNotFoundError(f"未找到 {lang} 的标题文件: {lang_root_doc_path}")
  316. print(f"正在为 {CONFIG['target_languages'][lang]} 添加标题...")
  317. # Read existing content first
  318. with open(lang_root_doc_path, "r", encoding="utf-8") as f:
  319. content = f.read()
  320. lang_v = CONFIG["target_languages"][lang]
  321. if content.startswith(lang_v):
  322. print(f"{lang_v} 的标题已存在,跳过...")
  323. return
  324. # Then write new content (this clears the file)
  325. with open(lang_root_doc_path, "w", encoding="utf-8") as f:
  326. f.write(
  327. f"{lang_v}\n==========================================\n{content}")
  328. print(f"标题已添加到 {lang_root_doc_path}")
  329. def run(self):
  330. """运行翻译流程"""
  331. print("Collecting all files...")
  332. all_files = []
  333. for root, dirs, files in os.walk(CONFIG["source_dir"], topdown=True):
  334. # 只在根目录应用排除逻辑
  335. if root == CONFIG["source_dir"]:
  336. dirs[:] = [d for d in dirs if d not in CONFIG["dirs_exclude"]]
  337. for file in files:
  338. if file.endswith((".rst", ".md")):
  339. all_files.append(os.path.join(root, file))
  340. total_files = len(all_files)
  341. print(
  342. f"Total {total_files} files to translate in {len(CONFIG['target_languages'])} languages.")
  343. total_tasks = total_files * len(CONFIG["target_languages"])
  344. # 外层进度条:语言
  345. lang_pbar = tqdm(CONFIG["target_languages"].items(),
  346. desc="Overall progress",
  347. unit="lang",
  348. position=0)
  349. for lang_k, lang_v in lang_pbar:
  350. lang_pbar.set_description(f"Translating to {lang_v}")
  351. # 并行处理文件
  352. from concurrent.futures import ThreadPoolExecutor, as_completed
  353. # 包装处理函数便于调试之类的
  354. def process_file_wrapper(file_path):
  355. self.process_file(file_path, lang_k)
  356. return file_path
  357. # 创建线程池
  358. with ThreadPoolExecutor(max_workers=CONFIG["max_workers"]) as executor:
  359. # 提交所有文件处理任务
  360. futures = [executor.submit(
  361. process_file_wrapper, path) for path in all_files]
  362. # 创建进度条
  363. file_pbar = tqdm(total=len(all_files),
  364. desc=f"Files in {lang_v}",
  365. unit="file",
  366. position=1,
  367. leave=False)
  368. # 更新进度条
  369. for future in as_completed(futures):
  370. file_pbar.update(1)
  371. future.result() # 获取结果(如果有异常会在这里抛出)
  372. file_pbar.close()
  373. self.add_language_root_title(lang_k)
  374. lang_pbar.close()
  375. print(
  376. f"\n翻译完成! Succ: {total_tasks-self.fail_count}, Fail: {self.fail_count}")
  377. if __name__ == "__main__":
  378. print("Starting translation process...")
  379. print("WORKERS: ", CONFIG["max_workers"])
  380. print("LANGUAGES: ", CONFIG["target_languages"])
  381. print("SOURCE_DIR: ", CONFIG["source_dir"])
  382. print("MODEL: ", CONFIG["model"])
  383. translator = DocumentTranslator()
  384. translator.run()