Source code for precept._cli

"""Class based cli builder with sub commands."""
import shlex
import sys
import argparse
import asyncio
import itertools
import typing
import functools
import inspect

import stringcase

from ._tools import is_windows
from ._services import Service
from ._immutable import ImmutableDict
from .events import PreceptEvent


def _flags_key(flags):
    """
    Derive the namespace key for a set of argument flags.

    The last flag is used, stripped of its leading dashes and converted
    to snake case (``('-d', '--some-date')`` gives ``some_date``).

    :param flags: Sequence of flag strings as given to ``Argument``.
    :return: The snake-cased key.
    """
    last_flag = flags[-1]
    bare_name = last_flag.lstrip('-')
    return stringcase.snakecase(bare_name)


class Argument(ImmutableDict):
    """
    Argument of a Command, can either be optional or not depending on the
    flags.

    .. seealso::
        https://docs.python.org/3/library/argparse.html#the-add-argument-method
    """

    # pylint: disable=unused-argument, redefined-builtin
    def __init__(
        self,
        *flags: str,
        type: type = None,
        help: str = None,
        choices: typing.Iterable = None,
        default: typing.Any = None,
        nargs: typing.Union[str, int] = None,
        action: str = None,
        required: bool = None,
        metavar: str = None,
        dest: str = None,
    ):
        """
        Only the values that are not ``None`` and whose name is listed in
        ``self._prop_keys`` are stored.

        :param flags: How to call the argument, prefixing with ``-`` makes
            the argument a keyword. Example: ``'-d', '--date'`` make the
            variable available as ``date``, but can be supplied as ``-d``
            from the cli.
        :param type: The type of the variable to cast to, default to str.
        :param help: Description to go along with the argument.
        :param choices: The available choices to choose from.
        :param default: The value to take if not supplied.
        :param nargs: Number of times the argument can be supplied.
        :param action: What to do with the argument.
        :param required: Makes a keyword argument required.
        :param metavar: Name in help.
        :param dest: The name of the variable to add the value to once
            parsed.
        """
        # ``locals()`` is evaluated here, in the scope of ``__init__``, so
        # it sees the parameters above; no extra local is introduced first.
        super().__init__(**dict(
            (name, value)
            for name, value in locals().items()
            if name in self._prop_keys and value is not None
        ))

    def register(self, parser):
        """
        Add this argument to an argparse parser.

        :param parser: Object exposing ``add_argument`` (a parser or a
            sub-parser).
        """
        options = {}
        for key, value in self.items():
            if key != 'flags':
                options[key] = value

        if 'default' in options:
            # Otherwise the default value doesn't show up in the help.
            options.setdefault('help', '-')

        flags = self.flags
        is_positional = len(flags) == 1 and not flags[0].startswith('-')
        if is_positional:
            # Positional names are registered snake-cased.
            flags = [stringcase.snakecase(flags[0])]

        parser.add_argument(*flags, **options)

    @property
    def flag_key(self):
        """Key derived from the flags by ``_flags_key``."""
        return _flags_key(self.flags)
class CombinedFormatter(
    argparse.ArgumentDefaultsHelpFormatter,
    argparse.RawDescriptionHelpFormatter,
):
    """Help formatter combining argparse's defaults and raw-description formatters."""
class Command:
    """
    Command decorator, methods of `CliApp` subclasses decorated with this
    get a sub-command in the parser.

    Wrapped methods receive the parsed values keyed by the ``Argument``
    flags.
    """
    arguments: typing.Iterable[Argument]
    description: str

    # pylint: disable=redefined-builtin
    def __init__(
        self,
        *arguments: Argument,
        name: str = None,
        description: str = None,
        help: str = None,
        auto: bool = False,
        services: typing.List[Service] = None
    ):
        """
        :param arguments: Arguments registered on the sub-parser.
        :param name: Sub-command name (stored spinal-cased); when empty the
            decorated object's ``__name__`` is used.
        :param description: Parser description; when empty the decorated
            object's docstring is used.
        :param help: Help text for the sub-command, falls back to the
            description when registering.
        :param auto: When true and a function is decorated, arguments are
            also generated from its signature.
        :param services: Stored as-is on the instance.
        """
        self.arguments = arguments
        self._command_name = None
        self.command_name = name
        self.description = description
        self.help = help
        self._wrapped = None
        self.obj_name = None
        self.auto = auto
        self.services = services

    def __call__(self, obj):
        """
        Decorate ``obj`` and return the wrapper that replaces it.

        :param obj: A class (wrapped in a ``CommandClass`` subclass that is
            instantiated immediately) or a function (wrapped in a
            ``CommandFunction``).
        :return: The wrapped command.
        """
        self.obj_name = getattr(obj, '__name__')
        if not self.command_name:
            self.command_name = self.obj_name
        if not self.description:
            self.description = getattr(obj, '__doc__', '')

        if isinstance(obj, type):
            # pylint: disable=used-before-assignment
            class NewCommand(obj, CommandClass, metaclass=CommandMeta):
                command = self

            self._wrapped = NewCommand()
        else:
            if self.auto:
                # Generated arguments come first, explicit ones after.
                self.arguments = (
                    tuple(self._auto_arguments(obj)) + tuple(self.arguments)
                )
            self._wrapped = CommandFunction(obj, self)
        return self._wrapped

    @staticmethod
    def _auto_arguments(obj):
        """
        Build one ``Argument`` per parameter of ``obj``'s signature.

        Parameters named ``self``, ``args`` or ``kwargs`` are skipped.
        A parameter with a default becomes a ``--keyword`` argument, the
        others are positional.

        :param obj: The callable to inspect.
        :return: List of generated arguments, in signature order.
        """
        arguments = []
        parameters = inspect.signature(obj).parameters
        for name, parameter in parameters.items():
            if name in ('self', 'args', 'kwargs'):
                continue
            key = stringcase.spinalcase(name)
            default = None
            _type = None
            # A missing annotation is the ``Parameter.empty`` sentinel,
            # which is truthy, so it must be compared by identity.
            if parameter.annotation is not parameter.empty:
                _type = parameter.annotation
            if parameter.default is not parameter.empty:
                key = f'--{key}'
                default = parameter.default
            # Without annotation, infer the type from a usable default;
            # a ``None`` default carries no type information.
            if _type is None and default is not None:
                _type = type(default)
            arguments.append(Argument(key, type=_type, default=default))
        return arguments

    def register(self, subparsers):
        """
        Create the sub-parser of this command and register its arguments.

        For a class command, the nested commands are registered in their
        own ``Commands`` sub-parsers group.

        :param subparsers: Object returned by ``add_subparsers``.
        """
        parser = subparsers.add_parser(
            self.command_name,
            description=self.description,
            help=self.help or self.description,
            formatter_class=CombinedFormatter
        )
        if isinstance(self._wrapped, CommandClass):
            subs = parser.add_subparsers(
                title='Commands', dest='command'
            )
            # noinspection PyProtectedMember
            for command_name, command, _ in self._wrapped.get_commands():
                # get_commands returns itself so make sure it's not the same.
                if command_name != self.command_name:
                    command.register(subs)

        for arg in self.arguments:
            arg.register(parser)

    @property
    def command_name(self):
        """Name of the sub-command, spinal-cased when set."""
        return self._command_name

    @command_name.setter
    def command_name(self, value):
        self._command_name = stringcase.spinalcase(value) if value else value

    def __hash__(self):  # pragma: no cover
        # ``__hash__`` must return an int; returning the name itself
        # raises TypeError as soon as the command is hashed.
        return hash(self.command_name)
class WrappedCommand:
    """Common base of the objects produced by the ``Command`` decorator."""
    command: Command

    def __repr__(self):  # pragma: no cover
        return f'<{self.__class__.__name__} "{self.command.command_name}">'

    def get_commands(self) -> typing.List[
        typing.Tuple[str, Command, typing.Callable]
    ]:  # pragma: no cover
        """
        :return: List of ``(command name, command, callable)`` triples.
        """
        raise NotImplementedError


# Wrapper around a function decorated with ``Command``.
# NOTE: this class deliberately has no docstring: ``functools.wraps(self)``
# in ``__init__`` copies the instance's ``__doc__`` (the class docstring)
# onto the wrapped function.
class CommandFunction(WrappedCommand):
    def __init__(self, func, command):
        self.command = command
        # NOTE(review): ``wraps(self)(func)`` copies this wrapper's
        # attributes onto ``func`` (and sets ``func.__wrapped__``), which
        # looks reversed - confirm before changing, callers may rely on it.
        self.func = functools.wraps(self)(func)

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def __get__(self, instance, owner):
        # Allow to use the self of the class
        if instance is None:
            return self
        return functools.partial(self.__call__, instance)

    def get_commands(self):
        return [(self.command.command_name, self.command, self)]


class CommandClass(WrappedCommand):
    """Wrapper base for classes decorated with ``Command``."""
    # Attribute names of the wrapped commands, filled by ``CommandMeta``.
    _commands: typing.List[str]

    def __call__(self, command, *args, **kwargs):  # pragma: no cover
        return getattr(self, command)(*args, **kwargs)

    def get_commands(self):
        """
        Collect the nested commands followed by this class command itself.

        :return: List of ``(command name, command, callable)`` triples.
        """
        c = []
        for command in (getattr(self.__class__, x) for x in self._commands):
            if issubclass(command.__class__, CommandClass):  # pragma: no cover
                # Recursively get all the commands.
                c += command.get_commands()
            else:
                # Get the true one.
                c.append((
                    command.command.command_name,
                    command.command,
                    self.clean_arguments(
                        getattr(self, command.command.obj_name))
                ))
        # noinspection PyTypeChecker
        return c + [(self.command.command_name, self.command, self)]

    def clean_arguments(self, func):
        """
        Wrap ``func`` so the arguments of the class command are set as
        attributes on this instance and removed from the keyword
        arguments before ``func`` is called.
        """
        @functools.wraps(func)
        def wrap_arguments(*args, **kwargs):
            to_clean = []
            for argument in self.command.arguments:
                key = argument.flag_key
                value = kwargs.get(key)
                setattr(self, key, value)
                to_clean.append(key)

            cleaned = {k: v for k, v in kwargs.items() if k not in to_clean}
            return func(*args, **cleaned)

        return wrap_arguments


class CommandMeta(type):
    """Metaclass recording the wrapped commands in ``_commands``."""

    def __new__(mcs, name, bases, attributes):
        new_attributes = dict(**attributes)
        commands = []

        # Look at the attributes of every base, then the class' own.
        for k, v in itertools.chain(*(
            z.items()
            for z in [y.__dict__ for y in bases] + [attributes]
        )):
            if isinstance(v, WrappedCommand):
                commands.append(k)
                if isinstance(v, CommandClass):
                    # The ast get to nested's first so this can work.
                    commands += v._commands

        new_attributes['_commands'] = commands
        return type.__new__(mcs, name, bases, new_attributes)


class Cli:
    """argparse cli wrapper."""

    def __init__(self, *commands, prog='', description='',
                 formatter_class=CombinedFormatter,
                 global_arguments=None,
                 default_command=None,
                 events=None,
                 on_parse=None):
        """
        :param commands: ``(command name, command, callable)`` triples, as
            returned by ``get_commands``.
        :param prog: Program name given to the parser.
        :param description: Description given to the parser.
        :param formatter_class: Help formatter of the main parser.
        :param global_arguments: Arguments registered on the main parser;
            their parsed values end up in ``self.globals``.
        :param default_command: Awaited when no sub-command was supplied.
        :param events: Object whose ``dispatch`` is called when a command
            starts; may be ``None``.
        :param on_parse: Callable awaited with the parsed namespace.
        """
        self.prog = prog
        self.default_command = default_command
        self.parser = argparse.ArgumentParser(
            prog=self.prog,
            description=description,
            formatter_class=formatter_class,
        )
        self._global_arguments = global_arguments or []
        self.globals = {}
        self._on_parse = on_parse
        self._events = events
        self.raw_args = []

        for g in self._global_arguments:
            g.register(self.parser)

        self.commands = {}
        subparsers = self.parser.add_subparsers(
            title='Commands', dest='command', metavar=''
        )
        for command_name, command, wrapper in commands:
            command.register(subparsers)
            self.commands[command_name] = wrapper

    async def run(self, args: typing.Union[str, list] = None):
        """
        Parse and call the appropriate handler.

        :param args: Arguments to parse. A string is split with ``shlex``,
            None takes sys.argv.
        :return: None
        """
        if isinstance(args, str):
            args = shlex.split(args, posix=not is_windows())
        self.raw_args = args or sys.argv
        namespace = self.parser.parse_args(args=args)
        command = self.commands.get(namespace.command)

        kw = vars(namespace).copy()
        kw.pop('command')

        # Globals are kept on the cli and not given to the command.
        for g in self._global_arguments:
            key = _flags_key(g.flags)
            self.globals[key] = kw.pop(key)

        if callable(self._on_parse):
            await self._on_parse(namespace)

        if command:
            operation = command(**kw)
            if self._events is None:
                # ``events`` defaults to None: nothing to dispatch, only
                # run the command instead of failing on ``None.dispatch``.
                await operation
            else:
                event = self._events.dispatch(
                    str(PreceptEvent.CLI_STARTED), command=namespace.command
                )
                await asyncio.gather(event, operation)
        elif self.default_command:
            await self.default_command(**vars(namespace))
        else:  # pragma: no cover
            self.parser.print_help()