From af9b561e92cf3c1d8275bde3003b03e9c8f7c852 Mon Sep 17 00:00:00 2001 From: Chris Johns Date: Thu, 30 May 2019 20:26:55 +1000 Subject: rtemstoolkit/path: Merge RSB changes. --- rtemstoolkit/path.py | 294 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 233 insertions(+), 61 deletions(-) diff --git a/rtemstoolkit/path.py b/rtemstoolkit/path.py index 845dfe8..9401e99 100644 --- a/rtemstoolkit/path.py +++ b/rtemstoolkit/path.py @@ -40,37 +40,60 @@ import glob import os import shutil import string +import sys from rtemstoolkit import error from rtemstoolkit import log - +windows_posix = sys.platform == 'msys' windows = os.name == 'nt' +win_maxpath = 254 + def host(path): if path is not None: while '//' in path: path = path.replace('//', '/') - if windows and len(path) > 2: - if path[0] == '/' and path[2] == '/' and \ - (path[1] in string.ascii_lowercase or \ - path[1] in string.ascii_uppercase): - path = ('%s:%s' % (path[1], path[2:])).replace('/', '\\') + if windows: + if len(path) > 2 and \ + path[0] == '/' and path[2] == '/' and \ + (path[1] in string.ascii_lowercase or \ + path[1] in string.ascii_uppercase): + path = '%s:%s' % (path[1], path[2:]) + path = path.replace('/', '\\') + if len(path) > win_maxpath: + if path.startswith('\\\\?\\'): + path = path[4:] + path = u'\\'.join([u'\\\\?', path]) return path def shell(path): + if isinstance(path, bytes): + path = path.decode('ascii') if path is not None: - if windows and len(path) > 1 and path[1] == ':': - path = ('/%s%s' % (path[0], path[2:])).replace('\\', '/') + if windows or windows_posix: + path = path.encode('ascii', 'ignore').decode('ascii') + if path.startswith('\\\\?\\'): + path = path[4:] + if len(path) > 1 and path[1] == ':': + path = '/%s%s' % (path[0].lower(), path[2:]) + path = path.replace('\\', '/') while '//' in path: path = path.replace('//', '/') return path def basename(path): + path = shell(path) return shell(os.path.basename(path)) def dirname(path): + path = shell(path) return shell(os.path.dirname(path)) +def is_abspath(path): + if path is not None and len(path) > 0: + return '/' == path[0] + return False + def join(path, *args): path = shell(path) for arg in args: @@ -81,36 +104,91 @@ def join(path, *args): return shell(path) def abspath(path): + path = shell(path) return shell(os.path.abspath(host(path))) +def relpath(path, start = None): + path = shell(path) + if start is None: + path = os.path.relpath(host(path)) + else: + path = os.path.relpath(host(path), start) + return shell(path) + def splitext(path): + path = shell(path) root, ext = os.path.splitext(host(path)) return shell(root), ext +def listdir(path, error = True): + path = host(path) + files = [] + if not os.path.exists(path): + if error: + raise error.general('path does not exist : %s' % (path)) + elif not isdir(path): + if error: + raise error.general('path is not a directory: %s' % (path)) + else: + if windows: + try: + files = os.listdir(host(path)) + except IOError: + raise error.general('Could not list files: %s' % (path)) + except OSError as e: + raise error.general('Could not list files: %s: %s' % (path, str(e))) + except WindowsError as e: + raise error.general('Could not list files: %s: %s' % (path, str(e))) + else: + try: + files = os.listdir(host(path)) + except IOError: + raise error.general('Could not list files: %s' % (path)) + except OSError as e: + raise error.general('Could not list files: %s: %s' % (path, str(e))) + return files + def exists(paths): + def _exists(p): + if not is_abspath(p): + p = shell(join(os.getcwd(), host(p))) + return basename(p) in ['.'] + listdir(dirname(p), error = False) + if type(paths) == list: results = [] for p in paths: - results += [os.path.exists(host(p))] + results += [_exists(shell(p))] return results - return os.path.exists(host(paths)) + return _exists(shell(paths)) def isdir(path): + path = shell(path) return os.path.isdir(host(path)) def isfile(path): + path = shell(path) return os.path.isfile(host(path)) def isabspath(path): + path = shell(path) return path[0] == '/' +def isreadable(path): + path = shell(path) + return os.access(host(path), os.R_OK) + def iswritable(path): + path = shell(path) return os.access(host(path), os.W_OK) +def isreadwritable(path): + path = shell(path) + return isreadable(path) and iswritable(path) + def ispathwritable(path): path = host(path) while len(path) != 0: - if os.path.exists(path): + if exists(path): return iswritable(path) path = os.path.dirname(path) return False @@ -138,18 +216,55 @@ def mkdir(path): except OSError as e: raise error.general('cannot make directory: %s: %s' % (path, str(e))) +def chdir(path): + path = shell(path) + os.chdir(host(path)) + def removeall(path): + # + # Perform the removal of the directory tree manually so we can + # make sure on Windows the files are correctly encoded to avoid + # the file name size limit. On Windows the os.walk fails once we + # get to the max path length. + # + def _isdir(path): + hpath = host(path) + return os.path.isdir(hpath) and not os.path.islink(hpath) + + def _remove_node(path): + hpath = host(path) + if not os.path.islink(hpath) and not os.access(hpath, os.W_OK): + os.chmod(hpath, stat.S_IWUSR) + if _isdir(path): + os.rmdir(hpath) + else: + os.unlink(hpath) + + def _remove(path): + dirs = [] + for name in listdir(path): + path_ = join(path, name) + hname = host(path_) + if _isdir(path_): + dirs += [name] + else: + _remove_node(path_) + for name in dirs: + dir = join(path, name) + _remove(dir) + _remove_node(dir) - def _onerror(function, path, excinfo): - print('removeall error: (%s) %s' % (excinfo, path)) + path = shell(path) + hpath = host(path) - path = host(path) - shutil.rmtree(path, onerror = _onerror) + if os.path.exists(hpath): + _remove(path) + _remove_node(path) def expand(name, paths): l = [] for p in paths: - l += [join(p, name)] + l += [join(shell(p), name)] return l def expanduser(path): @@ -157,33 +272,6 @@ def expanduser(path): path = os.path.expanduser(path) return shell(path) -def listdir(path): - path = host(path) - files = [] - if not exists(path): - raise error.general('path does not exist : %s' % (path)) - elif not isdir(path): - raise error.general('path is not a directory: %s' % (path)) - else: - if windows: - try: - files = os.listdir(host(path)) - except IOError: - raise error.general('Could not list files: %s' % (path)) - except OSError as e: - raise error.general('Could not list files: %s: %s' % (path, str(e))) - except WindowsError as e: - raise error.general('Could not list files: %s: %s' % (path, str(e))) - else: - try: - files = os.listdir(host(path)) - except IOError: - raise error.general('Could not list files: %s' % (path)) - except OSError as e: - raise error.general('Could not list files: %s: %s' % (path, str(e))) - - return files - def collect_files(path_): # # Convert to shell paths and return shell paths. @@ -206,30 +294,59 @@ def collect_files(path_): files = [host(path_)] return sorted(files) +def copy(src, dst): + src = shell(src) + dst = shell(dst) + hsrc = host(src) + hdst = host(dst) + try: + shutil.copy(hsrc, hdst) + except OSError as why: + if windows: + if WindowsError is not None and isinstance(why, WindowsError): + pass + else: + raise error.general('copying tree (1): %s -> %s: %s' % (hsrc, hdst, str(why))) + def copy_tree(src, dst): + trace = False + hsrc = host(src) hdst = host(dst) - if os.path.exists(src) and os.path.isdir(src): + if exists(src): names = listdir(src) else: - names = [basename(src)] - src = dirname(src) - - if not os.path.isdir(dst): - os.makedirs(dst) + names = [] + + if trace: + print('path.copy_tree:') + print(' src: "%s"' % (src)) + print(' hsrc: "%s"' % (hsrc)) + print(' dst: "%s"' % (dst)) + print(' hdst: "%s"' % (hdst)) + print(' names: %r' % (names)) + + if not os.path.isdir(hdst): + if trace: + print(' mkdir: %s' % (hdst)) + try: + os.makedirs(hdst) + except OSError as why: + raise error.general('copying tree: cannot create target directory %s: %s' % \ + (hdst, str(why))) for name in names: - srcname = os.path.join(src, name) - dstname = os.path.join(dst, name) + srcname = host(os.path.join(hsrc, name)) + dstname = host(os.path.join(hdst, name)) try: if os.path.islink(srcname): linkto = os.readlink(srcname) - if os.path.exists(dstname): + if exists(shell(dstname)): if os.path.islink(dstname): dstlinkto = os.readlink(dstname) if linkto != dstlinkto: - log.warning('copying tree: update of link does not match: %s -> %s' % \ + log.warning('copying tree: link does not match: %s -> %s' % \ (dstname, dstlinkto)) os.remove(dstname) else: @@ -241,20 +358,74 @@ def copy_tree(src, dst): elif os.path.isdir(srcname): copy_tree(srcname, dstname) else: - shutil.copy2(srcname, dstname) + shutil.copyfile(host(srcname), host(dstname)) + shutil.copystat(host(srcname), host(dstname)) except shutil.Error as err: - raise error.general('copying tree: %s -> %s: %s' % (src, dst, str(err))) + raise error.general('copying tree (2): %s -> %s: %s' % \ + (hsrc, hdst, str(err))) except EnvironmentError as why: - raise error.general('copying tree: %s -> %s: %s' % (srcname, dstname, str(why))) + raise error.general('copying tree (3): %s -> %s: %s' % \ + (srcname, dstname, str(why))) try: - shutil.copystat(src, dst) + shutil.copystat(hsrc, hdst) except OSError as why: - ok = False if windows: if WindowsError is not None and isinstance(why, WindowsError): - ok = True - if not ok: - raise error.general('copying tree: %s -> %s: %s' % (src, dst, str(why))) + pass + else: + raise error.general('copying tree (4): %s -> %s: %s' % (hsrc, hdst, str(why))) + +def get_size(path, depth = -1): + # + # Get the size the directory tree manually to the required depth. + # This makes sure on Windows the files are correctly encoded to avoid + # the file name size limit. On Windows the os.walk fails once we + # get to the max path length on Windows. + # + def _isdir(path): + hpath = host(path) + return os.path.isdir(hpath) and not os.path.islink(hpath) + + def _node_size(path): + hpath = host(path) + size = 0 + if not os.path.islink(hpath): + size = os.path.getsize(hpath) + return size + + def _get_size(path, depth, level = 0): + level += 1 + dirs = [] + size = 0 + for name in listdir(path): + path_ = join(path, shell(name)) + hname = host(path_) + if _isdir(path_): + dirs += [shell(name)] + else: + size += _node_size(path_) + if depth < 0 or level < depth: + for name in dirs: + dir = join(path, name) + size += _get_size(dir, depth, level) + return size + + path = shell(path) + hpath = host(path) + size = 0 + + if os.path.exists(hpath): + size = _get_size(path, depth) + + return size + +def get_humanize_size(path, depth = -1): + size = get_size(path, depth) + for unit in ['','K','M','G','T','P','E','Z']: + if abs(size) < 1024.0: + return "%5.3f%sB" % (size, unit) + size /= 1024.0 + return "%.3f%sB" % (size, 'Y') if __name__ == '__main__': print(host('/a/b/c/d-e-f')) @@ -271,3 +442,4 @@ if __name__ == '__main__': print(basename('x:/sd/df/fg/me.txt')) print(dirname('x:/sd/df/fg/me.txt')) print(join('s:/d/', '/g', '/tyty/fgfg')) + print(join('s:/d/e\\f/g', '/h', '/tyty/zxzx', '\\mm\\nn/p')) -- cgit v1.2.3