Skip to content

Instantly share code, notes, and snippets.

@jean-helsinki
Created June 3, 2015 12:24
Show Gist options
  • Save jean-helsinki/85b3ea97b7445abdd9a7 to your computer and use it in GitHub Desktop.
Save jean-helsinki/85b3ea97b7445abdd9a7 to your computer and use it in GitHub Desktop.

Revisions

  1. jean-helsinki created this gist Jun 3, 2015.
    370 changes: 370 additions & 0 deletions gistfile1.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,370 @@
    # -*- coding: utf-8 -*-
    """
    Based on:
    https://gist.github.com/meeuw/c3bc9dd07945c87c89e6#file-findfiles-py
    https://bitbucket.org/nosklo/pysmbclient/wiki/Home
    """

    import os
    import pexpect
    import re
    import locale
    import datetime
    import time
    import logging

    __logger__ = logging.getLogger(os.path.basename(__file__))

    try:
    datetime_strptime = datetime.datetime.strptime
    except AttributeError:
    # python version older than 2.5
    def datetime_strptime(date, format):
    return datetime.datetime(*(time.strptime(date, format)[:6]))

    _volume_re = re.compile(r"""
    Volume:\s # label
    \|([^|]*)\|\s # the volume name
    serial\snumber\s # another label
    0x([a-f0-9]+) # hex serial number
    $ # end of line
    """, re.VERBOSE)

    _smb_header_re = re.compile(r"""
    Domain=\[([^]]+)\]\s
    OS=\[([^]]+)\]\s
    Server=\[([^]]+)\]
    $
    """, re.VERBOSE)


    _file_re = re.compile(r"""
    \s* # file lines start with some spaces
    (.*?)\s+ # capture filename non-greedy, eating remaining spaces
    ([ADHSR]*) # capture file mode
    \s+ # after the mode you can have any number of spaces
    (\d+) # file size
    \s+ # spaces after file size
    ( # begin date capturing
    \w{3} # abbrev weekday
    \s # space
    \w{3} # abbrev month
    \s{1,2} # one or two spaces before the day
    \d{1,2} # day
    \s # a space before the time
    \d{2}:\d{2}:\d{2} # time
    \s # space
    \d{4} # year
    ) # end date capturing
    $ # end of string""", re.VERBOSE)

    _cwd_re = re.compile(ur"Current directory is \\\\(?P<host>[^\\]+)\\((?P<share>[^\\]+))(?P<rel_path>.+)")

    _SMB_PROMPT = ur'smb:\s.*\>'

    def _to_smb_path_fmt(path):
    return path.replace(os.sep, '\\')

    def _to_local_path_fmt(path):
    return path.replace('\\', os.sep)

    class SambaClientError(OSError): pass

    class SambaClient():
    def __init__(self, host, share, username, domain=".", password=""):
    smb_cmd = 'smbclient -W {domain} -U "{username}" //{host}/{share}/'.format(**locals())
    self.username = username
    self.domain = domain
    self.host = host
    self.share = share
    self.p = pexpect.spawn(smb_cmd, )
    self._put_pwd(password)
    self._runcmd_error_on_data("prompt ON")

    def _put_pwd(self, pwd):
    self.p.expect(ur'(?i).*password:')
    self.p.sendline(pwd)
    self.p.expect(_SMB_PROMPT)

    def _runcmd(self, command=None, *args):
    if command:
    f_args = " ".join('"%s"' % arg for arg in args)
    full_cmd = command + " " + f_args
    __logger__.debug("SMBCLIENT CMD: {0}".format(full_cmd))
    self.p.sendline(full_cmd)
    self.p.expect(_SMB_PROMPT)
    raw_out = self.p.before.decode('utf-8')
    return raw_out.replace(full_cmd, "", 1).strip()

    def _runcmd_error_on_data(self, cmd, *args):
    """raises SambaClientError if cmd returns any data"""
    def term_bs_in_out(s):
    return " \b" in s
    data = self._runcmd(cmd, *args)
    if data and not term_bs_in_out(data):
    raise SambaClientError("Error on %r: %r" % (cmd, data))
    return data

    def lsdir(self, path):
    """
    Lists a directory
    returns a list of tuples in the format:
    [(filename, modes, size, date), ...]
    """
    path = os.path.join(path, u'*')
    return self.glob(path)

    def glob(self, path):
    """
    Lists a glob (example: "/files/somefile.*")
    returns a list of tuples in the format:
    [(filename, modes, size, date), ...]
    """
    files = self._runcmd(u'ls', path).splitlines()
    for filedata in files:
    m = _file_re.match(filedata)
    if m:
    name, modes, size, date = m.groups()
    if name == '.' or name == '..':
    continue
    size = int(size)
    # Resets locale to "C" to parse english date properly
    # (non thread-safe code)
    loc = locale.getlocale(locale.LC_TIME)
    locale.setlocale(locale.LC_TIME, 'C')
    date = datetime_strptime(date, '%a %b %d %H:%M:%S %Y')
    locale.setlocale(locale.LC_TIME, loc)
    yield (name, modes, size, date)

    def listdir(self, path):
    """Emulates os.listdir()"""
    result = [f[0] for f in self.lsdir(path)]
    if not result: # can mean both that the dir is empty or not found
    # disambiguation: verifies if the path doesn't exist. Let the error
    # raised by _getfile propagate in that case.
    self._getfile(path)
    return result

    def _getfile(self, path):
    try:
    f = self.glob(path).next()
    except StopIteration:
    raise SambaClientError('Path not found: %r' % path)
    return f

    def info(self, path):
    """Fetches information about a file"""
    path = _to_smb_path_fmt(path)
    data = self._runcmd(u'allinfo', path)
    if data.startswith('ERRSRV'):
    raise SambaClientError(
    'Error retrieving info for %r: %r' % (path, data.strip()))
    result = {}
    for info in data.splitlines():
    k, sep, v = info.partition(':')
    if sep:
    result[k.strip()] = v.strip()
    return result

    def diskinfo(self):
    """Fetches information about a volume"""
    data = self._runcmd('volume')
    for line in data.splitlines():
    m = _volume_re.match(line)
    if m:
    name, serial = m.groups()
    return name, int(serial, 16)
    else:
    raise SambaClientError(
    'Error retrieving disk information: %r' % data)

    def volume(self):
    """Fetches the volume name"""
    return self.diskinfo()[0]

    def serial(self):
    """Fetches the volume serial"""
    return self.diskinfo()[1]

    def isdir(self, path):
    """Returns True if path is a directory/folder"""
    return 'D' in self._getfile(path)[1]

    def isfile(self, path):
    """Returns True if path is a regular file"""
    return not self.isdir(path)

    def exists(self, path):
    """Returns True if path exists in the remote host"""
    try:
    self._getfile(path)
    except SambaClientError:
    return False
    else:
    return True

    def mkdir(self, path):
    """Creates a new folder remotely"""
    path = _to_smb_path_fmt(path)
    self._runcmd_error_on_data(u'mkdir', path)

    def cd(self, path):
    """Change remote dir"""
    path = _to_smb_path_fmt(path)
    self._runcmd_error_on_data(u'cd', path)

    def lcd(self, path):
    """Change local dir"""
    self._runcmd_error_on_data(u'lcd', path)

    def rmdir(self, path):
    """Removes a remote empty folder"""
    path = _to_smb_path_fmt(path)
    self._runcmd_error_on_data(u'rmdir', path)

    def unlink(self, path):
    """Removes/deletes/unlinks a file or folder"""
    path = _to_smb_path_fmt(path)
    if self.isdir(path):
    try:
    self.set_recurse()
    self._runcmd_error_on_data(u'del', path + "\\*")
    self.rmdir(path)
    finally:
    self.set_recurse(False)
    else:
    self._runcmd_error_on_data(u'del', path)

    remove = unlink

    def chmod(self, path, *modes):
    """Set/reset file modes
    Tested with: AHS
    smbc.chmod('/file.txt', '+H')
    """
    path = _to_smb_path_fmt(path)
    plus_modes = []
    minus_modes = []
    for mode in modes:
    if mode.startswith(u'-'):
    minus_modes.append(mode.lstrip(u'-'))
    else:
    plus_modes.append(mode.lstrip(u'+'))
    modes = []
    if plus_modes:
    modes.append(u'+%s' % u''.join(plus_modes))
    if minus_modes:
    modes.append(u'-%s' % u''.join(minus_modes))
    self._runcmd_error_on_data(u'setmode', u''.join(modes))

    def rename(self, old_name, new_name):
    old_name = _to_smb_path_fmt(old_name)
    new_name = _to_smb_path_fmt(new_name)
    self._runcmd_error_on_data(u'rename', old_name, new_name)

    def download_file(self, remote_path, local_path):
    remote_path = _to_smb_path_fmt(remote_path)
    result = self._runcmd(u'get', remote_path, local_path)

    def upload_file(self, local_path, remote_path):
    remote_path = _to_smb_path_fmt(remote_path)
    result = self._runcmd(u'put', local_path, remote_path)

    def upload_update(self, local_path, remote_path):
    remote_path = _to_smb_path_fmt(remote_path)
    result = self._runcmd(u'reput', local_path, remote_path)

    def walk(self, top, topdown=True):
    names = self.glob(os.path.join(top, "*"))

    dirs, nondirs = [], []
    for item in names:
    if 'D' in item[1]:
    dirs.append(item[0])
    else:
    nondirs.append(item[0])
    if topdown:
    yield top, dirs, nondirs
    for name in dirs:
    new_path = os.path.join(top, name)
    for x in self.walk(new_path, topdown):
    yield x
    if not topdown:
    yield top, dirs, nondirs

    def upload(self, local_path, remote_path):
    if os.path.isdir(local_path):
    if not self.exists(remote_path):
    self.makedirs(remote_path)
    for root, dirs, files in os.walk(local_path):
    remote_root = root.replace(local_path, _to_local_path_fmt(remote_path))
    for d in dirs:
    rem_dir = _to_smb_path_fmt(os.path.join(remote_root, d))
    if not self.exists(rem_dir):
    self.makedirs(rem_dir)
    for f in files:
    rem_f = _to_smb_path_fmt(os.path.join(remote_root, f))
    self.upload_file(os.path.join(root, f), rem_f)
    else:
    basedir = os.path.dirname(remote_path)
    if not self.exists(basedir):
    self.makedirs(basedir)
    self.upload_file(local_path, remote_path)

    def download(self, remote_path, local_path):
    if self.isdir(remote_path):
    if not os.path.exists(local_path):
    os.makedirs(local_path)
    for root, dirs, files in self.walk(remote_path):
    local_root = root.replace(_to_local_path_fmt(remote_path), local_path)
    for d in dirs:
    loc_dir = os.path.join(local_root, d)
    if not os.path.exists(loc_dir):
    os.makedirs(loc_dir)
    for f in files:
    loc_f = os.path.join(local_root, f)
    self.download_file(_to_smb_path_fmt(os.path.join(root, f)), loc_f)
    else:
    basedir = os.path.dirname(local_path)
    if not os.path.exists(basedir):
    self.makedirs(basedir)
    self.download_file(remote_path, local_path)

    def get_cwd(self):
    out = self._runcmd(u'pwd').strip()
    m = _cwd_re.search(out)
    if m:
    return m.groupdict()["rel_path"]
    else:
    raise SambaClientError("pwd command fail")

    def makedirs(self, remote_path):
    remote_path = _to_smb_path_fmt(remote_path)
    dir_items = remote_path.split("\\")
    for i in xrange(len(dir_items)):
    part_path = "\\".join(dir_items[:i+1])
    if not self.exists(part_path):
    self.mkdir(part_path)

    def set_recurse(self, recurse=True):
    cwd = self.get_cwd()
    try:
    if cwd != "\\": self.cd("\\")
    flag = "ON" if recurse else "OFF"
    self._runcmd_error_on_data("recurse {0}".format(flag))
    finally:
    if cwd != "\\": self.cd(cwd)

    def __enter__(self):
    return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
    self.close()

    def __repr__(self):
    return '<SambaClient({self.domain}\\{self.username}@//{self.host}/{self.share})>'.format(self=self)

    def close(self):
    pass