331 lines
9.7 KiB
Python
331 lines
9.7 KiB
Python
#!/usr/bin/env python
|
|
"""
|
|
Common public utilities for manic package
|
|
|
|
"""
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import unicode_literals
|
|
from __future__ import print_function
|
|
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from threading import Timer
|
|
|
|
from .global_constants import LOCAL_PATH_INDICATOR
|
|
|
|
# ---------------------------------------------------------------------
|
|
#
|
|
# screen and logging output and functions to massage text for output
|
|
#
|
|
# ---------------------------------------------------------------------
|
|
|
|
|
|
def log_process_output(output):
    """Send process output to the debug log one line at a time.

    A multi-line string handed to a single logging.debug call only
    gets the log heading on its first line, which makes the result
    hard to filter with grep. Splitting on newlines first gives every
    line a log record of its own.

    Args:
        output: str, the text captured from a process. A trailing
            newline produces one final, empty record.

    """
    for entry in output.split('\n'):
        logging.debug(entry)
|
|
|
|
|
|
def printlog(msg, **kwargs):
    """Print a message to the screen and record it in the log as well.

    The message is logged at info level first, then passed to print
    together with whatever keyword arguments were supplied (for
    example 'end' or 'file'). Standard output is flushed afterwards.

    """
    logging.info(msg)
    # Expanding an empty kwargs dict is the same as passing no keywords,
    # so a single print call covers both cases.
    print(msg, **kwargs)
    sys.stdout.flush()
|
|
|
|
|
|
def last_n_lines(the_string, n_lines, truncation_message=None):
    """Returns the last n lines of the given string

    Args:
        the_string: str
        n_lines: int; a value of zero or less selects no lines
        truncation_message: str, optional

    Returns a string containing the last n_lines lines of the_string.
    Line endings are kept as they appear in the_string.

    If truncation_message is provided, the returned string begins with
    the given message (followed by a newline) if and only if the
    string has more than n_lines lines to begin with.
    """
    lines = the_string.splitlines(True)
    if len(lines) <= n_lines:
        # Nothing to cut: hand back the input untouched.
        return the_string

    # lines[-0:] is the whole list rather than an empty one, and a
    # negative count would trim from the front instead, so only slice
    # when at least one line is actually requested.
    kept = lines[-n_lines:] if n_lines > 0 else []
    str_truncated = ''.join(kept)
    if truncation_message:
        str_truncated = truncation_message + '\n' + str_truncated
    return str_truncated
|
|
|
|
|
|
def indent_string(the_string, indent_level):
    """Indent every line of a string by a fixed number of spaces

    Args:
        the_string: str
        indent_level: int, number of spaces to put in front of each line

    Returns a new string with the same lines and line endings as
    the_string, each line prefixed by 'indent_level' spaces. Every
    line is prefixed, including lines that are otherwise empty.

    In python3, textwrap.indent offers similar functionality.
    """
    prefix = ' ' * indent_level
    return ''.join(prefix + piece for piece in the_string.splitlines(True))
|
|
|
|
# ---------------------------------------------------------------------
|
|
#
|
|
# error handling
|
|
#
|
|
# ---------------------------------------------------------------------
|
|
|
|
|
|
def fatal_error(message):
    """
    Log the message at error level, then abort by raising RuntimeError.

    The exception text is the message preceded by a line separator and
    the label 'ERROR: '.
    """
    logging.error(message)
    error_text = "{0}ERROR: {1}".format(os.linesep, message)
    raise RuntimeError(error_text)
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
#
|
|
# Data conversion / manipulation
|
|
#
|
|
# ---------------------------------------------------------------------
|
|
def str_to_bool(bool_str):
    """Convert a string representation of a boolean into a true boolean.

    The comparison is case insensitive. 'true' and 't' map to True,
    'false' and 'f' map to False. Any other value is reported through
    fatal_error.
    """
    recognized = {
        'true': True,
        't': True,
        'false': False,
        'f': False,
    }
    value = recognized.get(bool_str.lower())
    if value is None:
        msg = ('ERROR: invalid boolean string value "{0}". '
               'Must be "true" or "false"'.format(bool_str))
        fatal_error(msg)
    return value
|
|
|
|
|
|
REMOTE_PREFIXES = ['http://', 'https://', 'ssh://', 'git@']


def is_remote_url(url):
    """Report whether a url names a remote location.

    A url counts as remote when it starts with one of the entries in
    REMOTE_PREFIXES. Anything else is treated as a local file path.

    Args:
        url: str

    Returns True for a remote url, False otherwise.

    """
    # startswith accepts a tuple of candidates and stops at the first hit.
    return url.startswith(tuple(REMOTE_PREFIXES))
|
|
|
|
|
|
def split_remote_url(url):
    """Strip protocol and login information from a remote url.

    A url that is_remote_url does not recognise is treated as a local
    path and returned unchanged. For a remote url, in order:

      * the leading REMOTE_PREFIXES entry is removed;
      * everything up to and including the first '@' is removed;
      * everything up to and including the first ':' is removed.

    For example 'git@github.com:org/repo' becomes 'org/repo', while
    'https://github.com/org/repo' becomes 'github.com/org/repo'.

    Args:
        url: str

    Returns the reduced url as a str.

    """
    if not is_remote_url(url):
        return url

    # Remove only the prefix the url actually starts with. A blanket
    # str.replace would also delete text such as 'git@' or 'https://'
    # that happens to occur later in the url.
    for prefix in REMOTE_PREFIXES:
        if url.startswith(prefix):
            url = url[len(prefix):]
            break

    # Split once per separator so that anything after a second '@' or
    # ':' stays in the result instead of being dropped.
    if '@' in url:
        url = url.split('@', 1)[1]

    if ':' in url:
        url = url.split(':', 1)[1]

    return url
|
|
|
|
|
|
def expand_local_url(url, field):
    """Expand a local file path url to a normalized absolute path.

    Remote urls (as judged by is_remote_url) are returned unchanged,
    and so is a url that equals LOCAL_PATH_INDICATOR once surrounding
    whitespace is stripped.

    Note: local paths of LOCAL_PATH_INDICATOR have special meaning and
    represent local copy only, don't work with the remotes.

    Every other url has environment variables and '~' expanded. If the
    result is an absolute path it is normalized and returned. If not,
    a warning is printed and logged and the expanded, non-normalized
    value is returned.

    Args:
        url: str
        field: value used in the warning message to identify the
            externals description the url came from

    """
    if is_remote_url(url) or url.strip() == LOCAL_PATH_INDICATOR:
        return url

    url = os.path.expanduser(os.path.expandvars(url))
    if os.path.isabs(url):
        return os.path.normpath(url)

    msg = ('WARNING: Externals description for "{0}" contains a '
           'url that is not remote and does not expand to an '
           'absolute path. Version control operations may '
           'fail.\n\nurl={1}'.format(field, url))
    printlog(msg)
    return url
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
#
|
|
# subprocess
|
|
#
|
|
# ---------------------------------------------------------------------
|
|
|
|
# Number of seconds execute_subprocess lets a command run before its
# timer calls _hanging_msg, which tells the user that the command seems
# to be hanging.
_HANGING_SEC = 300
|
|
|
|
|
|
def _hanging_msg(working_directory, command):
    """Print a notice that a command has run for _HANGING_SEC seconds.

    Args:
        working_directory: directory reported as the place the command
            was run from
        command: the command, as text, reported in the notice

    The notice is only printed; nothing is done to the command itself.
    """
    template = """

Command '{command}'
from directory {working_directory}
has taken {hanging_sec} seconds. It may be hanging.

The command will continue to run, but you may want to abort
manage_externals with ^C and investigate. A possible cause of hangs is
when svn or git require authentication to access a private
repository. On some systems, svn and git requests for authentication
information will not be displayed to the user. In this case, the program
will appear to hang. Ensure you can run svn and git manually and access
all repositories without entering your authentication information.

"""
    notice = template.format(command=command,
                             working_directory=working_directory,
                             hanging_sec=_HANGING_SEC)
    print(notice)
|
|
|
|
|
|
def execute_subprocess(commands, status_to_caller=False,
                       output_to_caller=False):
    """Run a command through subprocess.check_output and handle the
    common exceptions.

    check_output runs a command with arguments, waits for it to
    complete, and raises an exception on a nonzero return code. stderr
    is folded into the captured output.

    Args:
        commands: sequence of str, the command and its arguments
        status_to_caller: if true, the return code is handed back
        output_to_caller: if true, the captured output is handed back

    If either flag is true, a nonzero return code is not treated as an
    error here. The caller receives the return code (when requested)
    and must decide what it means. The output handed back in that case
    is the empty string. If neither flag is true, a nonzero return
    code is logged and reported through fatal_error.

    Failure to launch the command (OSError) or invalid arguments
    (ValueError) are always reported through fatal_error.

    While the command runs, a timer is armed that calls _hanging_msg
    after _HANGING_SEC seconds. It is cancelled once the command ends.

    Returns (status, output) when both flags are true, the status or
    the output alone when only one is, and None otherwise.

    """
    run_dir = os.getcwd()
    logging.info(
        'In directory: {0}\nexecute_subprocess running command:'.format(
            run_dir))
    command_line = ' '.join(commands)
    logging.info(command_line)

    caller_handles_failure = status_to_caller or output_to_caller
    status = -1
    output = ''
    watchdog = Timer(_HANGING_SEC, _hanging_msg,
                     kwargs={"working_directory": run_dir,
                             "command": command_line})
    watchdog.start()
    try:
        output = subprocess.check_output(commands, stderr=subprocess.STDOUT,
                                         universal_newlines=True)
        log_process_output(output)
        status = 0
    except OSError as launch_error:
        report = failed_command_msg(
            'Command execution failed. Does the executable exist?',
            commands)
        logging.error(launch_error)
        fatal_error(report)
    except ValueError as argument_error:
        report = failed_command_msg(
            'DEV_ERROR: Invalid arguments trying to run subprocess',
            commands)
        logging.error(argument_error)
        fatal_error(report)
    except subprocess.CalledProcessError as run_error:
        status = run_error.returncode
        # Only report the failure when nothing is handed back to the
        # caller. Otherwise this may be a simple status check, and it
        # is the caller's job to decide whether an error occurred.
        if not caller_handles_failure:
            context = ('Process did not run successfully; '
                       'returned status {0}'.format(run_error.returncode))
            report = failed_command_msg(context, commands,
                                        output=run_error.output)
            logging.error(run_error)
            logging.error(report)
            log_process_output(run_error.output)
            fatal_error(report)
    finally:
        # Always disarm the hang warning, however the command ended.
        watchdog.cancel()

    if status_to_caller:
        return (status, output) if output_to_caller else status
    return output if output_to_caller else None
|
|
|
|
|
|
def failed_command_msg(msg_context, command, output=None):
    """Build a consistently formatted error message for a failed
    subprocess call.

    Args:
        msg_context: str, short description of what went wrong
        command: sequence of str, the command and its arguments
        output: str, optional; the output from the failed command

    The message names the current working directory, the context and
    the command joined with spaces. When 'output' is given and not
    empty, its last 20 lines (indented by four spaces) are put in
    front, and a closing line points the reader back to them.

    Returns the message as a str.
    """
    pieces = []
    if output:
        shown = last_n_lines(
            output, 20,
            truncation_message='[... Output truncated for brevity ...]')
        pieces.append('Failed with output:\n' +
                      indent_string(shown, 4) +
                      '\nERROR: ')

    command_str = ' '.join(command)
    # NOTE(review): the reviewed copy of this file had lost its leading
    # whitespace, so any indentation of the {cwd} and {command} lines
    # inside this template could not be seen -- confirm against the
    # repository before merging.
    pieces.append('In directory\n'
                  '{cwd}\n'
                  '{context}:\n'
                  '{command}\n'.format(cwd=os.getcwd(),
                                       context=msg_context,
                                       command=command_str))

    if output:
        pieces.append('See above for output from failed command.\n')

    return ''.join(pieces)
|