# Source code for nuka.task

# Copyright 2017 by Bearstech <py@bearstech.com>
#
# This file is part of nuka.
#
# nuka is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# nuka is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with nuka. If not, see <http://www.gnu.org/licenses/>.

import time
import codecs
import inspect
import asyncio
import logging
import importlib

import asyncssh.misc

from nuka.remote.task import RemoteTask
from nuka.configuration import config
from nuka import remote
from nuka import utils
from nuka import gpg
import nuka


class Base(asyncio.Future):
    """Future-based base class shared by the tasks of this module.

    Creating an instance registers the task on its host and, unless the host
    is already cancelled, calls :meth:`process`, which schedules the
    :meth:`_run` wrapper on the event loop.  Subclasses provide an async
    ``run()`` method; the future resolves to the task itself once ``run()``
    and :meth:`post_process` are over.
    """

    def __init__(self, **kwargs):
        # initialize() must run first: it discovers self.host, whose loop is
        # needed to initialise the underlying Future.
        self.initialize(**kwargs)
        super().__init__(loop=self.host.loop)
        if self.host.cancelled():
            self.cancel()
        else:
            self.process()

    def initialize(self, host=None,
                   switch_user=None, switch_ssh_user=None, **args):
        """Set up the instance state and register the task on its host.

        ``host``, ``switch_user`` and ``switch_ssh_user`` default to the
        local variables of the same name found while walking up the call
        stack.  The remaining keyword arguments are stored as ``self.args``.

        Raises RuntimeError when no host can be found.
        """
        meta = {'filename': None, 'lineno': None,
                'start': time.time(), 'times': [],
                'remote_calls': [],
                }
        for infos in inspect.stack(2):
            f = infos.frame
            # frames that belong to a RemoteTask method are not the caller
            if isinstance(f.f_locals.get('self'), RemoteTask):
                continue
            if host is None:
                host = f.f_locals.get('host')
            if switch_user is None:
                switch_user = f.f_locals.get('switch_user')
            if switch_ssh_user is None:
                switch_ssh_user = f.f_locals.get('switch_ssh_user')
            if meta['filename'] is None:
                # first eligible frame: remember where the task was created
                filename = infos.filename
                if filename.endswith('nuka/task.py'):
                    filename = 'nuka/task.py'
                meta.update(filename=filename,
                            lineno=infos.lineno)
                if host is not None:
                    break
        if host is None:  # pragma: no cover
            raise RuntimeError('No valid host found in the stack')
        self.switch_user = switch_user
        self.switch_ssh_user = switch_ssh_user
        self.meta = meta
        self.host = host
        self.loop = self.host.loop
        self.args = args
        self.res = {'changed': True, 'rc': 0}
        self.start = time.time()
        self.run_task = None
        host.add_task(self)

    def running(self):
        """return True if a remote task is running"""
        if self.run_task is not None:
            return not self.run_task.done()
        return False

    def process(self, fut=None):
        """Run :meth:`pre_process` then schedule :meth:`_run` on the loop.

        ``fut`` is only set when the method is used as a done callback (see
        ``Task.process``); the start time is then reset so the wait is not
        accounted to this task.  An exception raised by ``pre_process()`` is
        logged, cancels the task and is re-raised.
        """
        if fut is not None:  # pragma: no cover
            # we waited for boot
            self.meta['start'] = time.time()
        start = time.time()
        try:
            self.pre_process()
        except Exception as e:
            self.host.log.exception(e)
            self.cancel()
            raise
        else:
            duration = time.time() - start
            # only report a pre_process that took a noticeable time
            if duration > .05:  # pragma: no cover
                self.host.add_time(
                    start=start, time=duration,
                    type='pre_process', task=self)
            self.run_task = self._loop.create_task(self._run())

    def pre_process(self):
        """run locally before anything is sent to the host"""

    def post_process(self):
        """run when we get a valid reply from the host"""

    def render_template(self, fd):
        """render a template from a file descriptor:

        .. code-block:: python

            {'src': path, 'dst': path}

        The rendered text is stored in ``fd['data']``.  ``fd['executable']``
        is filled from ``utils.isexecutable(src)`` when missing.
        """
        src = fd['src']
        # task arguments first, then the optional ``ctx`` argument on top
        ctx = dict(self.args, **self.args.get('ctx', {}))
        ctx.update(host=self.host, env=config, **fd)
        engine = config.get_template_engine()
        template = engine.get_template(src)
        fd['data'] = template.render(ctx)
        if 'executable' not in fd:
            fd['executable'] = utils.isexecutable(src)

    def render_file(self, fd):
        """render a file from a file descriptor. A file descriptor is a dict:

        .. code-block:: python

            {'src': path, 'dst': path}

        The content (passed through ``gpg.decrypt`` for ``*.gpg`` sources,
        read as utf8 otherwise) is stored in ``fd['data']``.
        ``fd['executable']`` is filled from ``utils.isexecutable(src)`` when
        missing.
        """
        src = fd['src']
        if src.endswith('.gpg'):
            _, data = gpg.decrypt(src, 'utf8')
        else:
            with codecs.open(src, 'r', 'utf8') as fd_:
                data = fd_.read()
        fd['data'] = data
        if 'executable' not in fd:
            fd['executable'] = utils.isexecutable(src)

    def log(self):
        """Log the task on the host logger."""
        self.host.log.info(self)

    def cancel(self, *args, **kwargs):
        """cancel a task

        Also cancels the wrapped ``run_task`` (if any) and the host.

        Positional/keyword arguments are forwarded untouched to
        ``asyncio.Future.cancel``: since Python 3.9 asyncio itself calls
        ``cancel(msg)`` on the future a task is waiting on, so the override
        has to accept that argument.

        Returns the result of ``asyncio.Future.cancel``, or ``False`` when
        the task was already cancelled.
        """
        if self.cancelled():
            return False
        result = super().cancel(*args, **kwargs)
        if not self.res.get('signal') and not self.host.failed():
            # do not log cancellation if the user wanted it
            self.log()
        if self.run_task is not None:
            self.run_task.cancel()
        self.host.cancel()
        return result

    async def _run(self):
        """Wrap ``run()``: catch errors, run post_process, set the result."""
        # wrap the task to catch exception
        try:
            await self.run()
        except asyncio.CancelledError:
            # CancelledError is a BaseException since Python 3.8, so it needs
            # its own clause.  Mark the future itself as cancelled (otherwise
            # it would stay pending forever) and let the cancellation
            # propagate to the wrapping asyncio task.
            self.cancel()
            raise
        except Exception:
            self.cancel()
            self.host.log.exception5(self)

        # update meta
        self.meta.update(self.res.pop('meta', {}))

        # run post_process (note: also reached after a failed run())
        start = time.time()
        try:
            self.post_process()
        except Exception:
            self.cancel()
            self.host.log.exception5(self)
        finally:
            duration = time.time() - start
            if duration > .05:
                self.host.add_time(
                    start=start, time=duration,
                    type='post_process', task=self)

        # set result / log stuff
        if not self.done():
            self.set_result(self)
            self.meta.setdefault('time', time.time() - self.meta['start'])
            self.host.add_time(type='task', task=self, **self.meta)

        # log if not cancelled
        if not self.cancelled():
            self.log()

    def __bool__(self):
        """A task is truthy when its return code is 0."""
        return self.res.get('rc') == 0

    def __repr__(self):
        return '<{0}>'.format(str(self))

    def __str__(self):
        """Return ``name(instance)`` followed by the state and local time."""
        name = self.__class_name__()
        instance_name = self.args.get('name')
        if instance_name is None:
            instance_name = '-'
        s = '{0}({1})'.format(name, instance_name)
        if self.res:
            if self.res['rc'] == 0:
                if self.cancelled():
                    s += ' cancelled at {filename}:{lineno}'.format(
                        **self.meta)
                elif self.done() and getattr(self, 'changed', True):
                    s += ' changed'
            else:
                s += ' fail({0[rc]})'.format(self.res)
            # named local_time so the ``time`` module is not shadowed
            local_time = self.meta.get('local_time')
            if local_time:
                s += ' time({0}s)'.format(round(local_time, 1))
        return s.strip()

    def __class_name__(self):
        """Return ``<last module segment>.<class name>`` for display."""
        klass = self.__class__
        name = '{0}.{1}'.format(klass.__module__.split('.')[-1],
                                klass.__name__)
        return name


class Task(Base, RemoteTask):
    """A task whose work is executed by the remote script on the host."""

    def process(self):
        """Schedule the task, waiting for the host to be booted if needed.

        In diff mode a task whose ``diff`` attribute is ``None`` or ``False``
        is resolved immediately as unchanged, without contacting the host.
        """
        if self.host.cancelled():
            self.cancel()
        else:
            diff_mode = self.args.get('diff_mode', nuka.cli.args.diff)
            if diff_mode:
                # ignore diff call if the task do not support it
                attr = getattr(self, 'diff', None)
                if attr in (None, False):
                    self.res['changed'] = False
                    self.meta['local_time'] = 0.
                    if attr is False:
                        self.host.log.info("{0}.diff is False".format(self))
                    else:
                        self.host.log.warning(
                            "{0}.diff is None".format(self))
                    self.set_result(self)
                    return
            if self.host.fully_booted.done():
                super().process()
            else:
                # use asyncio with callback since we are in a sync __init__
                task = self.loop.create_task(wait_for_boot(self.host))
                task.add_done_callback(super().process)

    async def run(self):
        """Serialize the task, send it to the remote host.

        The remote script will deserialize the task and run
        :meth:`~nuka.remote.task.Task.do` (or diff() when using --diff)
        """
        self.host.log.debug(self)
        diff_mode = self.args.get('diff_mode', nuka.cli.args.diff)

        klass = self.__class__
        # the template context is only used locally, never sent to the host
        args = {k: v for k, v in self.args.items() if k not in ('ctx',)}

        # prep stdin
        stdin_data = dict(
            task=(klass.__module__, klass.__name__),
            remote_tmp=config['remote_tmp'],
            switch_user=self.switch_user,
            args=args,
            check_mode=False,
            diff_mode=diff_mode,
            log_level=config['log']['levels']['remote_level'])

        if config['testing'] and 'coverage' in self.host.vars:
            # check if we can/want use coverage
            cmd = (
                '{coverage} run -p '
                '--source={remote_dir}/nuka/tasks '
                '{script} '
            ).format(coverage=self.host.vars['coverage'], **config)
        else:
            # use python
            inventory = self.host.vars.get(
                'inventory',
                {'python': {'executable': 'python'}})
            executable = inventory['python'].get('executable', 'python')
            cmd = '{0} {script} '.format(executable, **config)

        # allow to track some ids from ps
        cmd += '--deploy-id={0} --task-id={1}'.format(config['id'], id(self))

        # create process
        proc = await self.host.create_process(
            cmd, task=self,
            switch_user=self.switch_user,
            switch_ssh_user=self.switch_ssh_user)

        # send stdin
        zlib_available = self.host.inventory['python']['zlib_available']
        stdin = utils.proto_dumps_std(
            stdin_data, proc.stdin,
            content_type='zlib' if zlib_available else 'plain')
        await proc.stdin.drain()

        res = {}
        while res.get('message_type') != 'exit':
            # wait for messages
            try:
                res = await proc.next_message()
            except asyncio.CancelledError:
                raise
            except Exception:
                self.cancel()
                self.host.log.exception5(
                    '{0}\n\n{1}'.format(self, stdin))
            else:
                if res.get('message_type') == 'log':
                    self.host.log.log(res['level'], res['msg'])

        # finalize
        self.res.update(res)
        if self.res['rc'] != 0 and not self.ignore_errors:
            if not diff_mode:
                self.cancel()

    def log(self):
        """Log the task result, its remote calls and the remote log lines."""
        log = self.host.log
        if 'exc' in self.res:
            exc = '\n' + ''.join(self.res['exc'])
            log.error('{0}\n{1}'.format(self, exc))
        elif self.res.get('stderr'):
            if self.res.get('rc') != 0:
                log.error('{0}\n{1}'.format(self, self.res['stderr']))
            elif self.res['changed']:
                log.changed('{0}\n{1}'.format(self, self.res['stderr']))
        else:
            data = self.res.get('diff', '')
            if data.strip():
                data = data.strip()
                log.changed('{0} diff=\n{1}\n'.format(self, data))
            elif self.cancelled():
                log.error(self)
            elif self.res['changed']:
                log.changed(self)
            else:
                log.info(self)

        for cmd_ in self.meta.get('remote_calls', []):
            rtime = round(cmd_['time'], 3)
            inf = '^ sh({cmd}) time({rtime})'.format(
                rtime=rtime, **cmd_)
            if cmd_['exc']:
                log.error(inf + '\n' + ''.join(cmd_['exc']))
            elif nuka.cli.args.verbose > 1:
                log.info(inf)
                stds = {k: v for k, v in cmd_.items()
                        if k in ('stderr', 'stdout') and v}
                if stds:
                    log.debug3('^ ' + str(stds))

        data = self.res.get('log')
        if data:
            # Lines look like ``LEVEL:message``; a line without a known level
            # prefix is a continuation of the previous one and reuses its
            # level.
            level = None
            for line in data.rstrip().split('\n'):
                line = line.rstrip()
                try:
                    line_level, message = line.split(':', 1)
                except ValueError:
                    if level:
                        log.log(level, '^ ' + line)
                else:
                    # Read the number from the name table itself: a name that
                    # is registered there but is not an attribute of the
                    # logging module (presumably the case of nuka's extra
                    # levels - verify) made ``getattr(logging, name)`` raise
                    # AttributeError.
                    known_level = logging._nameToLevel.get(line_level)
                    if known_level is not None:
                        level = known_level
                        log.log(level, '^ ' + message)
                    elif level:
                        log.log(level, '^ ' + line)
class SetupTask(Base):
    """Base class of the internal tasks managing a host life cycle."""

    changed = False

    def log(self):
        """Setup tasks are only logged at debug level."""
        self.host.log.debug(self)

    def pre_process(self):
        self.args['name'] = ''  # unnamed task

    def cancel(self, *args, **kwargs):
        """Cancel the task and make sure the host is cancelled too.

        Arguments are forwarded to the parent ``cancel()`` so the optional
        ``msg`` passed by asyncio (Python 3.9+) is accepted.
        """
        result = super().cancel(*args, **kwargs)
        if not self.host.cancelled():
            self.host.cancel()
            self.host.log.critical(
                'Cancelled at {filename}:{lineno}...'.format(**self.meta))
        return result


class boot(SetupTask):
    """A task that just call host.boot()"""

    def __class_name__(self):
        return 'boot'

    async def run(self):
        # wait for boot async
        try:
            await self.host.boot()
        except (Exception, asyncio.CancelledError):
            # Any boot failure (cancellation included, as before) is only
            # logged; KeyboardInterrupt / SystemExit are no longer swallowed.
            self.host.log.exception('boot')
        self.meta['start'] = self.host._start


class setup(SetupTask):
    """A task that just wait for :class:`~nuka.task.boot` then setup the
    host"""

    # {0}: sudo prefix, {1}: size of the uploaded archive, {2}: config
    setup_cmd = (
        '{0}rm -Rf {2[remote_tmp]}; '
        '{0}mkdir -p {2[remote_tmp]} && {0}chmod 777 {2[remote_tmp]} &&'
        '{0}mkdir -p {2[remote_dir]} &&'
        'dd bs={1} count=1 | {0}tar -xz -C {2[remote_dir]} && '
        '{0}`which python 2> /dev/null || which python3 || echo python` '
        '{2[script]} --setup'
    )

    def __class_name__(self):
        return 'setup'

    async def run(self):
        """Upload the nuka archive and collect the host inventory.

        Waits for the ``boot`` task, runs the optional bootstrap command,
        pipes the archive to ``setup_cmd`` and stores the inventory returned
        by the remote script in ``host.vars['inventory']``.
        """
        host = self.host
        # wait for boot async
        await host._named_tasks[boot.__name__]
        self.meta['start'] = time.time()

        # run bootstrap_command if any
        if host.bootstrap_command:
            res = await host.run_command(host.bootstrap_command)
            if res['rc'] != 0:
                self.host.log.error(res)
                self.cancel()
                return

        # setup
        sudo = ''
        if host.use_sudo:
            sudo = '{sudo} '.format(**config)
        # '{bytes}' is kept as a placeholder until the archive size is known
        cmd = self.setup_cmd.format(sudo, '{bytes}', config)
        mods = nuka.config['inventory_modules'][:]
        mods += self.host.vars.get('inventory_modules', [])
        if mods:
            cmd += ' ' + ' '.join(['--inventory=' + m for m in mods])

        stdin = remote.build_archive(
            extra_classes=all_task_classes(),
            mode='x:gz')
        c = cmd.format(bytes=len(stdin))
        host.log.debug('Uploading archive ({0}kb)...'.format(
            int(len(stdin) / 1000)))
        try:
            proc = await self.host.create_process(c, task=self)
            proc.stdin.write(stdin)
            await proc.stdin.drain()
        except (LookupError, OSError, asyncssh.misc.Error) as e:
            if isinstance(e, asyncssh.misc.Error):
                e = LookupError(str(e), self.host)
            self.host.log.error(e.args[0])
            self.host.fail(e)
            return

        res = {}
        while res.get('message_type') != 'exit':
            # wait for messages
            try:
                res = await proc.next_message()
            except asyncio.CancelledError:
                raise
            except (LookupError, OSError) as e:
                self.host.log.error(e.args[0])
                self.host.fail(e)
                return
            except Exception:
                self.cancel()
                self.host.log.exception5(
                    '{0}\n\n{1}'.format(self, stdin))
            else:
                if res.get('message_type') == 'log':
                    self.host.log.log(res['level'], res['msg'])
        self.res.update(res)

        if self.res['rc'] != 0:
            self.cancel()

        # NOTE(review): also reached after a failed setup; raises KeyError
        # if the reply carries no 'inventory' key - confirm this is intended.
        host.vars['inventory'] = self.res['inventory']
        for name in mods:
            mod = importlib.import_module(name)
            meth = getattr(mod, 'finalize_inventory', None)
            if meth is not None:
                meth(host.vars['inventory'])
        host.log.debug(
            'Inventory:\n{0}'.format(host.vars['inventory']))

        if not host.fully_booted.done():
            host.fully_booted.set_result(True)


class teardown(SetupTask):
    """remove `remote_dir` from the host"""

    teardown_cmd = '{0}rm -Rf {1[remote_dir]}'

    def __init__(self, host):
        # reset the flag so the task is processed even on a cancelled host
        host._cancelled = False
        super().__init__(host=host)

    def __class_name__(self):
        return 'teardown'

    async def run(self):
        if not self.host.failed():
            # NOTE(review): setup uses the configured '{sudo} ' prefix while
            # this command hard-codes 'sudo ' - confirm they should differ.
            sudo = 'sudo ' if self.host.use_sudo else ''
            cmd = self.teardown_cmd.format(sudo, config)
            await self.host.run_command(cmd, task=self)


class destroy(SetupTask):
    """destroy the host"""

    def __init__(self, host):
        # reset the flag so the task is processed even on a cancelled host
        host._cancelled = False
        super().__init__(host=host)

    def __class_name__(self):
        return 'destroy'

    async def run(self):
        if 'destroyed' not in self.host.vars:
            await self.host.destroy()
class wait(Base):
    """A task that wait for a coroutine / event / future:

    .. code-block:: python

        nuka.wait(do_something(host), event)

    You can use a timeout:

    .. code-block:: python

        nuka.wait(event, timeout=30)
    """

    def __class_name__(self):
        return 'wait'

    def __init__(self, future, *futures, **kwargs):
        # accept both wait(a, b, c) and wait([a, b], c): a list passed as
        # first argument is flattened in front of the other awaitables
        futures = list(futures)
        if not isinstance(future, list):
            futures.insert(0, future)
        else:
            futures[0:0] = future
        kwargs['name'] = repr(futures)
        kwargs['futures'] = futures
        super().__init__(**kwargs)

    async def run(self):
        """Await all the futures and resolve to the list of their results.

        The optional ``timeout`` argument is handed to ``asyncio.wait_for``.
        """
        futures = self.args['futures']
        res = await asyncio.wait_for(asyncio.gather(*futures),
                                     timeout=self.args.get('timeout'))
        self.set_result(res)
        self.meta.setdefault('time', time.time() - self.meta['start'])
        # skip time if we dont wait for a nuka.Event
        if any(isinstance(e, nuka.Event) for e in res):
            self.host.add_time(type='task', task=self, **self.meta)
async def wait_for_boot(host):
    """Create the boot/setup tasks of ``host`` if needed and await setup."""
    if not host.fully_booted.done():
        create_setup_tasks(host)
        task = host._named_tasks[setup.__name__]
        if not task.done():
            await task


def create_setup_tasks(host):
    """Instantiate and register the ``boot`` and ``setup`` tasks once.

    Does nothing when the host is already fully booted; a task already
    present in ``host._named_tasks`` is not created again.
    """
    if not host.fully_booted.done():
        for task in (boot, setup):
            instance = host._named_tasks.get(task.__name__)
            if instance is None:
                instance = task(host=host)
                host._named_tasks[task.__name__] = instance
                host._tasks.append(instance)


def get_task_from_stack():
    """Return the first ``self`` found in the call stack that is a
    :class:`Base` instance, or ``None`` when there is none."""
    for info in inspect.stack(3):
        candidate = info.frame.f_locals.get('self')
        if isinstance(candidate, Base):
            return candidate


def all_task_classes(cls=Task):
    """Yield every direct and indirect subclass of ``cls`` (deepest
    subclasses first within each branch)."""
    for klass in cls.__subclasses__():
        yield from all_task_classes(klass)
        yield klass