Source code for precept.console._keyhandler

import asyncio
import string
import sys
import threading
from itertools import chain

from precept._tools import is_windows


class Key:  # pragma: no cover
    def __init__(self, value, clean=None):
        self.value = value
        self.clean = clean

    def __str__(self):
        return self.clean or self.value

    def __eq__(self, other):
        if isinstance(other, Key):
            return other.value == self.value
        if isinstance(other, str):
            return other == self.value
        return False

    def __hash__(self):
        return hash(self.value)

    def __repr__(self):
        return f"<Key '{self.clean or self.value}'>"


[docs]class Keys: # pragma: no cover SPACE = Key(' ', 'space') BACKSPACE = Key('\x7f', 'backspace') ENTER = Key('\r', 'enter') ESCAPE = Key('\x1b', 'escape') INSERT = Key('\x1b[2~', 'insert') END = Key('\x1b[F', 'end') HOME = Key('\x1b[H', 'home') DELETE = Key('\x1b[3~', 'delete') DOWN = Key('\x1b[B', 'down') UP = Key('\x1b[A', 'up') LEFT = Key('\x1b[D', 'left') RIGHT = Key('\x1b[C', 'right') F1 = Key('\x1bOP', 'F1') F2 = Key('\x1bOQ', 'F2') F3 = Key('\x1bOR', 'F3') F4 = Key('\x1bOS', 'F4') F5 = Key('\x1bO15~', 'F5') F6 = Key('\x1bO17~', 'F6') F7 = Key('\x1bO18~', 'F7') F8 = Key('\x1bO19~', 'F8') F9 = Key('\x1bO20~', 'F9') F10 = Key('\x1bO21~', 'F10') F11 = Key('\x1bO23~', 'F11') F12 = Key('\x1bO24~', 'F12') CTRL_C = Key('\x03', 'ctrl-c') CTRL_A = Key('\x01', 'ctrl-a') CTRL_ALT_A = Key('\x1b\x01', 'ctrl-alt-a') CTRL_ALT_DEL = Key('\x1b[3^', 'ctrl-alt-del') CTRL_B = Key('\x02', 'ctrl-b') CTRL_D = Key('\x04', 'ctrl-d') CTRL_E = Key('\x05', 'ctrl-e') CTRL_F = Key('\x06', 'ctrl-f') CTRL_Z = Key('\x1a', 'ctrl-z') SPECIAL_KEYS = ( SPACE, BACKSPACE, ENTER, ESCAPE, INSERT, END, HOME, DELETE, DOWN, UP, LEFT, RIGHT, F1, F2, F3, F4, F5, F6, F7, F8, F9, F10, F11, F12, CTRL_C, CTRL_A, CTRL_ALT_A, CTRL_ALT_DEL, CTRL_B, CTRL_D, CTRL_E, CTRL_F, CTRL_Z ) keys = { x: Key(x) for x in chain(string.ascii_letters, string.digits) } keys.update({ x.value: x for x in SPECIAL_KEYS })
[docs] @classmethod def get_key(cls, value, default=None): return cls.keys.get(value) or default
class GetChar: def __init__(self): # pragma: no cover if is_windows(): # pylint: disable=import-error import msvcrt self.get_char = msvcrt.getwch else: import termios import tty def get_char(): # pylint: disable=assignment-from-no-return fileno = sys.stdin.fileno() old = termios.tcgetattr(fileno) try: tty.setraw(fileno) raw = termios.tcgetattr(fileno) raw[1] = old[1] termios.tcsetattr(fileno, termios.TCSADRAIN, raw) char = sys.stdin.read(1) finally: termios.tcsetattr(fileno, termios.TCSADRAIN, old) return char self.get_char = get_char def __call__(self): # pragma: no cover return self.get_char() async def deferred(self, executor): # pragma: no cover return await executor.execute(self.get_char) getch = GetChar()
[docs]class KeyHandler: # pragma: no cover
[docs] def __init__(self, handlers, loop=None, default_handler=None): # pragma: no cover # noqa: E501 self.handlers = handlers self.handlers.update({ Keys.CTRL_C: lambda _, stop: stop(), }) self.default_handler = default_handler self.loop = loop or asyncio.get_event_loop() self.queue = asyncio.Queue(loop=self.loop) self.stop_event = asyncio.Event(loop=self.loop) self._consumer = None self._producer = None
[docs] def stop(self): # pragma: no cover self.stop_event.set()
[docs] def read(self): # pragma: no cover # Make non-blocking. while not self.stop_event.is_set(): char = getch() asyncio.ensure_future(self.queue.put(char), loop=self.loop)
[docs] async def handle(self): # pragma: no cover while not self.stop_event.is_set(): await asyncio.sleep(0.00001) try: msg = self.queue.get_nowait() except asyncio.QueueEmpty: pass else: handler = self.handlers.get(msg) if handler: handler(msg, self.stop) elif self.default_handler: self.default_handler(msg, self.stop)
async def __aenter__(self): # pragma: no cover self._producer = threading.Thread(target=self.read) self._producer.daemon = True self._producer.start() self._consumer = asyncio.ensure_future(self.handle(), loop=self.loop) return self async def __aexit__(self, exc_type, exc_val, exc_tb): # pragma: no cover self.stop() await self._consumer
[docs] def print_keys(self, file=sys.stdout): # pragma: no cover for k, v in self.handlers.items(): doc = getattr(v, '__doc__', getattr(v, '__name__', '')) clean_key = Keys.get_key(k, k) if k == Keys.CTRL_C: continue print(f'{clean_key}: {doc}', file=file)
if __name__ == '__main__': # pragma: no cover main_loop = asyncio.get_event_loop() async def main(): namespace = { 'i': 0 } def hand(msg, stop): namespace['i'] += 1 if namespace['i'] >= 10: stop() print(repr(msg), file=sys.stderr) async with KeyHandler({}, default_handler=hand) as k: k.print_keys() print('Type 10 chars') while not k.stop_event.is_set(): print('.', end='', flush=True) await asyncio.sleep(1) main_loop.run_until_complete(main())