clm5.0/manage_externals/manic/repository_git.py
2025-01-12 20:48:10 +08:00

820 lines
29 KiB
Python

"""Class for interacting with git repositories
"""
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import print_function
import copy
import os
from .global_constants import EMPTY_STR, LOCAL_PATH_INDICATOR
from .global_constants import VERBOSITY_VERBOSE
from .repository import Repository
from .externals_status import ExternalStatus
from .externals_description import ExternalsDescription, git_submodule_status
from .utils import expand_local_url, split_remote_url, is_remote_url
from .utils import fatal_error, printlog
from .utils import execute_subprocess
class GitRepository(Repository):
"""Class to represent and operate on a repository description.
For testing purpose, all system calls to git should:
* be isolated in separate functions with no application logic
* of the form:
- cmd = ['git', ...]
- value = execute_subprocess(cmd, output_to_caller={T|F},
status_to_caller={T|F})
- return value
* be static methods (not rely on self)
* name as _git_subcommand_args(user_args)
This convention allows easy unit testing of the repository logic
by mocking the specific calls to return predefined results.
"""
def __init__(self, component_name, repo):
"""
Parse repo (a <repo> XML element).
"""
Repository.__init__(self, component_name, repo)
self._gitmodules = None
self._submods = None
# ----------------------------------------------------------------
#
# Public API, defined by Repository
#
# ----------------------------------------------------------------
def checkout(self, base_dir_path, repo_dir_name, verbosity, recursive):
"""
If the repo destination directory exists, ensure it is correct (from
correct URL, correct branch or tag), and possibly update the source.
If the repo destination directory does not exist, checkout the correct
branch or tag.
"""
repo_dir_path = os.path.join(base_dir_path, repo_dir_name)
repo_dir_exists = os.path.exists(repo_dir_path)
if (repo_dir_exists and not os.listdir(
repo_dir_path)) or not repo_dir_exists:
self._clone_repo(base_dir_path, repo_dir_name, verbosity)
self._checkout_ref(repo_dir_path, verbosity, recursive)
gmpath = os.path.join(repo_dir_path,
ExternalsDescription.GIT_SUBMODULES_FILENAME)
if os.path.exists(gmpath):
self._gitmodules = gmpath
self._submods = git_submodule_status(repo_dir_path)
else:
self._gitmodules = None
self._submods = None
def status(self, stat, repo_dir_path):
"""
If the repo destination directory exists, ensure it is correct (from
correct URL, correct branch or tag), and possibly update the source.
If the repo destination directory does not exist, checkout the correct
branch or tag.
"""
self._check_sync(stat, repo_dir_path)
if os.path.exists(repo_dir_path):
self._status_summary(stat, repo_dir_path)
def submodules_file(self, repo_path=None):
if repo_path is not None:
gmpath = os.path.join(repo_path,
ExternalsDescription.GIT_SUBMODULES_FILENAME)
if os.path.exists(gmpath):
self._gitmodules = gmpath
self._submods = git_submodule_status(repo_path)
return self._gitmodules
# ----------------------------------------------------------------
#
# Internal work functions
#
# ----------------------------------------------------------------
def _clone_repo(self, base_dir_path, repo_dir_name, verbosity):
"""Prepare to execute the clone by managing directory location
"""
cwd = os.getcwd()
os.chdir(base_dir_path)
self._git_clone(self._url, repo_dir_name, verbosity)
os.chdir(cwd)
def _current_ref(self):
"""Determine the *name* associated with HEAD.
If we're on a branch, then returns the branch name; otherwise,
if we're on a tag, then returns the tag name; otherwise, returns
the current hash. Returns an empty string if no reference can be
determined (e.g., if we're not actually in a git repository).
"""
ref_found = False
# If we're on a branch, then use that as the current ref
branch_found, branch_name = self._git_current_branch()
if branch_found:
current_ref = branch_name
ref_found = True
if not ref_found:
# Otherwise, if we're exactly at a tag, use that as the
# current ref
tag_found, tag_name = self._git_current_tag()
if tag_found:
current_ref = tag_name
ref_found = True
if not ref_found:
# Otherwise, use current hash as the current ref
hash_found, hash_name = self._git_current_hash()
if hash_found:
current_ref = hash_name
ref_found = True
if not ref_found:
# If we still can't find a ref, return empty string. This
# can happen if we're not actually in a git repo
current_ref = ''
return current_ref
def _check_sync(self, stat, repo_dir_path):
"""Determine whether a git repository is in-sync with the model
description.
Because repos can have multiple remotes, the only criteria is
whether the branch or tag is the same.
"""
if not os.path.exists(repo_dir_path):
# NOTE(bja, 2017-10) condition should have been determined
# by _Source() object and should never be here!
stat.sync_state = ExternalStatus.STATUS_ERROR
else:
git_dir = os.path.join(repo_dir_path, '.git')
if not os.path.exists(git_dir):
# NOTE(bja, 2017-10) directory exists, but no git repo
# info.... Can't test with subprocess git command
# because git will move up directory tree until it
# finds the parent repo git dir!
stat.sync_state = ExternalStatus.UNKNOWN
else:
self._check_sync_logic(stat, repo_dir_path)
def _check_sync_logic(self, stat, repo_dir_path):
"""Compare the underlying hashes of the currently checkout ref and the
expected ref.
Output: sets the sync_state as well as the current and
expected ref in the input status object.
"""
def compare_refs(current_ref, expected_ref):
"""Compare the current and expected ref.
"""
if current_ref == expected_ref:
status = ExternalStatus.STATUS_OK
else:
status = ExternalStatus.MODEL_MODIFIED
return status
cwd = os.getcwd()
os.chdir(repo_dir_path)
# get the full hash of the current commit
_, current_ref = self._git_current_hash()
if self._branch:
if self._url == LOCAL_PATH_INDICATOR:
expected_ref = self._branch
else:
remote_name = self._determine_remote_name()
if not remote_name:
# git doesn't know about this remote. by definition
# this is a modified state.
expected_ref = "unknown_remote/{0}".format(self._branch)
else:
expected_ref = "{0}/{1}".format(remote_name, self._branch)
elif self._hash:
expected_ref = self._hash
elif self._tag:
expected_ref = self._tag
else:
msg = 'In repo "{0}": none of branch, hash or tag are set'.format(
self._name)
fatal_error(msg)
# record the *names* of the current and expected branches
stat.current_version = self._current_ref()
stat.expected_version = copy.deepcopy(expected_ref)
if current_ref == EMPTY_STR:
stat.sync_state = ExternalStatus.UNKNOWN
else:
# get the underlying hash of the expected ref
revparse_status, expected_ref_hash = self._git_revparse_commit(
expected_ref)
if revparse_status:
# We failed to get the hash associated with
# expected_ref. Maybe we should assign this to some special
# status, but for now we're just calling this out-of-sync to
# remain consistent with how this worked before.
stat.sync_state = ExternalStatus.MODEL_MODIFIED
else:
# compare the underlying hashes
stat.sync_state = compare_refs(current_ref, expected_ref_hash)
os.chdir(cwd)
def _determine_remote_name(self):
"""Return the remote name.
Note that this is for the *future* repo url and branch, not
the current working copy!
"""
git_output = self._git_remote_verbose()
git_output = git_output.splitlines()
remote_name = ''
for line in git_output:
data = line.strip()
if not data:
continue
data = data.split()
name = data[0].strip()
url = data[1].strip()
if self._url == url:
remote_name = name
break
return remote_name
def _create_remote_name(self):
"""The url specified in the externals description file was not known
to git. We need to add it, which means adding a unique and
safe name....
The assigned name needs to be safe for git to use, e.g. can't
look like a path 'foo/bar' and work with both remote and local paths.
Remote paths include but are not limited to: git, ssh, https,
github, gitlab, bitbucket, custom server, etc.
Local paths can be relative or absolute. They may contain
shell variables, e.g. ${REPO_ROOT}/repo_name, or username
expansion, i.e. ~/ or ~someuser/.
Relative paths must be at least one layer of redirection, i.e.
container/../ext_repo, but may be many layers deep, e.g.
container/../../../../../ext_repo
NOTE(bja, 2017-11)
The base name below may not be unique, for example if the
user has local paths like:
/path/to/my/repos/nice_repo
/path/to/other/repos/nice_repo
But the current implementation should cover most common
use cases for remotes and still provide usable names.
"""
url = copy.deepcopy(self._url)
if is_remote_url(url):
url = split_remote_url(url)
else:
url = expand_local_url(url, self._name)
url = url.split('/')
repo_name = url[-1]
base_name = url[-2]
# repo name should nominally already be something that git can
# deal with. We need to remove other possibly troublesome
# punctuation, e.g. /, $, from the base name.
unsafe_characters = '!@#$%^&*()[]{}\\/,;~'
for unsafe in unsafe_characters:
base_name = base_name.replace(unsafe, '')
remote_name = "{0}_{1}".format(base_name, repo_name)
return remote_name
def _checkout_ref(self, repo_dir, verbosity, submodules):
"""Checkout the user supplied reference
if <submodules> is True, recursively initialize and update
the repo's submodules
"""
# import pdb; pdb.set_trace()
cwd = os.getcwd()
os.chdir(repo_dir)
if self._url.strip() == LOCAL_PATH_INDICATOR:
self._checkout_local_ref(verbosity, submodules)
else:
self._checkout_external_ref(verbosity, submodules)
if self._sparse:
self._sparse_checkout(repo_dir, verbosity)
os.chdir(cwd)
def _checkout_local_ref(self, verbosity, submodules):
"""Checkout the reference considering the local repo only. Do not
fetch any additional remotes or specify the remote when
checkout out the ref.
if <submodules> is True, recursively initialize and update
the repo's submodules
"""
if self._tag:
ref = self._tag
elif self._branch:
ref = self._branch
else:
ref = self._hash
self._check_for_valid_ref(ref)
self._git_checkout_ref(ref, verbosity, submodules)
def _checkout_external_ref(self, verbosity, submodules):
"""Checkout the reference from a remote repository
if <submodules> is True, recursively initialize and update
the repo's submodules
"""
if self._tag:
ref = self._tag
elif self._branch:
ref = self._branch
else:
ref = self._hash
remote_name = self._determine_remote_name()
if not remote_name:
remote_name = self._create_remote_name()
self._git_remote_add(remote_name, self._url)
self._git_fetch(remote_name)
# NOTE(bja, 2018-03) we need to send separate ref and remote
# name to check_for_vaild_ref, but the combined name to
# checkout_ref!
self._check_for_valid_ref(ref, remote_name)
if self._branch:
ref = '{0}/{1}'.format(remote_name, ref)
self._git_checkout_ref(ref, verbosity, submodules)
def _sparse_checkout(self, repo_dir, verbosity):
"""Use git read-tree to thin the working tree."""
cwd = os.getcwd()
cmd = ['cp', self._sparse, os.path.join(repo_dir,
'.git/info/sparse-checkout')]
if verbosity >= VERBOSITY_VERBOSE:
printlog(' {0}'.format(' '.join(cmd)))
execute_subprocess(cmd)
os.chdir(repo_dir)
self._git_sparse_checkout(verbosity)
os.chdir(cwd)
def _check_for_valid_ref(self, ref, remote_name=None):
"""Try some basic sanity checks on the user supplied reference so we
can provide a more useful error message than calledprocess
error...
"""
is_tag = self._ref_is_tag(ref)
is_branch = self._ref_is_branch(ref, remote_name)
is_hash = self._ref_is_hash(ref)
is_valid = is_tag or is_branch or is_hash
if not is_valid:
msg = ('In repo "{0}": reference "{1}" does not appear to be a '
'valid tag, branch or hash! Please verify the reference '
'name (e.g. spelling), is available from: {2} '.format(
self._name, ref, self._url))
fatal_error(msg)
if is_tag:
is_unique_tag, msg = self._is_unique_tag(ref, remote_name)
if not is_unique_tag:
msg = ('In repo "{0}": tag "{1}" {2}'.format(
self._name, self._tag, msg))
fatal_error(msg)
return is_valid
def _is_unique_tag(self, ref, remote_name):
"""Verify that a reference is a valid tag and is unique (not a branch)
Tags may be tag names, or SHA id's. It is also possible that a
branch and tag have the some name.
Note: values returned by git_showref_* and git_revparse are
shell return codes, which are zero for success, non-zero for
error!
"""
is_tag = self._ref_is_tag(ref)
is_branch = self._ref_is_branch(ref, remote_name)
is_hash = self._ref_is_hash(ref)
msg = ''
is_unique_tag = False
if is_tag and not is_branch:
# unique tag
msg = 'is ok'
is_unique_tag = True
elif is_tag and is_branch:
msg = ('is both a branch and a tag. git may checkout the branch '
'instead of the tag depending on your version of git.')
is_unique_tag = False
elif not is_tag and is_branch:
msg = ('is a branch, and not a tag. If you intended to checkout '
'a branch, please change the externals description to be '
'a branch. If you intended to checkout a tag, it does not '
'exist. Please check the name.')
is_unique_tag = False
else: # not is_tag and not is_branch:
if is_hash:
# probably a sha1 or HEAD, etc, we call it a tag
msg = 'is ok'
is_unique_tag = True
else:
# undetermined state.
msg = ('does not appear to be a valid tag, branch or hash! '
'Please check the name and repository.')
is_unique_tag = False
return is_unique_tag, msg
def _ref_is_tag(self, ref):
"""Verify that a reference is a valid tag according to git.
Note: values returned by git_showref_* and git_revparse are
shell return codes, which are zero for success, non-zero for
error!
"""
is_tag = False
value = self._git_showref_tag(ref)
if value == 0:
is_tag = True
return is_tag
def _ref_is_branch(self, ref, remote_name=None):
"""Verify if a ref is any kind of branch (local, tracked remote,
untracked remote).
"""
local_branch = False
remote_branch = False
if remote_name:
remote_branch = self._ref_is_remote_branch(ref, remote_name)
local_branch = self._ref_is_local_branch(ref)
is_branch = False
if local_branch or remote_branch:
is_branch = True
return is_branch
def _ref_is_local_branch(self, ref):
"""Verify that a reference is a valid branch according to git.
show-ref branch returns local branches that have been
previously checked out. It will not necessarily pick up
untracked remote branches.
Note: values returned by git_showref_* and git_revparse are
shell return codes, which are zero for success, non-zero for
error!
"""
is_branch = False
value = self._git_showref_branch(ref)
if value == 0:
is_branch = True
return is_branch
def _ref_is_remote_branch(self, ref, remote_name):
"""Verify that a reference is a valid branch according to git.
show-ref branch returns local branches that have been
previously checked out. It will not necessarily pick up
untracked remote branches.
Note: values returned by git_showref_* and git_revparse are
shell return codes, which are zero for success, non-zero for
error!
"""
is_branch = False
value = self._git_lsremote_branch(ref, remote_name)
if value == 0:
is_branch = True
return is_branch
def _ref_is_commit(self, ref):
"""Verify that a reference is a valid commit according to git.
This could be a tag, branch, sha1 id, HEAD and potentially others...
Note: values returned by git_showref_* and git_revparse are
shell return codes, which are zero for success, non-zero for
error!
"""
is_commit = False
value, _ = self._git_revparse_commit(ref)
if value == 0:
is_commit = True
return is_commit
def _ref_is_hash(self, ref):
"""Verify that a reference is a valid hash according to git.
Git doesn't seem to provide an exact way to determine if user
supplied reference is an actual hash. So we verify that the
ref is a valid commit and return the underlying commit
hash. Then check that the commit hash begins with the user
supplied string.
Note: values returned by git_showref_* and git_revparse are
shell return codes, which are zero for success, non-zero for
error!
"""
is_hash = False
status, git_output = self._git_revparse_commit(ref)
if status == 0:
if git_output.strip().startswith(ref):
is_hash = True
return is_hash
def _status_summary(self, stat, repo_dir_path):
"""Determine the clean/dirty status of a git repository
"""
cwd = os.getcwd()
os.chdir(repo_dir_path)
git_output = self._git_status_porcelain_v1z()
is_dirty = self._status_v1z_is_dirty(git_output)
if is_dirty:
stat.clean_state = ExternalStatus.DIRTY
else:
stat.clean_state = ExternalStatus.STATUS_OK
# Now save the verbose status output incase the user wants to
# see it.
stat.status_output = self._git_status_verbose()
os.chdir(cwd)
@staticmethod
def _status_v1z_is_dirty(git_output):
"""Parse the git status output from --porcelain=v1 -z and determine if
the repo status is clean or dirty. Dirty means:
* modified files
* missing files
* added files
* removed
* renamed
* unmerged
Whether untracked files are considered depends on how the status
command was run (i.e., whether it was run with the '-u' option).
NOTE: Based on the above definition, the porcelain status
should be an empty string to be considered 'clean'. Of course
this assumes we only get an empty string from an status
command on a clean checkout, and not some error
condition... Could alse use 'git diff --quiet'.
"""
is_dirty = False
if git_output:
is_dirty = True
return is_dirty
# ----------------------------------------------------------------
#
# system call to git for information gathering
#
# ----------------------------------------------------------------
@staticmethod
def _git_current_hash():
"""Return the full hash of the currently checked-out version.
Returns a tuple, (hash_found, hash), where hash_found is a
logical specifying whether a hash was found for HEAD (False
could mean we're not in a git repository at all). (If hash_found
is False, then hash is ''.)
"""
status, git_output = GitRepository._git_revparse_commit("HEAD")
hash_found = not status
if not hash_found:
git_output = ''
return hash_found, git_output
@staticmethod
def _git_current_branch():
"""Determines the name of the current branch.
Returns a tuple, (branch_found, branch_name), where branch_found
is a logical specifying whether a branch name was found for
HEAD. (If branch_found is False, then branch_name is ''.)
"""
cmd = ['git', 'symbolic-ref', '--short', '-q', 'HEAD']
status, git_output = execute_subprocess(cmd,
output_to_caller=True,
status_to_caller=True)
branch_found = not status
if branch_found:
git_output = git_output.strip()
else:
git_output = ''
return branch_found, git_output
@staticmethod
def _git_current_tag():
"""Determines the name tag corresponding to HEAD (if any).
Returns a tuple, (tag_found, tag_name), where tag_found is a
logical specifying whether we found a tag name corresponding to
HEAD. (If tag_found is False, then tag_name is ''.)
"""
# git describe --exact-match --tags HEAD
cmd = ['git', 'describe', '--exact-match', '--tags', 'HEAD']
status, git_output = execute_subprocess(cmd,
output_to_caller=True,
status_to_caller=True)
tag_found = not status
if tag_found:
git_output = git_output.strip()
else:
git_output = ''
return tag_found, git_output
@staticmethod
def _git_showref_tag(ref):
"""Run git show-ref check if the user supplied ref is a tag.
could also use git rev-parse --quiet --verify tagname^{tag}
"""
cmd = ['git', 'show-ref', '--quiet', '--verify',
'refs/tags/{0}'.format(ref), ]
status = execute_subprocess(cmd, status_to_caller=True)
return status
@staticmethod
def _git_showref_branch(ref):
"""Run git show-ref check if the user supplied ref is a local or
tracked remote branch.
"""
cmd = ['git', 'show-ref', '--quiet', '--verify',
'refs/heads/{0}'.format(ref), ]
status = execute_subprocess(cmd, status_to_caller=True)
return status
@staticmethod
def _git_lsremote_branch(ref, remote_name):
"""Run git ls-remote to check if the user supplied ref is a remote
branch that is not being tracked
"""
cmd = ['git', 'ls-remote', '--exit-code', '--heads',
remote_name, ref, ]
status = execute_subprocess(cmd, status_to_caller=True)
return status
@staticmethod
def _git_revparse_commit(ref):
"""Run git rev-parse to detect if a reference is a SHA, HEAD or other
valid commit.
"""
cmd = ['git', 'rev-parse', '--quiet', '--verify',
'{0}^{1}'.format(ref, '{commit}'), ]
status, git_output = execute_subprocess(cmd, status_to_caller=True,
output_to_caller=True)
git_output = git_output.strip()
return status, git_output
@staticmethod
def _git_status_porcelain_v1z():
"""Run git status to obtain repository information.
This is run with '--untracked=no' to ignore untracked files.
The machine-portable format that is guaranteed not to change
between git versions or *user configuration*.
"""
cmd = ['git', 'status', '--untracked-files=no', '--porcelain', '-z']
git_output = execute_subprocess(cmd, output_to_caller=True)
return git_output
@staticmethod
def _git_status_verbose():
"""Run the git status command to obtain repository information.
"""
cmd = ['git', 'status']
git_output = execute_subprocess(cmd, output_to_caller=True)
return git_output
@staticmethod
def _git_remote_verbose():
"""Run the git remote command to obtain repository information.
"""
cmd = ['git', 'remote', '--verbose']
git_output = execute_subprocess(cmd, output_to_caller=True)
return git_output
@staticmethod
def has_submodules(repo_dir_path=None):
"""Return True iff the repository at <repo_dir_path> (or the current
directory if <repo_dir_path> is None) has a '.gitmodules' file
"""
if repo_dir_path is None:
fname = ExternalsDescription.GIT_SUBMODULES_FILENAME
else:
fname = os.path.join(repo_dir_path,
ExternalsDescription.GIT_SUBMODULES_FILENAME)
return os.path.exists(fname)
# ----------------------------------------------------------------
#
# system call to git for sideffects modifying the working tree
#
# ----------------------------------------------------------------
@staticmethod
def _git_clone(url, repo_dir_name, verbosity):
"""Run git clone for the side effect of creating a repository.
"""
cmd = ['git', 'clone', '--quiet']
subcmd = None
cmd.extend([url, repo_dir_name])
if verbosity >= VERBOSITY_VERBOSE:
printlog(' {0}'.format(' '.join(cmd)))
execute_subprocess(cmd)
if subcmd is not None:
os.chdir(repo_dir_name)
execute_subprocess(subcmd)
@staticmethod
def _git_remote_add(name, url):
"""Run the git remote command for the side effect of adding a remote
"""
cmd = ['git', 'remote', 'add', name, url]
execute_subprocess(cmd)
@staticmethod
def _git_fetch(remote_name):
"""Run the git fetch command for the side effect of updating the repo
"""
cmd = ['git', 'fetch', '--quiet', '--tags', remote_name]
execute_subprocess(cmd)
@staticmethod
def _git_checkout_ref(ref, verbosity, submodules):
"""Run the git checkout command for the side effect of updating the repo
Param: ref is a reference to a local or remote object in the
form 'origin/my_feature', or 'tag1'.
"""
cmd = ['git', 'checkout', '--quiet', ref]
if verbosity >= VERBOSITY_VERBOSE:
printlog(' {0}'.format(' '.join(cmd)))
execute_subprocess(cmd)
if submodules:
GitRepository._git_update_submodules(verbosity)
@staticmethod
def _git_sparse_checkout(verbosity):
"""Configure repo via read-tree."""
cmd = ['git', 'config', 'core.sparsecheckout', 'true']
if verbosity >= VERBOSITY_VERBOSE:
printlog(' {0}'.format(' '.join(cmd)))
execute_subprocess(cmd)
cmd = ['git', 'read-tree', '-mu', 'HEAD']
if verbosity >= VERBOSITY_VERBOSE:
printlog(' {0}'.format(' '.join(cmd)))
execute_subprocess(cmd)
@staticmethod
def _git_update_submodules(verbosity):
"""Run git submodule update for the side effect of updating this
repo's submodules.
"""
# First, verify that we have a .gitmodules file
if os.path.exists(ExternalsDescription.GIT_SUBMODULES_FILENAME):
cmd = ['git', 'submodule', 'update', '--init', '--recursive']
if verbosity >= VERBOSITY_VERBOSE:
printlog(' {0}'.format(' '.join(cmd)))
execute_subprocess(cmd)