Source code for precept._executor

import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor


[docs]class AsyncExecutor: """Execute functions in a Pool Executor"""
[docs] def __init__(self, loop=None, executor=None, max_workers=None): """ :param loop: asyncio event loop. :param executor: Set to use an already existing PoolExecutor, default to a new ThreadPoolExecutor if not supplied. :param max_workers: Max workers of the created ThreadPoolExecutor. """ self.loop = loop or asyncio.get_event_loop() self.lock = asyncio.Lock() if executor: # pragma: no cover self.executor = executor else: # FIXME Investigate need to call shutdown on executor. # pylint: disable=consider-using-with self.executor = ThreadPoolExecutor(max_workers=max_workers)
[docs] async def execute(self, func, *args, **kwargs): """ Execute a sync function asynchronously in the executor. :param func: Synchronous function. :param args: Argument to give to the function. :param kwargs: Keyword arguments to give to the function :return: """ return await self.loop.run_in_executor( self.executor, functools.partial(func, *args, **kwargs) )
[docs] async def execute_with_lock(self, func, *args, **kwargs): # pragma: no cover # noqa: E501 """ Acquire lock before executing the function. :param func: Synchronous function. :param args: :param kwargs: :return: """ await self.lock.acquire() ret = await self.execute(func, *args, **kwargs) self.lock.release() return ret
[docs] def wraps(self, func): """ Wraps a synchronous function to execute in the pool when called, making it async. :param func: The function to wraps :return: Async wrapped function. """ @functools.wraps(func) async def wrapper(*args, **kwargs): return await self.execute(func, *args, **kwargs) return wrapper