# git.py
# -*- coding: utf-8 -*-
import collections
import datetime
import logging
import os
import re
import subprocess
import tarfile
import tempfile
  10. GitRef = collections.namedtuple(
  11. "VersionRef",
  12. ["name", "commit", "source", "is_remote", "refname", "creatordate",],
  13. )
  14. logger = logging.getLogger(__name__)
  15. def get_all_refs(gitroot):
  16. cmd = (
  17. "git",
  18. "for-each-ref",
  19. "--format",
  20. "%(objectname)\t%(refname)\t%(creatordate:iso)",
  21. "refs",
  22. )
  23. output = subprocess.check_output(cmd, cwd=gitroot).decode()
  24. for line in output.splitlines():
  25. is_remote = False
  26. fields = line.strip().split("\t")
  27. if len(fields) != 3:
  28. continue
  29. commit = fields[0]
  30. refname = fields[1]
  31. creatordate = datetime.datetime.strptime(
  32. fields[2], "%Y-%m-%d %H:%M:%S %z"
  33. )
  34. # Parse refname
  35. matchobj = re.match(
  36. r"^refs/(heads|tags|remotes/[^/]+)/(\S+)$", refname
  37. )
  38. if not matchobj:
  39. continue
  40. source = matchobj.group(1)
  41. name = matchobj.group(2)
  42. if source.startswith("remotes/"):
  43. is_remote = True
  44. yield GitRef(name, commit, source, is_remote, refname, creatordate)
  45. def get_refs(
  46. gitroot, tag_whitelist, branch_whitelist, remote_whitelist, files=()
  47. ):
  48. for ref in get_all_refs(gitroot):
  49. if ref.source == "tags":
  50. if tag_whitelist is None or not re.match(tag_whitelist, ref.name):
  51. logger.debug(
  52. "Skipping '%s' because tag '%s' doesn't match the "
  53. "whitelist pattern",
  54. ref.refname,
  55. ref.name,
  56. )
  57. continue
  58. elif ref.source == "heads":
  59. if branch_whitelist is None or not re.match(
  60. branch_whitelist, ref.name
  61. ):
  62. logger.debug(
  63. "Skipping '%s' because branch '%s' doesn't match the "
  64. "whitelist pattern",
  65. ref.refname,
  66. ref.name,
  67. )
  68. continue
  69. elif ref.is_remote and remote_whitelist is not None:
  70. remote_name = ref.source.partition("/")[2]
  71. if not re.match(remote_whitelist, remote_name):
  72. logger.debug(
  73. "Skipping '%s' because remote '%s' doesn't match the "
  74. "whitelist pattern",
  75. ref.refname,
  76. remote_name,
  77. )
  78. continue
  79. if branch_whitelist is None or not re.match(
  80. branch_whitelist, ref.name
  81. ):
  82. logger.debug(
  83. "Skipping '%s' because branch '%s' doesn't match the "
  84. "whitelist pattern",
  85. ref.refname,
  86. ref.name,
  87. )
  88. continue
  89. else:
  90. logger.debug(
  91. "Skipping '%s' because its not a branch or tag", ref.refname
  92. )
  93. continue
  94. missing_files = [
  95. filename
  96. for filename in files
  97. if filename != "."
  98. and not file_exists(gitroot, ref.refname, filename)
  99. ]
  100. if missing_files:
  101. logger.debug(
  102. "Skipping '%s' because it lacks required files: %r",
  103. ref.refname,
  104. missing_files,
  105. )
  106. continue
  107. yield ref
  108. def file_exists(gitroot, refname, filename):
  109. if os.sep != "/":
  110. # Git requires / path sep, make sure we use that
  111. filename = filename.replace(os.sep, "/")
  112. cmd = (
  113. "git",
  114. "cat-file",
  115. "-e",
  116. "{}:{}".format(refname, filename),
  117. )
  118. proc = subprocess.run(
  119. cmd, cwd=gitroot, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
  120. )
  121. return proc.returncode == 0
  122. def copy_tree(src, dst, reference, sourcepath="."):
  123. with tempfile.SpooledTemporaryFile() as fp:
  124. cmd = (
  125. "git",
  126. "archive",
  127. "--format",
  128. "tar",
  129. reference.commit,
  130. "--",
  131. sourcepath,
  132. )
  133. subprocess.check_call(cmd, stdout=fp)
  134. fp.seek(0)
  135. with tarfile.TarFile(fileobj=fp) as tarfp:
  136. tarfp.extractall(dst)