Source code for nuka

# -*- coding: utf-8 -*-
"""

nuka.run
========

.. autofunction:: run

nuka.wait
==========

.. autoclass:: wait

nuka.Event
===========

.. autoclass:: Event

nuka.cancel_on_error
====================

.. autofunction:: cancel_on_error
"""
import concurrent
import functools
import logging
import asyncio
import atexit
import signal
import shutil
import sys
import os

import uvloop

# api
from nuka.cli import cli
from nuka import reports
from nuka import process
from nuka import utils  # NOQA / API
from nuka.task import wait  # NOQA / API
from nuka.task import teardown
from nuka.configuration import config

sleep = asyncio.sleep  # NOQA / API


run_vars = {}


[docs]class Event(asyncio.Future): """A named event that you can wait: .. code-block:: python event = Event('myevent') async def do_something(host): nuka.wait(event) """ def __init__(self, name, loop=None): self.name = str(name) super().__init__(loop=loop) self.res = {} def set_result(self, **kwargs): """update Event.res' dict with kwargs and release the event""" self.res.update(kwargs) super().set_result(kwargs) def release(self): """release the event""" self.set_result() def __repr__(self): s = super().__repr__() s = s.replace('<Event ', '<Event {0} '.format(self.name)) return s
[docs]def cancel_on_error(*futures): """Cancel futures when the coroutine raise: .. code-block:: python @cancel_on_error(event) async def do_something(host): event.release() """ def wrapper(func): @functools.wraps(func) async def do(host, *args, **kwargs): try: return await func(host, *args, **kwargs) except asyncio.CancelledError: for f in futures: if not f.done(): f.cancel() raise except: host.log.exception(func) for f in futures: if not f.done(): f.cancel() return do return wrapper
[docs]def run(*coros, timeout=None): """Run coroutines: .. code-block:: python nuka.run(do_something(host) """ # parse args if not already done if cli.args is None: cli.parse_args() # register signal if not already done if 'sigint' not in run_vars: run_vars['sigint'] = 0 loop.add_signal_handler(signal.SIGINT, on_sigint) to_run = [] for coro in coros: host = None try: host = coro.cr_frame.f_locals.get('host') except: pass if host is not None and not host.failed(): host.log('{0}({1})'.format(coro.__name__, host)) to_run.append((host, coro)) coro = asyncio.gather(*[c for h, c in to_run], loop=loop, return_exceptions=True) coro = asyncio.wait_for(coro, loop=loop, timeout=timeout) try: results = loop.run_until_complete(coro) except Exception as e: raise asyncio.CancelledError() else: res_with_exc = [] for (host, coro), res in zip(to_run, results): if isinstance(res, asyncio.CancelledError): if host is not None: if host.failed(): res = host.failed() if not isinstance(res, LookupError): sys.exit(1) elif isinstance(res, Exception): raise res res_with_exc.append(res) return res_with_exc
def run_all(coro, *hosts, **kwargs): return run(*[coro(h) for h in hosts], **kwargs) def on_sigint(*args, **kwargs): hosts = config['all_hosts'].values() run_vars['sigint'] = run_vars['sigint'] + 1 if run_vars['sigint'] == 1: process.close_awaiting_tasks() logging.warning('Exiting...') all_tasks = 0 for host in hosts: tasks = host.running_tasks() if not host.fully_booted.done(): host.log.info('Cancelling {0}...'.format(tasks)) executor.shutdown(wait=False) else: all_tasks += len(tasks) if tasks: host.log.info('Waiting for {0}...'.format(tasks)) # do not use host.cancel() because we don't want to cancel running # tasks host._cancelled = True elif run_vars['sigint'] == 2: logging.warning('Killing remote processes...') for host in hosts: loop.create_task(host.send_messages(dict(signal='SIGINT'))) elif run_vars['sigint'] > 2: logging.warning('Exploding...') executor.shutdown(wait=False) sys.exit(1) def on_exit(): if cli.finalized and not cli.help: if 'all_hosts' in config and 'remote_dir' in config: hosts = config['all_hosts'].values() hosts = [h for h in hosts if h._tasks] coros = [teardown(host=h) for h in hosts if h.loop is loop] if coros: loop.run_until_complete(asyncio.wait(coros)) if hosts: reports.build_reports(hosts) executor.shutdown(wait=True) process.close_connections() loop.close() dirname = config['tmp'] if os.path.isdir(dirname): shutil.rmtree(dirname) if 'exit_message' in run_vars: print(run_vars['exit_message']) # do no use cli to parse this option sinc we need the loop early if '--uvloop' in sys.argv: asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) loop = asyncio.get_event_loop() level = logging.WARNING elif '--debug' in sys.argv: loop = asyncio.get_event_loop() loop.set_debug(True) level = logging.DEBUG else: loop = asyncio.get_event_loop() level = logging.WARNING logging.basicConfig(stream=sys.stdout, level=level, format='%(levelname)-5.5s: %(message)s') # explicit executor that we can shutdown gracefully executor = concurrent.futures.ThreadPoolExecutor(5) loop.set_default_executor(executor) if 'TESTING' not in os.environ: # do nothing on exit while unittests atexit.register(on_exit)