# -*- coding: UTF-8 -*- import time import requests import json from retry import retry import xlwt from os import path from sys import stdout from concurrent.futures import ThreadPoolExecutor __all__ = ["get_dict", "get_json"] function_list = ["get_cnt", "get_pr", "get_contributors"] # 信息获取函数 PATH = "" # 文件输出路径以及配置文件存储路径,为空则默认在脚本文件同一目录下` head1 = ["name", "starred", "watching", "fork", "issue", "pull_request", "contributor"] # 表头 head2 = ["name", "contributions"] # 配置文件读取 try: with open(path.join(PATH, "config.json"), "r", encoding="utf-8") as f: # 配置文件选项说明 dic = json.loads(f.read()) USER = dic["user"] # 目标用户 TOKEN = dic["token"] # github访问令牌,用于增加api访问次数 PARALLEL = dic["parallel_threads"] # 最并行线程数 BLACKLIST = dic["black_list"] # contributor获取的仓库黑名单 WHITELIST = dic["white_list"] # 仓库黑名单中的contributor白名单 pool = ThreadPoolExecutor(max_workers=PARALLEL) except Exception as e: print("There are some errors while getting configure information!\n") raise e @retry(Exception, 5, 2, 8) def get_info(url): """ :param url:请求的api链接 :return: py字典 """ headers = {"Authorization": "Bearer " + TOKEN} response = requests.get(url=url, headers=headers).text return json.loads(response) def get_repo(repo_dict): """ :param repo_dict:仓库字典 :return: py字典 """ result = {"name": str(repo_dict.get("name")), "description": repo_dict.get("description")} for fuc in function_list: result.update(eval("%s(repo_dict)" % (fuc))) return result def get_cnt(repo_dict): result = { "starred": repo_dict.get("stargazers_count"), "watching": repo_dict.get("watchers_count"), "fork": repo_dict.get("forks_count"), "issue": repo_dict.get("open_issues_count"), } return result def get_pr(repo_dict): pr_dict = get_info(r"https://api.github.com/repos/" + repo_dict["full_name"] + "/pulls") return {"pull_request": len(pr_dict)} def get_contributors(repo_dict): result = {"contributor_list": []} contri_dict = get_info(repo_dict["contributors_url"]) for dic in contri_dict: # 黑白名单实现 if repo_dict["name"] in BLACKLIST or repo_dict.get("parent"): if dic["login"] not in WHITELIST: continue tmp = { "name": dic["login"], "id": dic["id"], "contributions": dic["contributions"] } result["contributor_list"].append(tmp) result["contributor"] = len(result["contributor_list"]) return result def sum_up(dic): contribute_existed = {} result = {"total": { "starred": 0, "watching": 0, "fork": 0, "issue": 0, "pull_request": 0, "contributor": 0, "contributor_list": [] }} pos = 0 for repo in dic["repositories"]: for k in result["total"].keys(): if k != "contributor_list": result["total"][k] += repo[k] else: # contributor累加 for contribute in repo[k]: if contribute_existed.get(contribute["name"]) is None: result["total"][k].append(contribute.copy()) contribute_existed[contribute["name"]] = pos pos += 1 else: result["total"][k][contribute_existed[contribute["name"]]]["contributions"] += \ contribute["contributions"] result["total"]["contributor_list"].sort(key=lambda a: a["contributions"], reverse=True) result["total"]["contributor"] = len(contribute_existed) dic.update(result) return dic def get_dict(): """ :return:带有信息的py字典 """ # 获取用户信息 info_dict = {"repositories": []} root_dict = get_info(r"https://api.github.com/users/" + USER + r"/repos") # 解析信息 def thread(dic): result = get_repo(dic) info_dict["repositories"].append(result) return 1 # 分别获取每个仓库 thread_list = [] wrong_list = [] for dic in root_dict: thread_list.append(pool.submit(thread, dic)) time.sleep(0.05) # 等待线程完毕 while thread_list: for x in thread_list: if x.done() and x.result(): thread_list.remove(x) elif x.done() and not x.result(): wrong_list.append(x.exception()) thread_list.remove(x) stdout.write('\r %d threads left. . .' % (len(thread_list))) # 输出线程完成情况 stdout.write('\r Done!During the process,%d exceptions have been raised. . . ' % (len(wrong_list))) stdout.flush() if len(wrong_list): for i in wrong_list: stdout.write(str(i) + "\n") stdout.flush() # 按名字字母排序 info_dict["repositories"].sort(key=lambda a: a["name"].lower()) return sum_up(info_dict) def get_json(dic=None): """ :return:带有信息的json文本 """ if not dic: return json.dumps(get_dict(), sort_keys=False, indent=4, separators=(',', ':'), ensure_ascii=False) else: return json.dumps(dic, sort_keys=False, indent=4, separators=(',', ':'), ensure_ascii=False) def wt_json(text): if PATH: with open(path.join(PATH, "github_info.json"), "w", encoding="utf-8") as f: f.write(text) f.flush() else: with open(path.join(PATH, "github_info.json"), "w", encoding="utf-8") as f: f.write(text) f.flush() def wt_excel(dic): wb = xlwt.Workbook() # try: # 写入仓库数据 tb1 = wb.add_sheet("repositories", cell_overwrite_ok=True) for i in range(len(head1)): tb1.write(0, i, head1[i]) for i in range(len(dic["repositories"])): for j in range(len(head1)): tb1.write(i + 1, j, dic["repositories"][i][head1[j]]) # 写入总计数据 for i in range(len(head1)): if head1[i] == "name": tb1.write(len(dic["repositories"]) + 1, i, "Total") continue # if type(dic["total"][head1[i]]) == ("dict" or "list"): # tb1.write(len(dic["repositories"]) + 2, i, len(dic["total"][head1[i]])) # else: tb1.write(len(dic["repositories"]) + 1, i, dic["total"][head1[i]]) # 写入贡献者名单 tb2 = wb.add_sheet("contributor list", cell_overwrite_ok=True) for i in range(len(head2)): tb2.write(0, i, head2[i]) for i in range(len(dic["total"]["contributor_list"])): for j in range(len(head2)): tb2.write(i + 1, j, dic["total"]["contributor_list"][i][head2[j]]) # except Exception as e: # print("\n") # print(e) wb.save(path.join(PATH, "statistics.xls")) if __name__ == '__main__': dic = get_dict() wt_json(get_json(dic)) wt_excel(dic)