summaryrefslogtreecommitdiff
path: root/.i3/scripts/libs
diff options
context:
space:
mode:
authorRené 'Necoro' Neumann <necoro@necoro.net>2013-06-06 17:35:45 +0200
committerRené 'Necoro' Neumann <necoro@necoro.net>2013-06-06 17:35:45 +0200
commit93ca7a4258e83996c0075ff5976658a12ffb6e02 (patch)
tree393833395c7c3237cd162fbbbad48c8cf5b5bc49 /.i3/scripts/libs
parent346139a617b044329efcc0a17fdbdf11cf31971e (diff)
downloaddotfiles-93ca7a4258e83996c0075ff5976658a12ffb6e02.tar.gz
dotfiles-93ca7a4258e83996c0075ff5976658a12ffb6e02.tar.bz2
dotfiles-93ca7a4258e83996c0075ff5976658a12ffb6e02.zip
i3: unify scripts into one
Diffstat (limited to '.i3/scripts/libs')
-rw-r--r--.i3/scripts/libs/i3.py559
-rw-r--r--.i3/scripts/libs/sh.py1695
2 files changed, 2254 insertions, 0 deletions
diff --git a/.i3/scripts/libs/i3.py b/.i3/scripts/libs/i3.py
new file mode 100644
index 0000000..343c709
--- /dev/null
+++ b/.i3/scripts/libs/i3.py
@@ -0,0 +1,559 @@
+#======================================================================
+# i3 (Python module for communicating with i3 window manager)
+# Copyright (C) 2012 Jure Ziberna
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#======================================================================
+
+
+import sys
+import subprocess
+import json
+import socket
+import struct
+import threading
+import time
+
+ModuleType = type(sys)
+
+
+__author__ = 'Jure Ziberna'
+__version__ = '0.6.5'
+__date__ = '2012-06-20'
+__license__ = 'GNU GPL 3'
+
+
+MSG_TYPES = [
+ 'command',
+ 'get_workspaces',
+ 'subscribe',
+ 'get_outputs',
+ 'get_tree',
+ 'get_marks',
+ 'get_bar_config',
+]
+
+EVENT_TYPES = [
+ 'workspace',
+ 'output',
+]
+
+
+class i3Exception(Exception):
+ pass
+
+class MessageTypeError(i3Exception):
+ """
+ Raised when message type isn't available. See i3.MSG_TYPES.
+ """
+ def __init__(self, type):
+ msg = "Message type '%s' isn't available" % type
+ super(MessageTypeError, self).__init__(msg)
+
+class EventTypeError(i3Exception):
+ """
+ Raised when even type isn't available. See i3.EVENT_TYPES.
+ """
+ def __init__(self, type):
+ msg = "Event type '%s' isn't available" % type
+ super(EventTypeError, self).__init__(msg)
+
+class MessageError(i3Exception):
+ """
+ Raised when a message to i3 is unsuccessful.
+ That is, when it contains 'success': false in its JSON formatted response.
+ """
+ pass
+
+class ConnectionError(i3Exception):
+ """
+ Raised when a socket couldn't connect to the window manager.
+ """
+ def __init__(self, socket_path):
+ msg = "Could not connect to socket at '%s'" % socket_path
+ super(ConnectionError, self).__init__(msg)
+
+
+def parse_msg_type(msg_type):
+ """
+ Returns an i3-ipc code of the message type. Raises an exception if
+ the given message type isn't available.
+ """
+ try:
+ index = int(msg_type)
+ except ValueError:
+ index = -1
+ if index >= 0 and index < len(MSG_TYPES):
+ return index
+ msg_type = str(msg_type).lower()
+ if msg_type in MSG_TYPES:
+ return MSG_TYPES.index(msg_type)
+ else:
+ raise MessageTypeError(msg_type)
+
+def parse_event_type(event_type):
+ """
+ Returns an i3-ipc string of the event_type. Raises an exception if
+ the given event type isn't available.
+ """
+ try:
+ index = int(event_type)
+ except ValueError:
+ index = -1
+ if index >= 0 and index < len(EVENT_TYPES):
+ return EVENT_TYPES[index]
+ event_type = str(event_type).lower()
+ if event_type in EVENT_TYPES:
+ return event_type
+ else:
+ raise EventTypeError(event_type)
+
+
+class Socket(object):
+ """
+ Socket for communicating with the i3 window manager.
+ Optional arguments:
+ - path of the i3 socket. Path is retrieved from i3-wm itself via
+ "i3.get_socket_path()" if not provided.
+ - timeout in seconds
+ - chunk_size in bytes
+ - magic_string as a safety string for i3-ipc. Set to 'i3-ipc' by default.
+ """
+ magic_string = 'i3-ipc' # safety string for i3-ipc
+ chunk_size = 1024 # in bytes
+ timeout = 0.5 # in seconds
+ buffer = b'' # byte string
+
+ def __init__(self, path=None, timeout=None, chunk_size=None,
+ magic_string=None):
+ if not path:
+ path = get_socket_path()
+ self.path = path
+ if timeout:
+ self.timeout = timeout
+ if chunk_size:
+ self.chunk_size = chunk_size
+ if magic_string:
+ self.magic_string = magic_string
+ # Socket initialization and connection
+ self.initialize()
+ self.connect()
+ # Struct format initialization, length of magic string is in bytes
+ self.struct_header = '<%dsII' % len(self.magic_string.encode('utf-8'))
+ self.struct_header_size = struct.calcsize(self.struct_header)
+
+ def initialize(self):
+ """
+ Initializes the socket.
+ """
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.socket.settimeout(self.timeout)
+
+ def connect(self, path=None):
+ """
+ Connects the socket to socket path if not already connected.
+ """
+ if not self.connected:
+ self.initialize()
+ if not path:
+ path = self.path
+ try:
+ self.socket.connect(path)
+ except socket.error:
+ raise ConnectionError(path)
+
+ def get(self, msg_type, payload=''):
+ """
+ Convenience method, calls "socket.send(msg_type, payload)" and
+ returns data from "socket.receive()".
+ """
+ self.send(msg_type, payload)
+ return self.receive()
+
+ def subscribe(self, event_type, event=None):
+ """
+ Subscribes to an event. Returns data on first occurrence.
+ """
+ event_type = parse_event_type(event_type)
+ # Create JSON payload from given event type and event
+ payload = [event_type]
+ if event:
+ payload.append(event)
+ payload = json.dumps(payload)
+ return self.get('subscribe', payload)
+
+ def send(self, msg_type, payload=''):
+ """
+ Sends the given message type with given message by packing them
+ and continuously sending bytes from the packed message.
+ """
+ message = self.pack(msg_type, payload)
+ # Continuously send the bytes from the message
+ self.socket.sendall(message)
+
+ def receive(self):
+ """
+ Tries to receive a data. Unpacks the received byte string if
+ successful. Returns None on failure.
+ """
+ try:
+ data = self.socket.recv(self.chunk_size)
+ msg_magic, msg_length, msg_type = self.unpack_header(data)
+ msg_size = self.struct_header_size + msg_length
+ # Keep receiving data until the whole message gets through
+ while len(data) < msg_size:
+ data += self.socket.recv(msg_length)
+ data = self.buffer + data
+ return self.unpack(data)
+ except socket.timeout:
+ return None
+
+ def pack(self, msg_type, payload):
+ """
+ Packs the given message type and payload. Turns the resulting
+ message into a byte string.
+ """
+ msg_magic = self.magic_string
+ # Get the byte count instead of number of characters
+ msg_length = len(payload.encode('utf-8'))
+ msg_type = parse_msg_type(msg_type)
+ # "struct.pack" returns byte string, decoding it for concatenation
+ msg_length = struct.pack('I', msg_length).decode('utf-8')
+ msg_type = struct.pack('I', msg_type).decode('utf-8')
+ message = '%s%s%s%s' % (msg_magic, msg_length, msg_type, payload)
+ # Encoding the message back to byte string
+ return message.encode('utf-8')
+
+ def unpack(self, data):
+ """
+ Unpacks the given byte string and parses the result from JSON.
+ Returns None on failure and saves data into "self.buffer".
+ """
+ data_size = len(data)
+ msg_magic, msg_length, msg_type = self.unpack_header(data)
+ msg_size = self.struct_header_size + msg_length
+ # Message shouldn't be any longer than the data
+ if data_size >= msg_size:
+ payload = data[self.struct_header_size:msg_size].decode('utf-8')
+ payload = json.loads(payload)
+ self.buffer = data[msg_size:]
+ return payload
+ else:
+ self.buffer = data
+ return None
+
+ def unpack_header(self, data):
+ """
+ Unpacks the header of given byte string.
+ """
+ return struct.unpack(self.struct_header, data[:self.struct_header_size])
+
+ @property
+ def connected(self):
+ """
+ Returns True if connected and False if not.
+ """
+ try:
+ self.get('command')
+ return True
+ except socket.error:
+ return False
+
+ def close(self):
+ """
+ Closes the socket connection.
+ """
+ self.socket.close()
+
+
+class Subscription(threading.Thread):
+ """
+ Creates a new subscription and runs a listener loop. Calls the
+ callback on event.
+ Example parameters:
+ callback = lambda event, data, subscription: print(data)
+ event_type = 'workspace'
+ event = 'focus'
+ event_socket = <i3.Socket object>
+ data_socket = <i3.Socket object>
+ """
+ subscribed = False
+ type_translation = {
+ 'workspace': 'get_workspaces',
+ 'output': 'get_outputs'
+ }
+
+ def __init__(self, callback, event_type, event=None, event_socket=None,
+ data_socket=None):
+ # Variable initialization
+ if not callable(callback):
+ raise TypeError('Callback must be callable')
+ event_type = parse_event_type(event_type)
+ self.callback = callback
+ self.event_type = event_type
+ self.event = event
+ # Socket initialization
+ if not event_socket:
+ event_socket = Socket()
+ self.event_socket = event_socket
+ self.event_socket.subscribe(event_type, event)
+ if not data_socket:
+ data_socket = Socket()
+ self.data_socket = data_socket
+ # Thread initialization
+ threading.Thread.__init__(self)
+ self.start()
+
+ def run(self):
+ """
+ Wrapper method for the listen method -- handles exceptions.
+ The method is run by the underlying "threading.Thread" object.
+ """
+ try:
+ self.listen()
+ except socket.error:
+ self.close()
+
+ def listen(self):
+ """
+ Runs a listener loop until self.subscribed is set to False.
+ Calls the given callback method with data and the object itself.
+ If event matches the given one, then matching data is retrieved.
+ Otherwise, the event itself is sent to the callback.
+ In that case 'change' key contains the thing that was changed.
+ """
+ self.subscribed = True
+ while self.subscribed:
+ event = self.event_socket.receive()
+ if not event: # skip an iteration if event is None
+ continue
+ if not self.event or ('change' in event and event['change'] == self.event):
+ msg_type = self.type_translation[self.event_type]
+ data = self.data_socket.get(msg_type)
+ else:
+ data = None
+ self.callback(event, data, self)
+ self.close()
+
+ def close(self):
+ """
+ Ends subscription loop by setting self.subscribed to False and
+ closing both sockets.
+ """
+ self.subscribed = False
+ self.event_socket.close()
+ if self.data_socket is not default_socket():
+ self.data_socket.close()
+
+
+def __call_cmd__(cmd):
+ """
+ Returns output (stdout or stderr) of the given command args.
+ """
+ try:
+ output = subprocess.check_output(cmd)
+ except subprocess.CalledProcessError as error:
+ output = error.output
+ output = output.decode('utf-8') # byte string decoding
+ return output.strip()
+
+
+__socket__ = None
+def default_socket(socket=None):
+ """
+ Returns i3.Socket object, which was initiliazed once with default values
+ if no argument is given.
+ Otherwise sets the default socket to the given socket.
+ """
+ global __socket__
+ if socket and isinstance(socket, Socket):
+ __socket__ = socket
+ elif not __socket__:
+ __socket__ = Socket()
+ return __socket__
+
+
+def msg(type, message=''):
+ """
+ Takes a message type and a message itself.
+ Talks to the i3 via socket and returns the response from the socket.
+ """
+ response = default_socket().get(type, message)
+ return response
+
+
+def __function__(type, message='', *args, **crit):
+ """
+ Accepts a message type, a message. Takes optional args and keyword
+ args which are present in all future calls of the resulting function.
+ Returns a function, which takes arguments and container criteria.
+ If message type was 'command', the function returns success value.
+ """
+ def function(*args2, **crit2):
+ msg_full = ' '.join([message] + list(args) + list(args2))
+ criteria = dict(crit)
+ criteria.update(crit2)
+ if criteria:
+ msg_full = '%s %s' % (container(**criteria), msg_full)
+ response = msg(type, msg_full)
+ response = success(response)
+ if isinstance(response, i3Exception):
+ raise response
+ return response
+ function.__name__ = type
+ function.__doc__ = 'Message sender (type: %s, message: %s)' % (type, message)
+ return function
+
+
+def subscribe(event_type, event=None, callback=None):
+ """
+ Accepts an event_type and event itself.
+ Creates a new subscription, prints data on every event until
+ KeyboardInterrupt is raised.
+ """
+ if not callback:
+ def callback(event, data, subscription):
+ print('changed:', event['change'])
+ if data:
+ print('data:\n', data)
+
+ socket = default_socket()
+ subscription = Subscription(callback, event_type, event, data_socket=socket)
+ try:
+ while True:
+ time.sleep(1)
+ except KeyboardInterrupt:
+ print('') # force newline
+ finally:
+ subscription.close()
+
+
+def get_socket_path():
+ """
+ Gets the socket path via i3 command.
+ """
+ cmd = ['i3', '--get-socketpath']
+ output = __call_cmd__(cmd)
+ return output
+
+
+def success(response):
+ """
+ Convenience method for filtering success values of a response.
+ Each success dictionary is replaces with boolean value.
+ i3.MessageError is returned if error key is found in any of the
+ success dictionaries.
+ """
+ if isinstance(response, dict) and 'success' in response:
+ if 'error' in response:
+ return MessageError(response['error'])
+ return response['success']
+ elif isinstance(response, list):
+ for index, item in enumerate(response):
+ item = success(item)
+ if isinstance(item, i3Exception):
+ return item
+ response[index] = item
+ return response
+
+
+def container(**criteria):
+ """
+ Turns keyword arguments into a formatted container criteria.
+ """
+ if 'klass' in criteria:
+ criteria['class'] = criteria['klass']
+ del criteria['klass']
+
+ criteria = ['%s="%s"' % (key, val) for key, val in criteria.items()]
+ return '[%s]' % ' '.join(criteria)
+
+
+def parent(con_id, tree=None):
+ """
+ Searches for a parent of a node/container, given the container id.
+ Returns None if no container with given id exists (or if the
+ container is already a root node).
+ """
+ def has_child(node):
+ for child in node['nodes']:
+ if child['id'] == con_id:
+ return True
+ return False
+ parents = filter(tree, has_child)
+ if not parents or len(parents) > 1:
+ return None
+ return parents[0]
+
+
+def filter(tree=None, function=None, **conditions):
+ """
+ Filters a tree based on given conditions. For example, to get a list of
+ unfocused windows (leaf nodes) in the current tree:
+ i3.filter(nodes=[], focused=False)
+ The return value is always a list of matched items, even if there's
+ only one item that matches.
+ The user function should take a single node. The function doesn't have
+ to do any dict key or index checking (this is handled by i3.filter
+ internally).
+ """
+ if tree is None:
+ tree = msg('get_tree')
+ elif isinstance(tree, list):
+ tree = {'list': tree}
+ if function:
+ try:
+ if function(tree):
+ return [tree]
+ except (KeyError, IndexError):
+ pass
+ else:
+ for key, value in conditions.items():
+ if key not in tree or tree[key] != value:
+ break
+ else:
+ return [tree]
+ matches = []
+ for nodes in ['nodes', 'floating_nodes', 'list']:
+ if nodes in tree:
+ for node in tree[nodes]:
+ matches += filter(node, function, **conditions)
+ return matches
+
+
+class i3(ModuleType):
+ """
+ i3.py is a Python module for communicating with the i3 window manager.
+ """
+ def __init__(self, module):
+ self.__module__ = module
+ self.__name__ = module.__name__
+
+ def __getattr__(self, name):
+ """
+ Turns a nonexistent attribute into a function.
+ Returns the resulting function.
+ """
+ try:
+ return getattr(self.__module__, name)
+ except AttributeError:
+ pass
+ if name.lower() in self.__module__.MSG_TYPES:
+ return self.__module__.__function__(type=name)
+ else:
+ return self.__module__.__function__(type='command', message=name)
+
+
+# Turn the module into an i3 object
+sys.modules[__name__] = i3(sys.modules[__name__])
diff --git a/.i3/scripts/libs/sh.py b/.i3/scripts/libs/sh.py
new file mode 100644
index 0000000..0e46f14
--- /dev/null
+++ b/.i3/scripts/libs/sh.py
@@ -0,0 +1,1695 @@
+#===============================================================================
+# Copyright (C) 2011-2012 by Andrew Moffat
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#===============================================================================
+
+
+__version__ = "1.08"
+__project_url__ = "https://github.com/amoffat/sh"
+
+
+
+import platform
+
+if "windows" in platform.system().lower():
+ raise ImportError("sh %s is currently only supported on linux and osx. \
+please install pbs 0.110 (http://pypi.python.org/pypi/pbs) for windows \
+support." % __version__)
+
+
+
+import sys
+IS_PY3 = sys.version_info[0] == 3
+
+import traceback
+import os
+import re
+from glob import glob as original_glob
+from types import ModuleType
+from functools import partial
+import inspect
+import time as _time
+
+from locale import getpreferredencoding
+DEFAULT_ENCODING = getpreferredencoding() or "utf-8"
+
+
+if IS_PY3:
+ from io import StringIO
+ from io import BytesIO as cStringIO
+ from queue import Queue, Empty
+else:
+ from StringIO import StringIO
+ from cStringIO import OutputType as cStringIO
+ from Queue import Queue, Empty
+
+IS_OSX = platform.system() == "Darwin"
+THIS_DIR = os.path.dirname(os.path.realpath(__file__))
+
+
+import errno
+import warnings
+
+import pty
+import termios
+import signal
+import gc
+import select
+import atexit
+import threading
+import tty
+import fcntl
+import struct
+import resource
+from collections import deque
+import logging
+import weakref
+
+
+logging_enabled = False
+
+
+if IS_PY3:
+ raw_input = input
+ unicode = str
+ basestring = str
+
+
+
+
+class ErrorReturnCode(Exception):
+ truncate_cap = 750
+
+ def __init__(self, full_cmd, stdout, stderr):
+ self.full_cmd = full_cmd
+ self.stdout = stdout
+ self.stderr = stderr
+
+
+ if self.stdout is None: tstdout = "<redirected>"
+ else:
+ tstdout = self.stdout[:self.truncate_cap]
+ out_delta = len(self.stdout) - len(tstdout)
+ if out_delta:
+ tstdout += ("... (%d more, please see e.stdout)" % out_delta).encode()
+
+ if self.stderr is None: tstderr = "<redirected>"
+ else:
+ tstderr = self.stderr[:self.truncate_cap]
+ err_delta = len(self.stderr) - len(tstderr)
+ if err_delta:
+ tstderr += ("... (%d more, please see e.stderr)" % err_delta).encode()
+
+ msg = "\n\n RAN: %r\n\n STDOUT:\n%s\n\n STDERR:\n%s" %\
+ (full_cmd, tstdout.decode(DEFAULT_ENCODING), tstderr.decode(DEFAULT_ENCODING))
+ super(ErrorReturnCode, self).__init__(msg)
+
+
+class SignalException(ErrorReturnCode): pass
+
+SIGNALS_THAT_SHOULD_THROW_EXCEPTION = (
+ signal.SIGKILL,
+ signal.SIGSEGV,
+ signal.SIGTERM,
+ signal.SIGINT,
+ signal.SIGQUIT
+)
+
+
+# we subclass AttributeError because:
+# https://github.com/ipython/ipython/issues/2577
+# https://github.com/amoffat/sh/issues/97#issuecomment-10610629
+class CommandNotFound(AttributeError): pass
+
+rc_exc_regex = re.compile("(ErrorReturnCode|SignalException)_(\d+)")
+rc_exc_cache = {}
+
+def get_rc_exc(rc):
+ rc = int(rc)
+ try: return rc_exc_cache[rc]
+ except KeyError: pass
+
+ if rc > 0:
+ name = "ErrorReturnCode_%d" % rc
+ exc = type(name, (ErrorReturnCode,), {})
+ else:
+ name = "SignalException_%d" % abs(rc)
+ exc = type(name, (SignalException,), {})
+
+ rc_exc_cache[rc] = exc
+ return exc
+
+
+
+
+def which(program):
+ def is_exe(fpath):
+ return os.path.exists(fpath) and os.access(fpath, os.X_OK)
+
+ fpath, fname = os.path.split(program)
+ if fpath:
+ if is_exe(program): return program
+ else:
+ if "PATH" not in os.environ: return None
+ for path in os.environ["PATH"].split(os.pathsep):
+ exe_file = os.path.join(path, program)
+ if is_exe(exe_file):
+ return exe_file
+
+ return None
+
+def resolve_program(program):
+ path = which(program)
+ if not path:
+ # our actual command might have a dash in it, but we can't call
+ # that from python (we have to use underscores), so we'll check
+ # if a dash version of our underscore command exists and use that
+ # if it does
+ if "_" in program: path = which(program.replace("_", "-"))
+ if not path: return None
+ return path
+
+
+# we add this thin wrapper to glob.glob because of a specific edge case where
+# glob does not expand to anything. for example, if you try to do
+# glob.glob("*.py") and there are no *.py files in the directory, glob.glob
+# returns an empty list. this empty list gets passed to the command, and
+# then the command fails with a misleading error message. this thin wrapper
+# ensures that if there is no expansion, we pass in the original argument,
+# so that when the command fails, the error message is clearer
+def glob(arg):
+ return original_glob(arg) or arg
+
+
+
+class Logger(object):
+ def __init__(self, name, context=None):
+ self.name = name
+ self.context = "%s"
+ if context: self.context = "%s: %%s" % context
+ self.log = logging.getLogger(name)
+
+ def info(self, msg, *args):
+ if not logging_enabled: return
+ self.log.info(self.context, msg % args)
+
+ def debug(self, msg, *args):
+ if not logging_enabled: return
+ self.log.debug(self.context, msg % args)
+
+ def error(self, msg, *args):
+ if not logging_enabled: return
+ self.log.error(self.context, msg % args)
+
+ def exception(self, msg, *args):
+ if not logging_enabled: return
+ self.log.exception(self.context, msg % args)
+
+
+
+class RunningCommand(object):
+ def __init__(self, cmd, call_args, stdin, stdout, stderr):
+ truncate = 20
+ if len(cmd) > truncate:
+ logger_str = "command %r...(%d more) call_args %r" % \
+ (cmd[:truncate], len(cmd) - truncate, call_args)
+ else:
+ logger_str = "command %r call_args %r" % (cmd, call_args)
+
+ self.log = Logger("command", logger_str)
+ self.call_args = call_args
+ self.cmd = cmd
+ self.ran = " ".join(cmd)
+ self.process = None
+
+ # this flag is for whether or not we've handled the exit code (like
+ # by raising an exception). this is necessary because .wait() is called
+ # from multiple places, and wait() triggers the exit code to be
+ # processed. but we don't want to raise multiple exceptions, only
+ # one (if any at all)
+ self._handled_exit_code = False
+
+ self.should_wait = True
+ spawn_process = True
+
+
+ # with contexts shouldn't run at all yet, they prepend
+ # to every command in the context
+ if call_args["with"]:
+ spawn_process = False
+ Command._prepend_stack.append(self)
+
+
+ if callable(call_args["out"]) or callable(call_args["err"]):
+ self.should_wait = False
+
+ if call_args["piped"] or call_args["iter"] or call_args["iter_noblock"]:
+ self.should_wait = False
+
+ # we're running in the background, return self and let us lazily
+ # evaluate
+ if call_args["bg"]: self.should_wait = False
+
+ # redirection
+ if call_args["err_to_out"]: stderr = STDOUT
+
+
+ # set up which stream should write to the pipe
+ # TODO, make pipe None by default and limit the size of the Queue
+ # in oproc.OProc
+ pipe = STDOUT
+ if call_args["iter"] == "out" or call_args["iter"] is True: pipe = STDOUT
+ elif call_args["iter"] == "err": pipe = STDERR
+
+ if call_args["iter_noblock"] == "out" or call_args["iter_noblock"] is True: pipe = STDOUT
+ elif call_args["iter_noblock"] == "err": pipe = STDERR
+
+
+ if spawn_process:
+ self.log.debug("starting process")
+ self.process = OProc(cmd, stdin, stdout, stderr,
+ self.call_args, pipe=pipe)
+
+ if self.should_wait:
+ self.wait()
+
+
+ def wait(self):
+ self._handle_exit_code(self.process.wait())
+ return self
+
+ # here we determine if we had an exception, or an error code that we weren't
+ # expecting to see. if we did, we create and raise an exception
+ def _handle_exit_code(self, code):
+ if self._handled_exit_code: return
+ self._handled_exit_code = True
+
+ if code not in self.call_args["ok_code"] and \
+ (code > 0 or -code in SIGNALS_THAT_SHOULD_THROW_EXCEPTION):
+ raise get_rc_exc(code)(
+ " ".join(self.cmd),
+ self.process.stdout,
+ self.process.stderr
+ )
+
+
+
+ @property
+ def stdout(self):
+ self.wait()
+ return self.process.stdout
+
+ @property
+ def stderr(self):
+ self.wait()
+ return self.process.stderr
+
+ @property
+ def exit_code(self):
+ self.wait()
+ return self.process.exit_code
+
+ @property
+ def pid(self):
+ return self.process.pid
+
+ def __len__(self):
+ return len(str(self))
+
+ def __enter__(self):
+ # we don't actually do anything here because anything that should
+ # have been done would have been done in the Command.__call__ call.
+ # essentially all that has to happen is the comand be pushed on
+ # the prepend stack.
+ pass
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ # we do this because if get blocks, we can't catch a KeyboardInterrupt
+ # so the slight timeout allows for that.
+ while True:
+ try: chunk = self.process._pipe_queue.get(False, .001)
+ except Empty:
+ if self.call_args["iter_noblock"]: return errno.EWOULDBLOCK
+ else:
+ if chunk is None:
+ self.wait()
+ raise StopIteration()
+ try: return chunk.decode(self.call_args["encoding"],
+ self.call_args["decode_errors"])
+ except UnicodeDecodeError: return chunk
+
+ # python 3
+ __next__ = next
+
+ def __exit__(self, typ, value, traceback):
+ if self.call_args["with"] and Command._prepend_stack:
+ Command._prepend_stack.pop()
+
+ def __str__(self):
+ if IS_PY3: return self.__unicode__()
+ else: return unicode(self).encode(self.call_args["encoding"])
+
+ def __unicode__(self):
+ if self.process and self.stdout:
+ return self.stdout.decode(self.call_args["encoding"],
+ self.call_args["decode_errors"])
+ return ""
+
+ def __eq__(self, other):
+ return unicode(self) == unicode(other)
+
+ def __contains__(self, item):
+ return item in str(self)
+
+ def __getattr__(self, p):
+ # let these three attributes pass through to the OProc object
+ if p in ("signal", "terminate", "kill"):
+ if self.process: return getattr(self.process, p)
+ else: raise AttributeError
+ return getattr(unicode(self), p)
+
+ def __repr__(self):
+ try: return str(self)
+ except UnicodeDecodeError:
+ if self.process:
+ if self.stdout: return repr(self.stdout)
+ return repr("")
+
+ def __long__(self):
+ return long(str(self).strip())
+
+ def __float__(self):
+ return float(str(self).strip())
+
+ def __int__(self):
+ return int(str(self).strip())
+
+
+
+
+
+class Command(object):
+ _prepend_stack = []
+
+ _call_args = {
+ # currently unsupported
+ #"fg": False, # run command in foreground
+
+ "bg": False, # run command in background
+ "with": False, # prepend the command to every command after it
+ "in": None,
+ "out": None, # redirect STDOUT
+ "err": None, # redirect STDERR
+ "err_to_out": None, # redirect STDERR to STDOUT
+
+ # stdin buffer size
+ # 1 for line, 0 for unbuffered, any other number for that amount
+ "in_bufsize": 0,
+ # stdout buffer size, same values as above
+ "out_bufsize": 1,
+ "err_bufsize": 1,
+
+ # this is how big the output buffers will be for stdout and stderr.
+ # this is essentially how much output they will store from the process.
+ # we use a deque, so if it overflows past this amount, the first items
+ # get pushed off as each new item gets added.
+ #
+ # NOTICE
+ # this is not a *BYTE* size, this is a *CHUNK* size...meaning, that if
+ # you're buffering out/err at 1024 bytes, the internal buffer size will
+ # be "internal_bufsize" CHUNKS of 1024 bytes
+ "internal_bufsize": 3 * 1024**2,
+
+ "env": None,
+ "piped": None,
+ "iter": None,
+ "iter_noblock": None,
+ "ok_code": 0,
+ "cwd": None,
+ "long_sep": "=",
+
+ # this is for programs that expect their input to be from a terminal.
+ # ssh is one of those programs
+ "tty_in": False,
+ "tty_out": True,
+
+ "encoding": DEFAULT_ENCODING,
+ "decode_errors": "strict",
+
+ # how long the process should run before it is auto-killed
+ "timeout": 0,
+
+ # these control whether or not stdout/err will get aggregated together
+ # as the process runs. this has memory usage implications, so sometimes
+ # with long-running processes with a lot of data, it makes sense to
+ # set these to true
+ "no_out": False,
+ "no_err": False,
+ "no_pipe": False,
+
+ # if any redirection is used for stdout or stderr, internal buffering
+ # of that data is not stored. this forces it to be stored, as if
+ # the output is being T'd to both the redirected destination and our
+ # internal buffers
+ "tee": None,
+ }
+
+ # these are arguments that cannot be called together, because they wouldn't
+ # make any sense
+ _incompatible_call_args = (
+ #("fg", "bg", "Command can't be run in the foreground and background"),
+ ("err", "err_to_out", "Stderr is already being redirected"),
+ ("piped", "iter", "You cannot iterate when this command is being piped"),
+ )
+
+
+ # this method exists because of the need to have some way of letting
+ # manual object instantiation not perform the underscore-to-dash command
+ # conversion that resolve_program uses.
+ #
+ # there are 2 ways to create a Command object. using sh.Command(<program>)
+ # or by using sh.<program>. the method fed into sh.Command must be taken
+ # literally, and so no underscore-dash conversion is performed. the one
+ # for sh.<program> must do the underscore-dash converesion, because we
+ # can't type dashes in method names
+ @classmethod
+ def _create(cls, program, **default_kwargs):
+ path = resolve_program(program)
+ if not path: raise CommandNotFound(program)
+
+ cmd = cls(path)
+ if default_kwargs: cmd = cmd.bake(**default_kwargs)
+
+ return cmd
+
+
+ def __init__(self, path):
+ path = which(path)
+ if not path: raise CommandNotFound(path)
+ self._path = path
+
+ self._partial = False
+ self._partial_baked_args = []
+ self._partial_call_args = {}
+
+ # bugfix for functools.wraps. issue #121
+ self.__name__ = repr(self)
+
+
+ def __getattribute__(self, name):
+ # convenience
+ getattr = partial(object.__getattribute__, self)
+
+ if name.startswith("_"): return getattr(name)
+ if name == "bake": return getattr("bake")
+ if name.endswith("_"): name = name[:-1]
+
+ return getattr("bake")(name)
+
+
+ @staticmethod
+ def _extract_call_args(kwargs, to_override={}):
+ kwargs = kwargs.copy()
+ call_args = {}
+ for parg, default in Command._call_args.items():
+ key = "_" + parg
+
+ if key in kwargs:
+ call_args[parg] = kwargs[key]
+ del kwargs[key]
+ elif parg in to_override:
+ call_args[parg] = to_override[parg]
+
+ # test for incompatible call args
+ s1 = set(call_args.keys())
+ for args in Command._incompatible_call_args:
+ args = list(args)
+ error = args.pop()
+
+ if s1.issuperset(args):
+ raise TypeError("Invalid special arguments %r: %s" % (args, error))
+
+ return call_args, kwargs
+
+
+ # this helper method is for normalizing an argument into a string in the
+ # system's default encoding. we can feed it a number or a string or
+ # whatever
+ def _format_arg(self, arg):
+ if IS_PY3: arg = str(arg)
+ else:
+ # if the argument is already unicode, or a number or whatever,
+ # this first call will fail.
+ try: arg = unicode(arg, DEFAULT_ENCODING).encode(DEFAULT_ENCODING)
+ except TypeError: arg = unicode(arg).encode(DEFAULT_ENCODING)
+ return arg
+
+
+ def _aggregate_keywords(self, keywords, sep, raw=False):
+ processed = []
+ for k, v in keywords.items():
+ # we're passing a short arg as a kwarg, example:
+ # cut(d="\t")
+ if len(k) == 1:
+ if v is not False:
+ processed.append("-" + k)
+ if v is not True:
+ processed.append(self._format_arg(v))
+
+ # we're doing a long arg
+ else:
+ if not raw: k = k.replace("_", "-")
+
+ if v is True:
+ processed.append("--" + k)
+ elif v is False:
+ pass
+ else:
+ processed.append("--%s%s%s" % (k, sep, self._format_arg(v)))
+ return processed
+
+
+ def _compile_args(self, args, kwargs, sep):
+ processed_args = []
+
+ # aggregate positional args
+ for arg in args:
+ if isinstance(arg, (list, tuple)):
+ if not arg:
+ warnings.warn("Empty list passed as an argument to %r. \
+If you're using glob.glob(), please use sh.glob() instead." % self.path, stacklevel=3)
+ for sub_arg in arg: processed_args.append(self._format_arg(sub_arg))
+ elif isinstance(arg, dict):
+ processed_args += self._aggregate_keywords(arg, sep, raw=True)
+ else:
+ processed_args.append(self._format_arg(arg))
+
+ # aggregate the keyword arguments
+ processed_args += self._aggregate_keywords(kwargs, sep)
+
+ return processed_args
+
+
+ # TODO needs documentation
+ def bake(self, *args, **kwargs):
+ fn = Command(self._path)
+ fn._partial = True
+
+ call_args, kwargs = self._extract_call_args(kwargs)
+
+ pruned_call_args = call_args
+ for k,v in Command._call_args.items():
+ try:
+ if pruned_call_args[k] == v:
+ del pruned_call_args[k]
+ except KeyError: continue
+
+ fn._partial_call_args.update(self._partial_call_args)
+ fn._partial_call_args.update(pruned_call_args)
+ fn._partial_baked_args.extend(self._partial_baked_args)
+ sep = pruned_call_args.get("long_sep", self._call_args["long_sep"])
+ fn._partial_baked_args.extend(self._compile_args(args, kwargs, sep))
+ return fn
+
+ def __str__(self):
+ if IS_PY3: return self.__unicode__()
+ else: return unicode(self).encode(DEFAULT_ENCODING)
+
+ def __eq__(self, other):
+ try: return str(self) == str(other)
+ except: return False
+
+ def __repr__(self):
+ return "<Command %r>" % str(self)
+
+ def __unicode__(self):
+ baked_args = " ".join(self._partial_baked_args)
+ if baked_args: baked_args = " " + baked_args
+ return self._path + baked_args
+
+ def __enter__(self):
+ self(_with=True)
+
+ def __exit__(self, typ, value, traceback):
+ Command._prepend_stack.pop()
+
+
+ def __call__(self, *args, **kwargs):
+ kwargs = kwargs.copy()
+ args = list(args)
+
+ cmd = []
+
+ # aggregate any 'with' contexts
+ call_args = Command._call_args.copy()
+ for prepend in self._prepend_stack:
+ # don't pass the 'with' call arg
+ pcall_args = prepend.call_args.copy()
+ try: del pcall_args["with"]
+ except: pass
+
+ call_args.update(pcall_args)
+ cmd.extend(prepend.cmd)
+
+ cmd.append(self._path)
+
+ # here we extract the special kwargs and override any
+ # special kwargs from the possibly baked command
+ tmp_call_args, kwargs = self._extract_call_args(kwargs, self._partial_call_args)
+ call_args.update(tmp_call_args)
+
+ if not isinstance(call_args["ok_code"], (tuple, list)):
+ call_args["ok_code"] = [call_args["ok_code"]]
+
+
+ # check if we're piping via composition
+ stdin = call_args["in"]
+ if args:
+ first_arg = args.pop(0)
+ if isinstance(first_arg, RunningCommand):
+ # it makes sense that if the input pipe of a command is running
+ # in the background, then this command should run in the
+ # background as well
+ if first_arg.call_args["bg"]: call_args["bg"] = True
+ stdin = first_arg.process._pipe_queue
+
+ else:
+ args.insert(0, first_arg)
+
+ processed_args = self._compile_args(args, kwargs, call_args["long_sep"])
+
+ # makes sure our arguments are broken up correctly
+ split_args = self._partial_baked_args + processed_args
+
+ final_args = split_args
+
+ cmd.extend(final_args)
+
+
+ # stdout redirection
+ stdout = call_args["out"]
+ if stdout \
+ and not callable(stdout) \
+ and not hasattr(stdout, "write") \
+ and not isinstance(stdout, (cStringIO, StringIO)):
+
+ stdout = open(str(stdout), "wb")
+
+
+ # stderr redirection
+ stderr = call_args["err"]
+ if stderr and not callable(stderr) and not hasattr(stderr, "write") \
+ and not isinstance(stderr, (cStringIO, StringIO)):
+ stderr = open(str(stderr), "wb")
+
+
+ return RunningCommand(cmd, call_args, stdin, stdout, stderr)
+
+
+
+
+# used in redirecting
+STDOUT = -1
+STDERR = -2
+
+
+
+# Process open = Popen
+# Open Process = OProc
+class OProc(object):
+ _procs_to_cleanup = set()
+ _registered_cleanup = False
+ _default_window_size = (24, 80)
+
+ def __init__(self, cmd, stdin, stdout, stderr, call_args,
+ persist=False, pipe=STDOUT):
+
+ self.call_args = call_args
+
+ self._single_tty = self.call_args["tty_in"] and self.call_args["tty_out"]
+
+ # this logic is a little convoluted, but basically this top-level
+ # if/else is for consolidating input and output TTYs into a single
+ # TTY. this is the only way some secure programs like ssh will
+ # output correctly (is if stdout and stdin are both the same TTY)
+ if self._single_tty:
+ self._stdin_fd, self._slave_stdin_fd = pty.openpty()
+
+ self._stdout_fd = self._stdin_fd
+ self._slave_stdout_fd = self._slave_stdin_fd
+
+ self._stderr_fd = self._stdin_fd
+ self._slave_stderr_fd = self._slave_stdin_fd
+
+ # do not consolidate stdin and stdout
+ else:
+ if self.call_args["tty_in"]:
+ self._slave_stdin_fd, self._stdin_fd = pty.openpty()
+ else:
+ self._slave_stdin_fd, self._stdin_fd = os.pipe()
+
+ # tty_out is usually the default
+ if self.call_args["tty_out"]:
+ self._stdout_fd, self._slave_stdout_fd = pty.openpty()
+ else:
+ self._stdout_fd, self._slave_stdout_fd = os.pipe()
+
+ # unless STDERR is going to STDOUT, it ALWAYS needs to be a pipe,
+ # and never a PTY. the reason for this is not totally clear to me,
+ # but it has to do with the fact that if STDERR isn't set as the
+ # CTTY (because STDOUT is), the STDERR buffer won't always flush
+ # by the time the process exits, and the data will be lost.
+ # i've only seen this on OSX.
+ if stderr is not STDOUT:
+ self._stderr_fd, self._slave_stderr_fd = os.pipe()
+
+ gc_enabled = gc.isenabled()
+ if gc_enabled: gc.disable()
+ self.pid = os.fork()
+
+
+ # child
+ if self.pid == 0:
+ # this piece of ugliness is due to a bug where we can lose output
+ # if we do os.close(self._slave_stdout_fd) in the parent after
+ # the child starts writing.
+ # see http://bugs.python.org/issue15898
+ if IS_OSX and IS_PY3: _time.sleep(0.01)
+
+ os.setsid()
+
+ if self.call_args["tty_out"]:
+ # set raw mode, so there isn't any weird translation of newlines
+ # to \r\n and other oddities. we're not outputting to a terminal
+ # anyways
+ #
+ # we HAVE to do this here, and not in the parent thread, because
+ # we have to guarantee that this is set before the child process
+ # is run, and we can't do it twice.
+ tty.setraw(self._stdout_fd)
+
+
+ os.close(self._stdin_fd)
+ if not self._single_tty:
+ os.close(self._stdout_fd)
+ if stderr is not STDOUT: os.close(self._stderr_fd)
+
+
+ if self.call_args["cwd"]: os.chdir(self.call_args["cwd"])
+ os.dup2(self._slave_stdin_fd, 0)
+ os.dup2(self._slave_stdout_fd, 1)
+
+ # we're not directing stderr to stdout? then set self._slave_stderr_fd to
+ # fd 2, the common stderr fd
+ if stderr is STDOUT: os.dup2(self._slave_stdout_fd, 2)
+ else: os.dup2(self._slave_stderr_fd, 2)
+
+ # don't inherit file descriptors
+ max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
+ os.closerange(3, max_fd)
+
+
+ # set our controlling terminal
+ if self.call_args["tty_out"]:
+ tmp_fd = os.open(os.ttyname(1), os.O_RDWR)
+ os.close(tmp_fd)
+
+
+ if self.call_args["tty_out"]:
+ self.setwinsize(1)
+
+ # actually execute the process
+ if self.call_args["env"] is None: os.execv(cmd[0], cmd)
+ else: os.execve(cmd[0], cmd, self.call_args["env"])
+
+ os._exit(255)
+
+ # parent
+ else:
+ if gc_enabled: gc.enable()
+
+ if not OProc._registered_cleanup:
+ atexit.register(OProc._cleanup_procs)
+ OProc._registered_cleanup = True
+
+
+ self.started = _time.time()
+ self.cmd = cmd
+ self.exit_code = None
+
+ self.stdin = stdin or Queue()
+ self._pipe_queue = Queue()
+
+ # this is used to prevent a race condition when we're waiting for
+ # a process to end, and the OProc's internal threads are also checking
+ # for the processes's end
+ self._wait_lock = threading.Lock()
+
+ # these are for aggregating the stdout and stderr. we use a deque
+ # because we don't want to overflow
+ self._stdout = deque(maxlen=self.call_args["internal_bufsize"])
+ self._stderr = deque(maxlen=self.call_args["internal_bufsize"])
+
+ if self.call_args["tty_in"]: self.setwinsize(self._stdin_fd)
+
+
+ self.log = Logger("process", repr(self))
+
+ os.close(self._slave_stdin_fd)
+ if not self._single_tty:
+ os.close(self._slave_stdout_fd)
+ if stderr is not STDOUT: os.close(self._slave_stderr_fd)
+
+ self.log.debug("started process")
+ if not persist: OProc._procs_to_cleanup.add(self)
+
+
+ if self.call_args["tty_in"]:
+ attr = termios.tcgetattr(self._stdin_fd)
+ attr[3] &= ~termios.ECHO
+ termios.tcsetattr(self._stdin_fd, termios.TCSANOW, attr)
+
+ # this represents the connection from a Queue object (or whatever
+ # we're using to feed STDIN) to the process's STDIN fd
+ self._stdin_stream = StreamWriter("stdin", self, self._stdin_fd,
+ self.stdin, self.call_args["in_bufsize"])
+
+
+ stdout_pipe = None
+ if pipe is STDOUT and not self.call_args["no_pipe"]:
+ stdout_pipe = self._pipe_queue
+
+ # this represents the connection from a process's STDOUT fd to
+ # wherever it has to go, sometimes a pipe Queue (that we will use
+ # to pipe data to other processes), and also an internal deque
+ # that we use to aggregate all the output
+ save_stdout = not self.call_args["no_out"] and \
+ (self.call_args["tee"] in (True, "out") or stdout is None)
+ self._stdout_stream = StreamReader("stdout", self, self._stdout_fd, stdout,
+ self._stdout, self.call_args["out_bufsize"], stdout_pipe,
+ save_data=save_stdout)
+
+
+ if stderr is STDOUT or self._single_tty: self._stderr_stream = None
+ else:
+ stderr_pipe = None
+ if pipe is STDERR and not self.call_args["no_pipe"]:
+ stderr_pipe = self._pipe_queue
+
+ save_stderr = not self.call_args["no_err"] and \
+ (self.call_args["tee"] in ("err",) or stderr is None)
+ self._stderr_stream = StreamReader("stderr", self, self._stderr_fd, stderr,
+ self._stderr, self.call_args["err_bufsize"], stderr_pipe,
+ save_data=save_stderr)
+
+ # start the main io threads
+ self._input_thread = self._start_thread(self.input_thread, self._stdin_stream)
+ self._output_thread = self._start_thread(self.output_thread, self._stdout_stream, self._stderr_stream)
+
+
+ def __repr__(self):
+ return "<Process %d %r>" % (self.pid, self.cmd[:500])
+
+
+ # also borrowed from pexpect.py
+ @staticmethod
+ def setwinsize(fd):
+ rows, cols = OProc._default_window_size
+ TIOCSWINSZ = getattr(termios, 'TIOCSWINSZ', -2146929561)
+ if TIOCSWINSZ == 2148037735: # L is not required in Python >= 2.2.
+ TIOCSWINSZ = -2146929561 # Same bits, but with sign.
+
+ s = struct.pack('HHHH', rows, cols, 0, 0)
+ fcntl.ioctl(fd, TIOCSWINSZ, s)
+
+
+ @staticmethod
+ def _start_thread(fn, *args):
+ thrd = threading.Thread(target=fn, args=args)
+ thrd.daemon = True
+ thrd.start()
+ return thrd
+
+ def in_bufsize(self, buf):
+ self._stdin_stream.stream_bufferer.change_buffering(buf)
+
+ def out_bufsize(self, buf):
+ self._stdout_stream.stream_bufferer.change_buffering(buf)
+
+ def err_bufsize(self, buf):
+ if self._stderr_stream:
+ self._stderr_stream.stream_bufferer.change_buffering(buf)
+
+
+ def input_thread(self, stdin):
+ done = False
+ while not done and self.alive:
+ self.log.debug("%r ready for more input", stdin)
+ done = stdin.write()
+
+ stdin.close()
+
+
+ def output_thread(self, stdout, stderr):
+ readers = []
+ errors = []
+
+ if stdout is not None:
+ readers.append(stdout)
+ errors.append(stdout)
+ if stderr is not None:
+ readers.append(stderr)
+ errors.append(stderr)
+
+ while readers:
+ outputs, inputs, err = select.select(readers, [], errors, 0.1)
+
+ # stdout and stderr
+ for stream in outputs:
+ self.log.debug("%r ready to be read from", stream)
+ done = stream.read()
+ if done: readers.remove(stream)
+
+ for stream in err:
+ pass
+
+ # test if the process has been running too long
+ if self.call_args["timeout"]:
+ now = _time.time()
+ if now - self.started > self.call_args["timeout"]:
+ self.log.debug("we've been running too long")
+ self.kill()
+
+
+ # this is here because stdout may be the controlling TTY, and
+ # we can't close it until the process has ended, otherwise the
+ # child will get SIGHUP. typically, if we've broken out of
+ # the above loop, and we're here, the process is just about to
+ # end, so it's probably ok to aggressively poll self.alive
+ #
+ # the other option to this would be to do the CTTY close from
+ # the method that does the actual os.waitpid() call, but the
+ # problem with that is that the above loop might still be
+ # running, and closing the fd will cause some operation to
+ # fail. this is less complex than wrapping all the ops
+ # in the above loop with out-of-band fd-close exceptions
+ while self.alive: _time.sleep(0.001)
+ if stdout: stdout.close()
+ if stderr: stderr.close()
+
+
+ @property
+ def stdout(self):
+ return "".encode(self.call_args["encoding"]).join(self._stdout)
+
+ @property
+ def stderr(self):
+ return "".encode(self.call_args["encoding"]).join(self._stderr)
+
+
+ def signal(self, sig):
+ self.log.debug("sending signal %d", sig)
+ try: os.kill(self.pid, sig)
+ except OSError: pass
+
+ def kill(self):
+ self.log.debug("killing")
+ self.signal(signal.SIGKILL)
+
+ def terminate(self):
+ self.log.debug("terminating")
+ self.signal(signal.SIGTERM)
+
+ @staticmethod
+ def _cleanup_procs():
+ for proc in OProc._procs_to_cleanup:
+ proc.kill()
+
+
+ def _handle_exit_code(self, exit_code):
+ # if we exited from a signal, let our exit code reflect that
+ if os.WIFSIGNALED(exit_code): return -os.WTERMSIG(exit_code)
+ # otherwise just give us a normal exit code
+ elif os.WIFEXITED(exit_code): return os.WEXITSTATUS(exit_code)
+ else: raise RuntimeError("Unknown child exit status!")
+
+ @property
+ def alive(self):
+ if self.exit_code is not None: return False
+
+ # what we're doing here essentially is making sure that the main thread
+ # (or another thread), isn't calling .wait() on the process. because
+ # .wait() calls os.waitpid(self.pid, 0), we can't do an os.waitpid
+ # here...because if we did, and the process exited while in this
+ # thread, the main thread's os.waitpid(self.pid, 0) would raise OSError
+ # (because the process ended in another thread).
+ #
+ # so essentially what we're doing is, using this lock, checking if
+ # we're calling .wait(), and if we are, let .wait() get the exit code
+ # and handle the status, otherwise let us do it.
+ acquired = self._wait_lock.acquire(False)
+ if not acquired:
+ if self.exit_code is not None: return False
+ return True
+
+ try:
+ # WNOHANG is just that...we're calling waitpid without hanging...
+ # essentially polling the process
+ pid, exit_code = os.waitpid(self.pid, os.WNOHANG)
+ if pid == self.pid:
+ self.exit_code = self._handle_exit_code(exit_code)
+ return False
+
+ # no child process
+ except OSError: return False
+ else: return True
+ finally: self._wait_lock.release()
+
+
+ def wait(self):
+ self.log.debug("acquiring wait lock to wait for completion")
+ with self._wait_lock:
+ self.log.debug("got wait lock")
+
+ if self.exit_code is None:
+ self.log.debug("exit code not set, waiting on pid")
+ pid, exit_code = os.waitpid(self.pid, 0)
+ self.exit_code = self._handle_exit_code(exit_code)
+ else:
+ self.log.debug("exit code already set (%d), no need to wait", self.exit_code)
+
+ self._input_thread.join()
+ self._output_thread.join()
+
+ OProc._procs_to_cleanup.discard(self)
+
+ return self.exit_code
+
+
+
+
+class DoneReadingStdin(Exception): pass
+class NoStdinData(Exception): pass
+
+
+
+# this guy is for reading from some input (the stream) and writing to our
+# opened process's stdin fd. the stream can be a Queue, a callable, something
+# with the "read" method, a string, or an iterable
+class StreamWriter(object):
+ def __init__(self, name, process, stream, stdin, bufsize):
+ self.name = name
+ self.process = weakref.ref(process)
+ self.stream = stream
+ self.stdin = stdin
+
+ self.log = Logger("streamwriter", repr(self))
+
+
+ self.stream_bufferer = StreamBufferer(self.process().call_args["encoding"],
+ bufsize)
+
+ # determine buffering for reading from the input we set for stdin
+ if bufsize == 1: self.bufsize = 1024
+ elif bufsize == 0: self.bufsize = 1
+ else: self.bufsize = bufsize
+
+
+ if isinstance(stdin, Queue):
+ log_msg = "queue"
+ self.get_chunk = self.get_queue_chunk
+
+ elif callable(stdin):
+ log_msg = "callable"
+ self.get_chunk = self.get_callable_chunk
+
+ # also handles stringio
+ elif hasattr(stdin, "read"):
+ log_msg = "file descriptor"
+ self.get_chunk = self.get_file_chunk
+
+ elif isinstance(stdin, basestring):
+ log_msg = "string"
+
+ if bufsize == 1:
+ # TODO, make the split() be a generator
+ self.stdin = iter((c+"\n" for c in stdin.split("\n")))
+ else:
+ self.stdin = iter(stdin[i:i+self.bufsize] for i in range(0, len(stdin), self.bufsize))
+ self.get_chunk = self.get_iter_chunk
+
+ else:
+ log_msg = "general iterable"
+ self.stdin = iter(stdin)
+ self.get_chunk = self.get_iter_chunk
+
+ self.log.debug("parsed stdin as a %s", log_msg)
+
+
+ def __repr__(self):
+ return "<StreamWriter %s for %r>" % (self.name, self.process())
+
+ def fileno(self):
+ return self.stream
+
+ def get_queue_chunk(self):
+ try: chunk = self.stdin.get(True, 0.01)
+ except Empty: raise NoStdinData
+ if chunk is None: raise DoneReadingStdin
+ return chunk
+
+ def get_callable_chunk(self):
+ try: return self.stdin()
+ except: raise DoneReadingStdin
+
+ def get_iter_chunk(self):
+ try:
+ if IS_PY3: return self.stdin.__next__()
+ else: return self.stdin.next()
+ except StopIteration: raise DoneReadingStdin
+
+ def get_file_chunk(self):
+ if self.stream_bufferer.type == 1: chunk = self.stdin.readline()
+ else: chunk = self.stdin.read(self.bufsize)
+ if not chunk: raise DoneReadingStdin
+ else: return chunk
+
+
+ # the return value answers the questions "are we done writing forever?"
+ def write(self):
+ # get_chunk may sometimes return bytes, and sometimes returns trings
+ # because of the nature of the different types of STDIN objects we
+ # support
+ try: chunk = self.get_chunk()
+ except DoneReadingStdin:
+ self.log.debug("done reading")
+
+ if self.process().call_args["tty_in"]:
+ # EOF time
+ try: char = termios.tcgetattr(self.stream)[6][termios.VEOF]
+ except: char = chr(4).encode()
+ os.write(self.stream, char)
+
+ return True
+
+ except NoStdinData:
+ self.log.debug("received no data")
+ return False
+
+ # if we're not bytes, make us bytes
+ if IS_PY3 and hasattr(chunk, "encode"):
+ chunk = chunk.encode(self.process().call_args["encoding"])
+
+ for chunk in self.stream_bufferer.process(chunk):
+ self.log.debug("got chunk size %d: %r", len(chunk), chunk[:30])
+
+ self.log.debug("writing chunk to process")
+ try:
+ os.write(self.stream, chunk)
+ except OSError:
+ self.log.debug("OSError writing stdin chunk")
+ return True
+
+
+ def close(self):
+ self.log.debug("closing, but flushing first")
+ chunk = self.stream_bufferer.flush()
+ self.log.debug("got chunk size %d to flush: %r", len(chunk), chunk[:30])
+ try:
+ if chunk: os.write(self.stream, chunk)
+ if not self.process().call_args["tty_in"]:
+ self.log.debug("we used a TTY, so closing the stream")
+ os.close(self.stream)
+ except OSError: pass
+
+
+
+class StreamReader(object):
+ def __init__(self, name, process, stream, handler, buffer, bufsize,
+ pipe_queue=None, save_data=True):
+ self.name = name
+ self.process = weakref.ref(process)
+ self.stream = stream
+ self.buffer = buffer
+ self.save_data = save_data
+ self.encoding = process.call_args["encoding"]
+ self.decode_errors = process.call_args["decode_errors"]
+
+ self.pipe_queue = None
+ if pipe_queue: self.pipe_queue = weakref.ref(pipe_queue)
+
+ self.log = Logger("streamreader", repr(self))
+
+ self.stream_bufferer = StreamBufferer(self.encoding, bufsize,
+ self.decode_errors)
+
+ # determine buffering
+ if bufsize == 1: self.bufsize = 1024
+ elif bufsize == 0: self.bufsize = 1
+ else: self.bufsize = bufsize
+
+
+ # here we're determining the handler type by doing some basic checks
+ # on the handler object
+ self.handler = handler
+ if callable(handler): self.handler_type = "fn"
+ elif isinstance(handler, StringIO): self.handler_type = "stringio"
+ elif isinstance(handler, cStringIO):
+ self.handler_type = "cstringio"
+ elif hasattr(handler, "write"): self.handler_type = "fd"
+ else: self.handler_type = None
+
+
+ self.should_quit = False
+
+ # here we choose how to call the callback, depending on how many
+ # arguments it takes. the reason for this is to make it as easy as
+ # possible for people to use, without limiting them. a new user will
+ # assume the callback takes 1 argument (the data). as they get more
+ # advanced, they may want to terminate the process, or pass some stdin
+ # back, and will realize that they can pass a callback of more args
+ if self.handler_type == "fn":
+ implied_arg = 0
+ if inspect.ismethod(handler):
+ implied_arg = 1
+ num_args = len(inspect.getargspec(handler).args)
+
+ else:
+ if inspect.isfunction(handler):
+ num_args = len(inspect.getargspec(handler).args)
+
+ # is an object instance with __call__ method
+ else:
+ implied_arg = 1
+ num_args = len(inspect.getargspec(handler.__call__).args)
+
+
+ self.handler_args = ()
+ if num_args == implied_arg + 2:
+ self.handler_args = (self.process().stdin,)
+ elif num_args == implied_arg + 3:
+ self.handler_args = (self.process().stdin, self.process)
+
+
+ def fileno(self):
+ return self.stream
+
+ def __repr__(self):
+ return "<StreamReader %s for %r>" % (self.name, self.process())
+
+ def close(self):
+ chunk = self.stream_bufferer.flush()
+ self.log.debug("got chunk size %d to flush: %r",
+ len(chunk), chunk[:30])
+ if chunk: self.write_chunk(chunk)
+
+ if self.handler_type == "fd" and hasattr(self.handler, "close"):
+ self.handler.flush()
+
+ if self.pipe_queue and self.save_data: self.pipe_queue().put(None)
+ try: os.close(self.stream)
+ except OSError: pass
+
+
+ def write_chunk(self, chunk):
+ # in PY3, the chunk coming in will be bytes, so keep that in mind
+
+ if self.handler_type == "fn" and not self.should_quit:
+ # try to use the encoding first, if that doesn't work, send
+ # the bytes, because it might be binary
+ try: to_handler = chunk.decode(self.encoding, self.decode_errors)
+ except UnicodeDecodeError: to_handler = chunk
+
+ # this is really ugly, but we can't store self.process as one of
+ # the handler args in self.handler_args, the reason being is that
+ # it would create cyclic references, and prevent objects from
+ # being garbage collected. so we're determining if this handler
+ # even requires self.process (by the argument count), and if it
+ # does, resolving the weakref to a hard reference and passing
+ # that into the handler
+ handler_args = self.handler_args
+ if len(self.handler_args) == 2:
+ handler_args = (self.handler_args[0], self.process())
+ self.should_quit = self.handler(to_handler, *handler_args)
+
+ elif self.handler_type == "stringio":
+ self.handler.write(chunk.decode(self.encoding, self.decode_errors))
+
+ elif self.handler_type in ("cstringio", "fd"):
+ self.handler.write(chunk)
+
+
+ if self.save_data:
+ self.buffer.append(chunk)
+
+ if self.pipe_queue:
+ self.log.debug("putting chunk onto pipe: %r", chunk[:30])
+ self.pipe_queue().put(chunk)
+
+
+ def read(self):
+ # if we're PY3, we're reading bytes, otherwise we're reading
+ # str
+ try: chunk = os.read(self.stream, self.bufsize)
+ except OSError as e:
+ self.log.debug("got errno %d, done reading", e.errno)
+ return True
+ if not chunk:
+ self.log.debug("got no chunk, done reading")
+ return True
+
+ self.log.debug("got chunk size %d: %r", len(chunk), chunk[:30])
+ for chunk in self.stream_bufferer.process(chunk):
+ self.write_chunk(chunk)
+
+
+
+
+# this is used for feeding in chunks of stdout/stderr, and breaking it up into
+# chunks that will actually be put into the internal buffers. for example, if
+# you have two processes, one being piped to the other, and you want that,
+# first process to feed lines of data (instead of the chunks however they
+# come in), OProc will use an instance of this class to chop up the data and
+# feed it as lines to be sent down the pipe
+class StreamBufferer(object):
+ def __init__(self, encoding=DEFAULT_ENCODING, buffer_type=1,
+ decode_errors="strict"):
+ # 0 for unbuffered, 1 for line, everything else for that amount
+ self.type = buffer_type
+ self.buffer = []
+ self.n_buffer_count = 0
+ self.encoding = encoding
+ self.decode_errors = decode_errors
+
+ # this is for if we change buffering types. if we change from line
+ # buffered to unbuffered, its very possible that our self.buffer list
+ # has data that was being saved up (while we searched for a newline).
+ # we need to use that up, so we don't lose it
+ self._use_up_buffer_first = False
+
+ # the buffering lock is used because we might chance the buffering
+ # types from a different thread. for example, if we have a stdout
+ # callback, we might use it to change the way stdin buffers. so we
+ # lock
+ self._buffering_lock = threading.RLock()
+ self.log = Logger("stream_bufferer")
+
+
+ def change_buffering(self, new_type):
+ # TODO, when we stop supporting 2.6, make this a with context
+ self.log.debug("acquiring buffering lock for changing buffering")
+ self._buffering_lock.acquire()
+ self.log.debug("got buffering lock for changing buffering")
+ try:
+ if new_type == 0: self._use_up_buffer_first = True
+
+ self.type = new_type
+ finally:
+ self._buffering_lock.release()
+ self.log.debug("released buffering lock for changing buffering")
+
+
+ def process(self, chunk):
+ # MAKE SURE THAT THE INPUT IS PY3 BYTES
+ # THE OUTPUT IS ALWAYS PY3 BYTES
+
+ # TODO, when we stop supporting 2.6, make this a with context
+ self.log.debug("acquiring buffering lock to process chunk (buffering: %d)", self.type)
+ self._buffering_lock.acquire()
+ self.log.debug("got buffering lock to process chunk (buffering: %d)", self.type)
+ try:
+ # we've encountered binary, permanently switch to N size buffering
+ # since matching on newline doesn't make sense anymore
+ if self.type == 1:
+ try: chunk.decode(self.encoding, self.decode_errors)
+ except:
+ self.log.debug("detected binary data, changing buffering")
+ self.change_buffering(1024)
+
+ # unbuffered
+ if self.type == 0:
+ if self._use_up_buffer_first:
+ self._use_up_buffer_first = False
+ to_write = self.buffer
+ self.buffer = []
+ to_write.append(chunk)
+ return to_write
+
+ return [chunk]
+
+ # line buffered
+ elif self.type == 1:
+ total_to_write = []
+ chunk = chunk.decode(self.encoding, self.decode_errors)
+ while True:
+ newline = chunk.find("\n")
+ if newline == -1: break
+
+ chunk_to_write = chunk[:newline+1]
+ if self.buffer:
+ # this is ugly, but it's designed to take the existing
+ # bytes buffer, join it together, tack on our latest
+ # chunk, then convert the whole thing to a string.
+ # it's necessary, i'm sure. read the whole block to
+ # see why.
+ chunk_to_write = "".encode(self.encoding).join(self.buffer) \
+ + chunk_to_write.encode(self.encoding)
+ chunk_to_write = chunk_to_write.decode(self.encoding)
+
+ self.buffer = []
+ self.n_buffer_count = 0
+
+ chunk = chunk[newline+1:]
+ total_to_write.append(chunk_to_write.encode(self.encoding))
+
+ if chunk:
+ self.buffer.append(chunk.encode(self.encoding))
+ self.n_buffer_count += len(chunk)
+ return total_to_write
+
+ # N size buffered
+ else:
+ total_to_write = []
+ while True:
+ overage = self.n_buffer_count + len(chunk) - self.type
+ if overage >= 0:
+ ret = "".encode(self.encoding).join(self.buffer) + chunk
+ chunk_to_write = ret[:self.type]
+ chunk = ret[self.type:]
+ total_to_write.append(chunk_to_write)
+ self.buffer = []
+ self.n_buffer_count = 0
+ else:
+ self.buffer.append(chunk)
+ self.n_buffer_count += len(chunk)
+ break
+ return total_to_write
+ finally:
+ self._buffering_lock.release()
+ self.log.debug("released buffering lock for processing chunk (buffering: %d)", self.type)
+
+
+ def flush(self):
+ self.log.debug("acquiring buffering lock for flushing buffer")
+ self._buffering_lock.acquire()
+ self.log.debug("got buffering lock for flushing buffer")
+ try:
+ ret = "".encode(self.encoding).join(self.buffer)
+ self.buffer = []
+ return ret
+ finally:
+ self._buffering_lock.release()
+ self.log.debug("released buffering lock for flushing buffer")
+
+
+
+
+
+# this allows lookups to names that aren't found in the global scope to be
+# searched for as a program name. for example, if "ls" isn't found in this
+# module's scope, we consider it a system program and try to find it.
+#
+# we use a dict instead of just a regular object as the base class because
+# the exec() statement used in this file requires the "globals" argument to
+# be a dictionary
+class Environment(dict):
+ def __init__(self, globs, baked_args={}):
+ self.globs = globs
+ self.baked_args = baked_args
+
+ def __setitem__(self, k, v):
+ self.globs[k] = v
+
+ def __getitem__(self, k):
+ try: return self.globs[k]
+ except KeyError: pass
+
+ # the only way we'd get to here is if we've tried to
+ # import * from a repl. so, raise an exception, since
+ # that's really the only sensible thing to do
+ if k == "__all__":
+ raise ImportError("Cannot import * from sh. \
+Please import sh or import programs individually.")
+
+ # if we end with "_" just go ahead and skip searching
+ # our namespace for python stuff. this was mainly for the
+ # command "id", which is a popular program for finding
+ # if a user exists, but also a python function for getting
+ # the address of an object. so can call the python
+ # version by "id" and the program version with "id_"
+ if not k.endswith("_"):
+ # check if we're naming a dynamically generated ReturnCode exception
+ try: return rc_exc_cache[k]
+ except KeyError:
+ m = rc_exc_regex.match(k)
+ if m:
+ exit_code = int(m.group(2))
+ if m.group(1) == "SignalException": exit_code = -exit_code
+ return get_rc_exc(exit_code)
+
+ # is it a builtin?
+ try: return getattr(self["__builtins__"], k)
+ except AttributeError: pass
+ elif not k.startswith("_"): k = k.rstrip("_")
+
+
+ # https://github.com/ipython/ipython/issues/2577
+ # https://github.com/amoffat/sh/issues/97#issuecomment-10610629
+ if k.startswith("__") and k.endswith("__"):
+ raise AttributeError
+
+ # how about an environment variable?
+ try: return os.environ[k]
+ except KeyError: pass
+
+ # is it a custom builtin?
+ builtin = getattr(self, "b_"+k, None)
+ if builtin: return builtin
+
+ # it must be a command then
+ # we use _create instead of instantiating the class directly because
+ # _create uses resolve_program, which will automatically do underscore-
+ # to-dash conversions. instantiating directly does not use that
+ return Command._create(k, **self.baked_args)
+
+
+ # methods that begin with "b_" are custom builtins and will override any
+ # program that exists in our path. this is useful for things like
+ # common shell builtins that people are used to, but which aren't actually
+ # full-fledged system binaries
+
+ def b_cd(self, path):
+ os.chdir(path)
+
+ def b_which(self, program):
+ return which(program)
+
+
+
+
+def run_repl(env):
+ banner = "\n>> sh v{version}\n>> https://github.com/amoffat/sh\n"
+
+ print(banner.format(version=__version__))
+ while True:
+ try: line = raw_input("sh> ")
+ except (ValueError, EOFError): break
+
+ try: exec(compile(line, "<dummy>", "single"), env, env)
+ except SystemExit: break
+ except: print(traceback.format_exc())
+
+ # cleans up our last line
+ print("")
+
+
+
+
+# this is a thin wrapper around THIS module (we patch sys.modules[__name__]).
+# this is in the case that the user does a "from sh import whatever"
+# in other words, they only want to import certain programs, not the whole
+# system PATH worth of commands. in this case, we just proxy the
+# import lookup to our Environment class
+class SelfWrapper(ModuleType):
+ def __init__(self, self_module, baked_args={}):
+ # this is super ugly to have to copy attributes like this,
+ # but it seems to be the only way to make reload() behave
+ # nicely. if i make these attributes dynamic lookups in
+ # __getattr__, reload sometimes chokes in weird ways...
+ for attr in ["__builtins__", "__doc__", "__name__", "__package__"]:
+ setattr(self, attr, getattr(self_module, attr, None))
+
+ # python 3.2 (2.7 and 3.3 work fine) breaks on osx (not ubuntu)
+ # if we set this to None. and 3.3 needs a value for __path__
+ self.__path__ = []
+ self.self_module = self_module
+ self.env = Environment(globals(), baked_args)
+
+ def __setattr__(self, name, value):
+ if hasattr(self, "env"): self.env[name] = value
+ ModuleType.__setattr__(self, name, value)
+
+ def __getattr__(self, name):
+ if name == "env": raise AttributeError
+ return self.env[name]
+
+ # accept special keywords argument to define defaults for all operations
+ # that will be processed with given by return SelfWrapper
+ def __call__(self, **kwargs):
+ return SelfWrapper(self.self_module, kwargs)
+
+
+
+
+# we're being run as a stand-alone script
+if __name__ == "__main__":
+ try: arg = sys.argv.pop(1)
+ except: arg = None
+
+ if arg == "test":
+ import subprocess
+
+ def run_test(version):
+ py_version = "python%s" % version
+ py_bin = which(py_version)
+
+ if py_bin:
+ print("Testing %s" % py_version.capitalize())
+
+ p = subprocess.Popen([py_bin, os.path.join(THIS_DIR, "test.py")]
+ + sys.argv[1:])
+ p.wait()
+ else:
+ print("Couldn't find %s, skipping" % py_version.capitalize())
+
+ versions = ("2.6", "2.7", "3.1", "3.2", "3.3")
+ for version in versions: run_test(version)
+
+ else:
+ env = Environment(globals())
+ run_repl(env)
+
+# we're being imported from somewhere
+else:
+ self = sys.modules[__name__]
+ sys.modules[__name__] = SelfWrapper(self)