Source code for nuka.hosts.cloud

# -*- coding: utf-8 -*-
from collections import defaultdict
from functools import partial
import ipaddress
import threading
import asyncio
import codecs
import time
import os

from libcloud.common.google import ResourceNotFoundError
from libcloud.compute.providers import get_driver
from libcloud.compute.types import Provider

from novaclient.client import Client as NovaClient
from novaclient.exceptions import NotFound

from nuka.task import get_task_from_stack
from nuka.hosts import base
from nuka import utils
import nuka


# Providers driven through python-novaclient rather than a libcloud driver.
nova_providers = (Provider.OPENSTACK,)


# Driver cache, indexed first by provider and then by thread ident
# (see driver_from_config()).
_drivers = defaultdict(dict)

# Per provider: (environment variable, driver keyword argument) pairs.
# When the environment variable is set, its value overrides the matching
# driver argument. Later pairs win, so e.g. OS_TENANT_NAME takes precedence
# over OS_PROJECT_NAME when both are defined.
_env_keys = {
    Provider.GCE: (
        ('GCE_EMAIL', 'user_id'),
        ('GCE_PEM_FILE_PATH', 'key'),
        ('GCE_PROJECT', 'project'),
    ),
    Provider.OPENSTACK: (
        # the target must be a keyword argument name, not a version value
        ('OS_VERSION', 'version'),
        ('OS_AUTH_URL', 'auth_url'),
        ('OS_USERNAME', 'username'),
        ('OS_PASSWORD', 'password'),
        ('OS_PROJECT_NAME', 'project_name'),
        ('OS_PROJECT_ID', 'project_id'),
        ('OS_TENANT_NAME', 'project_name'),
        ('OS_TENANT_ID', 'project_id'),
        ('OS_REGION_NAME', 'region_name'),
    )
}


def driver_from_config(provider, **args):
    """Return a driver for ``provider``, building it on first use.

    One driver is cached per (provider, thread) pair. For providers listed
    in ``nova_providers`` the driver is a ``NovaClient`` instance; otherwise
    it is an instance of the class returned by libcloud's ``get_driver()``.

    :param provider: provider identifier (a ``Provider`` constant).
    :param args: keyword arguments for the driver constructor. When empty,
        they are taken from the ``driver`` entry of the provider's section
        in ``nuka.config`` (either a mapping, or a path to a JSON file
        holding the mapping) and then overridden by the environment
        variables listed in ``_env_keys``.
    :raises RuntimeError: if no arguments were given and none could be
        found in the configuration or the environment.
    """
    # cache one driver per thread. probably only useful for main thread
    ident = threading.get_ident()
    driver = _drivers[provider].get(ident)
    if driver is not None:
        return driver

    if provider in nova_providers:
        klass = NovaClient
    else:
        klass = get_driver(provider)

    if not args:
        # .get() so an unconfigured provider can still rely on environment
        # variables (or reach the explicit error below) instead of failing
        # with a bare KeyError
        provider_config = nuka.config.get(provider.lower(), {}) or {}
        args = provider_config.get('driver', {}) or {}
        if isinstance(args, str):
            # load arguments from file
            with open(os.path.expanduser(args)) as fd:
                args = utils.json.load(fd)
        else:
            # copy: environment overrides must not leak into nuka.config
            args = dict(args)

        for k, new_k in _env_keys.get(provider, ()):
            if k in os.environ:
                args[new_k] = os.environ[k]
        if not args:
            raise RuntimeError((
                'Not able to get the driver configuration for {} provider.'
            ).format(provider))

    driver = klass(**args)
    # store under the same (provider, ident) key used for the lookup above
    _drivers[provider][ident] = driver
    return driver


class Host(base.Host):
    """Host in the cloud

    Base class for provider specific hosts. Subclasses are expected to
    define a ``provider`` class attribute; it is read here but not set.
    """

    use_sudo = True

    def __init__(self, hostname, node=None,
                 create=True, driver_args=None, create_node_args=None, **vars):
        """Initialize the host.

        :param hostname: host name, also used to look the node up.
        :param node: an already fetched node object, if any.
        :param create: whether :meth:`get_node` may create a missing node.
        :param driver_args: keyword arguments for ``driver_from_config()``.
        :param create_node_args: per-host node creation arguments; they
            override the provider wide ``create_node_args`` configuration.
        :param vars: extra variables forwarded to the base class.
        """
        config = nuka.config.get(self.provider.lower(), {})
        user = config.get('user') or 'root'
        vars.setdefault('user', user)
        vars['driver_args'] = driver_args or {}
        vars['create_node_args'] = create_node_args or {}
        super().__init__(hostname, **vars)
        self.create = create
        # None: not fetched yet. get_node() treats False as "destroyed".
        self._node = node

    async def boot(self):
        """Fetch (or create) the node in an executor and set the hostname.

        On success ``self.hostname`` is replaced by the node's public ip.
        On failure the error is logged, the host is cancelled and the
        exception is re-raised.
        """
        # we need to get the task from here.
        # we cant retrieve it while in an executor
        task = get_task_from_stack()
        get_node = partial(self.get_node, task=task)
        try:
            self._node = await self.loop.run_in_executor(None, get_node)
        except asyncio.CancelledError:
            raise
        except Exception:
            # log.exception() already records the exception type and
            # traceback
            self.log.exception('boot')
            self.cancel()
            raise
        else:
            self.hostname = self.public_ip

    @property
    def public_ip(self):
        """return host's public ip"""
        return self.node.public_ips[0]

    @property
    def private_ip(self):
        """return host's private ip"""
        return self.node.private_ips[0]

    def get_node(self, task=None, **kwargs):
        """Return the node object, looking it up by hostname on first use.

        If the lookup reports that the node does not exist and
        ``self.create`` is true, the node is created with
        :meth:`create_node`.

        :param task: task passed to ``self.timeit()`` for the api calls.
        :returns: the node, or ``None`` if it does not exist and
            ``self.create`` is false.
        :raises RuntimeError: if ``self._node`` is ``False`` (destroyed).
        """
        if self._node is False:
            raise RuntimeError('Node {0} was destroyed'.format(self))
        elif self._node is None:

            with self.timeit(type='api_call', task=task,
                             name='Driver.from_config()'):
                driver = driver_from_config(provider=self.provider,
                                            **self.driver_args)

            with self.timeit(type='api_call', task=task,
                             name='driver.ex_get_node()'):
                try:
                    if self.provider in nova_providers:
                        self._node = driver.servers.find(name=self.hostname)
                    else:
                        self._node = driver.ex_get_node(self.hostname)
                except (ResourceNotFoundError, NotFound):
                    # a missing node is expected: it may be created below
                    pass
            if self._node is None and self.create:
                self.create_node(driver=driver, task=task)
        return self._node
    node = property(get_node)

    def create_node(self, driver=None, task=None, **kwargs):
        """Create the node and store it in ``self._node``.

        For nova providers this blocks (``time.sleep``) until the server
        reports the ``ACTIVE`` status.

        :param driver: driver to use; resolved with
            ``driver_from_config()`` when omitted.
        :param task: task passed to ``self.timeit()`` for the api call.
        :raises RuntimeError: if a nova server ends up in ``ERROR`` status.
        """
        self.log.warning(
            'Node {0} does not exist. Creating...'.format(self))
        if driver is None:
            driver = driver_from_config(provider=self.provider,
                                        **self.driver_args)
        args = self.get_create_node_args(driver=driver, task=task)
        args['name'] = self.name
        with self.timeit(type='api_call', task=task,
                         name='driver.ex_create_node()'):
            if self.provider in nova_providers:
                node = driver.servers.create(**args)
                # it take at least 20s to boot a vm
                time.sleep(20)
                wait = 15
                while node.status not in ('ACTIVE',):
                    try:
                        node = driver.servers.find(name=self.hostname)
                    except NotFound:
                        # not listed yet: keep the previous object and retry
                        pass
                    if node.status in ('ACTIVE',):
                        # need to wait ~10s to get the server up & running
                        time.sleep(10)
                        break
                    if node.status == 'ERROR':
                        # without this check the loop would poll forever
                        raise RuntimeError(
                            'Node {0} creation failed (status ERROR)'.format(
                                self))
                    # poll after 10s, then 5s, then every 2s
                    wait = (wait - 5)
                    if wait < 1:
                        wait = 2
                    time.sleep(wait)
                self._node = node
            else:
                self._node = driver.create_node(**args)
            self.log.warning('Node {0} created'.format(self))

    def get_create_node_args(self, driver=None, task=None):
        """Return the keyword arguments used to create the node.

        The provider wide ``create_node_args`` configuration is copied and
        then updated with the host's own ``create_node_args``.

        :param driver: unused here; available to subclasses.
        :param task: unused here; available to subclasses.
        """
        provider_args = nuka.config.get(self.provider.lower(), {})
        # "or {}" guards against an entry explicitly set to None
        node_args = dict(provider_args.get('create_node_args') or {})
        node_args.update(self.vars.get('create_node_args') or {})
        self.log.debug4(nuka.utils.json.dumps(node_args, indent=2))
        return node_args

    async def destroy(self, **kwargs):
        """Not supported on a single host; always raises.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError(
            'Please use Cloud.destroy() which is mush faster')

class GCEHost(Host):
    """Host on GCE"""

    use_sudo = True
    provider = Provider.GCE

    def get_create_node_args(self, driver=None, task=None):
        """Return node creation arguments, completed for GCE.

        On top of the base arguments:

        - ssh keys from the ``ssh`` configuration (``keys`` list and
          ``keysfile``) are added as an ``sshKeys`` item of
          ``ex_metadata``, unless such an item already exists. Keys
          without a ``user:`` prefix are prefixed with the host's user.
        - ``ex_network`` defaults to the first network returned by
          ``driver.ex_list_networks()``.

        :param driver: driver used to list networks; resolved with
            ``driver_from_config()`` when omitted and needed.
        :param task: task passed to ``self.timeit()`` for the api call.
        """
        node_args = super().get_create_node_args(driver=driver, task=task)

        # update ex_metadata ssh keys
        ssh_config = nuka.config.get('ssh', {}) or {}
        # copy: adding the keysfile content must not grow the list stored
        # in the configuration each time this method is called
        keys_list = list(ssh_config.get('keys') or [])
        keysfile = ssh_config.get('keysfile')
        if keysfile:
            with codecs.open(os.path.expanduser(keysfile), 'r', 'utf8') as fd:
                for line in fd:
                    line = line.strip()
                    # a blank line would end up as a bogus "user:" entry
                    if line:
                        keys_list.append(line)
        if keys_list:
            node_args.setdefault('ex_metadata', {})
            items = node_args['ex_metadata'].setdefault('items', [])
            if not any(v["key"] == "sshKeys" for v in items):
                keys = []
                for line in keys_list:
                    if ':' in line:
                        keys.append(line)
                    else:
                        keys.append(self.vars['user'] + ':' + line)
                keys = '\n'.join(keys)
                items.append({"value": keys, "key": "sshKeys"})

        self.log.debug4(nuka.utils.json.dumps(node_args, indent=2))

        if 'ex_network' not in node_args:
            if driver is None:
                # driver is optional in the signature
                driver = driver_from_config(provider=self.provider,
                                            **self.driver_args)
            with self.timeit(type='api_call', task=task,
                             name='Driver.ex_list_networks()'):
                net = driver.ex_list_networks()[0]
            node_args['ex_network'] = net

        return node_args


class OpenstackHost(Host):
    """Host on OpenStack"""

    use_sudo = True
    provider = Provider.OPENSTACK

    def _set_ips(self):
        """Fill ``public_ip`` / ``private_ip`` vars from the interfaces.

        The first non private IPv4 address found becomes ``public_ip`` and
        the first private one becomes ``private_ip``; values already set in
        ``self.vars`` are kept. Addresses that are not valid IPv4 are
        ignored. Without any private address, ``private_ip`` falls back to
        ``public_ip`` (which raises ``KeyError`` if there is no public
        address either).
        """
        for iface in self.node.interface_list():
            for addr in iface.fixed_ips:
                ip = addr['ip_address']
                try:
                    a = ipaddress.IPv4Address(ip)
                except ValueError:
                    continue
                else:
                    if not a.is_private:
                        if 'public_ip' not in self.vars:
                            self.vars['public_ip'] = ip
                    else:
                        if 'private_ip' not in self.vars:
                            self.vars['private_ip'] = ip
        if 'private_ip' not in self.vars:
            # we may not have a private net
            self.vars['private_ip'] = self.vars['public_ip']

    @property
    def public_ip(self):
        """return host's public ip"""
        if 'public_ip' not in self.vars:
            self._set_ips()
        return self.vars['public_ip']

    @property
    def private_ip(self):
        """return host's private ip"""
        if 'private_ip' not in self.vars:
            self._set_ips()
        return self.vars['private_ip']

    def get_create_node_args(self, driver=None, task=None):
        """Return node creation arguments, completed for OpenStack.

        On top of the base arguments:

        - when ssh keys are configured (``ssh`` configuration, ``keys``
          list and ``keysfile``), a keypair named ``ops`` is created if
          none exists yet and ``key_name`` is set to ``ops``.
        - ``flavor`` and ``image`` names are replaced by the objects
          returned by ``driver.flavors.find()`` and
          ``driver.glance.find_image()``.

        :param driver: nova client; resolved with ``driver_from_config()``
            when omitted.
        :param task: only forwarded to the base implementation.
        """
        node_args = super().get_create_node_args(driver=driver, task=task)

        if driver is None:
            # the flavor / image lookups below need a driver even when no
            # ssh key is configured
            driver = driver_from_config(self.provider, **self.driver_args)

        # register configured ssh keys as the 'ops' keypair
        ssh_config = nuka.config.get('ssh', {}) or {}
        # copy: adding the keysfile content must not grow the list stored
        # in the configuration each time this method is called
        keys_list = list(ssh_config.get('keys') or [])
        keysfile = ssh_config.get('keysfile')
        if keysfile:
            with codecs.open(os.path.expanduser(keysfile), 'r', 'utf8') as fd:
                for line in fd:
                    line = line.strip()
                    # ignore blank lines
                    if line:
                        keys_list.append(line)
        if keys_list:
            keypairs = [k for k in driver.keypairs.list() if k.name == 'ops']
            if not keypairs:
                driver.keypairs.create(
                    name='ops',
                    public_key='\n'.join(keys_list),
                )
            node_args['key_name'] = 'ops'
        self.log.debug4(nuka.utils.json.dumps(node_args, indent=2))
        if 'flavor' in node_args:
            node_args['flavor'] = driver.flavors.find(name=node_args['flavor'])
        if 'image' in node_args:
            node_args['image'] = driver.glance.find_image(node_args['image'])
        return node_args


class Cloud(base.HostGroup):
    """A HostGroup of cloud hosts"""

    # shared by all instances: serializes the node listing api call
    _list_lock = threading.Lock()
    _nodes = {}

    # provider -> Host subclass. Other providers get a generated subclass.
    host_classes = {
        Provider.GCE: GCEHost,
        Provider.OPENSTACK: OpenstackHost,
    }

    def __init__(self, provider=None, driver_args=None, create=True,
                 use_sudo=False):
        """Initialize the group.

        :param provider: provider identifier (required).
        :param driver_args: keyword arguments for ``driver_from_config()``.
        :param create: default ``create`` flag for hosts of this group.
        :param use_sudo: ``use_sudo`` value of the generated host class;
            only used for providers missing from ``host_classes``.
        :raises RuntimeError: if ``provider`` is ``None``.
        """
        if provider is None:
            raise RuntimeError('A cloud instance require a valid provider')
        self.provider = provider
        self.driver_args = driver_args or {}
        self.create = create
        if provider in self.host_classes:
            self.host_class = self.host_classes[self.provider]
        else:
            name = '{0}Host'.format(self.provider.title())
            args = {'provider': self.provider, 'use_sudo': use_sudo}
            self.host_class = type(name, (Host,), args)
        # per instance cache, filled by cached_list_nodes
        self._nodes = {}

    @property
    def driver(self):
        """Driver for this cloud's provider (see ``driver_from_config()``)"""
        return driver_from_config(self.provider, **self.driver_args)

    @property
    def cached_list_nodes(self):
        """Mapping of node name to node, listed on first access.

        The provider is queried again on each access for as long as the
        cached mapping is empty.
        """
        if not self._nodes:
            # "with" releases the lock even when the api call raises
            with self.__class__._list_lock:
                if self.provider in nova_providers:
                    listing = self.driver.servers.list()
                else:
                    listing = self.driver.list_nodes()
                self._nodes = {n.name: n for n in listing}
        return self._nodes

    def __getitem__(self, item):
        """Return the host named ``item``.

        If ``item`` is not in the group, underscores are replaced by
        dashes before the lookup.
        """
        if item not in self:
            item = item.replace('_', '-')
        return super().__getitem__(item)

    def get_or_create_node(self, hostname, **kwargs):
        """Return a Host. Create it if needed"""
        kwargs.setdefault('create', self.create)
        kwargs['driver_args'] = self.driver_args
        if hostname not in self:
            kwargs.setdefault('node', self._nodes.get(hostname))
            self[hostname] = self.host_class(hostname=hostname, **kwargs)
        return self[hostname]

    def get_node(self, hostname, **kwargs):
        """Return a Host whose node is never created (``create=False``).

        The Host object itself is added to the group if it is missing.
        """
        kwargs['create'] = False
        host_node = self._nodes.get(hostname)
        return self.get_or_create_node(hostname, node=host_node, **kwargs)

    def from_compose(self, project_name=None, filename='docker-compose.yml'):
        """Return a host group with hosts names extracted from compose file

        One host named ``<project_name>-<service name>-1`` is added per
        service. ``project_name`` defaults to the name of the current
        working directory, where the compose file is looked up.
        """
        # docker-compose is only imported when this method is called
        from compose.project import Project
        from compose import config
        project_dir = os.getcwd()
        project_name = project_name or os.path.basename(project_dir)
        conf = config.load(
            config.find(project_dir, [filename], os.environ)
        )
        project = Project.from_config('', conf, client=None)
        for service in project.get_services():
            name = '{0}-{1.name}-1'.format(project_name, service)
            self.get_or_create_node(hostname=name)
        return self

    async def destroy(self):
        """Destroy all hosts in the group

        Only nodes that are both listed by the provider and present in the
        group are destroyed. Destroyed hosts are removed from the group.
        For nova providers a failing deletion is logged on the host, which
        is then kept in the group.
        """
        nodes = []
        for node in self.cached_list_nodes.values():
            if node.name in self:
                nodes.append(node)
                self[node.name].log.info('destroy()')
        if not nodes:
            return

        if self.provider in nova_providers:
            destroyed = []
            for node in nodes:
                try:
                    node.force_delete()
                except Exception:
                    if node.name in self:
                        host = self[node.name]
                        host.log.exception('destroy()')
                    else:
                        raise
                else:
                    destroyed.append(node)
        else:
            self.driver.ex_destroy_multiple_nodes(nodes)
            destroyed = nodes

        # forget every destroyed host, not only the last one iterated over
        for node in destroyed:
            if node.name in self:
                del self[node.name]


def get_cloud(provider=Provider.GCE):
    """return a :class:`nuka.hosts.Cloud` filled with all instanciated hosts

    Every node listed for ``provider`` is registered in the returned group
    under its own name, with the listed node object attached.
    """
    group = Cloud(provider=provider)
    listed = group.cached_list_nodes
    for existing in listed.values():
        group.get_or_create_node(existing.name, node=existing)
    return group