Source code for nuka.tasks.user

# -*- coding: utf-8 -*-
"""
unix user related tasks
"""
import os
from nuka import utils
from nuka.task import Task
from nuka.tasks import file


[docs]class delete_user(Task): """remove a unix account""" def __init__(self, username=None, **kwargs): kwargs.setdefault('name', username) super(delete_user, self).__init__(username=username, **kwargs) def do(self): username = self.args['username'] if not self.sh(['id', username], check=False)['rc']: return self.sh(['userdel', '-r', username]) return dict(rc=0, username=username) def diff(self): username = self.args['username'] diff = '' if not self.sh(['id', username], check=False)['rc']: diff = self.texts_diff(username, '') return dict(rc=0, diff=diff, username=username)
[docs]class create_user(Task): """create a unix account""" home_format = '/home/{0}' gecos_format = 'user {0}' def __init__(self, username=None, **kwargs): kwargs.setdefault('name', username) kwargs.setdefault('gecos', self.gecos_format.format(username)) kwargs.setdefault('home', self.home_format.format(username)) super(create_user, self).__init__(username=username, **kwargs) def do(self): username = self.args['username'] home = self.args['home'] gecos = self.args['gecos'] if self.sh(['id', username], check=False)['rc']: if self.is_debian: # ensure adduser is installed (wheezy do not have it) res = self.sh(['which', 'adduser'], check=False) if res['rc'] != 0: # pragma: no cover raise OSError('adduser is not available') cmd = [ 'adduser', '--quiet', '--disabled-password', '--gecos', gecos, '--home', home, username] else: cmd = [ 'useradd', '-p', '\*', '-c', gecos, '-m', '--home-dir', home, username] res = self.sh(cmd) else: res = dict(rc=0, changed=False) res.update(username=username, home=home) return res def diff(self): username = self.args['username'] home = self.args['home'] diff = '' if self.sh(['id', username], check=False)['rc'] != 0: diff = self.texts_diff('', username) if not os.path.isdir(home): diff += self.texts_diff('', home) res = dict(rc=0, diff=diff, **self.args) res.update(username=username, home=self.args['home']) return res
[docs]class create_www_user(create_user): """create a unix account with /var/www/{username} as home""" home_format = '/var/www/{0}' gecos_format = 'www user {0}'
[docs]class authorized_keys(file.put): """upload ssh keys from string or file""" def __init__(self, username=None, keys=None, keysfile=None, **kwargs): kwargs.update(name=username, username=username, switch_user=username) if 'files' not in kwargs: dst = '~/.ssh/authorized_keys' if keysfile: kwargs['files'] = [dict(src=keysfile, dst=dst, mod='644')] elif keys: kwargs['files'] = [dict(data=keys, dst=dst, mod='644')] else: raise RuntimeError('You must provide keys or keysfile') super(authorized_keys, self).__init__(**kwargs) def do(self): utils.makedirs(os.path.expanduser('~/.ssh'), mod='700') super(authorized_keys, self).do()