Source code for nuka.tasks.service

# -*- coding: utf-8 -*-
"""
services related tasks
"""
from nuka.task import Task


[docs]class start(Task): """ensure a service is started""" def __init__(self, name=None, **kwargs): super(start, self).__init__(name=name, **kwargs) def do(self): res = self.sh(['service', self.args['name'], 'status'], check=False) if res['rc'] != 0: res = self.sh(['service', self.args['name'], 'start']) else: res['changed'] = False return res def diff(self): res = self.sh(['service', self.args['name'], 'status'], check=False) diff = '' if res['rc'] != 0: diff = self.texts_diff('', self.args['name']) return dict(diff=diff)
[docs]class restart(Task): """restart one or more service""" diff = False def __init__(self, services=None, **kwargs): if isinstance(services, str): services = [services] kwargs.update( name=', '.join(services), services=services) super(restart, self).__init__(**kwargs) def do(self): for service in self.args['services']: res = self.sh(['service', service, 'restart']) return res
[docs]class reload(Task): """reload one or more service""" diff = False def __init__(self, services=None, **kwargs): if isinstance(services, str): services = [services] kwargs.update( name=', '.join(services), services=services) super(reload, self).__init__(**kwargs) def do(self): for service in self.args['services']: res = self.sh(['service', service, 'reload']) return res
[docs]class stop(Task): """ensure a service is stoped""" def __init__(self, name=None, **kwargs): super(stop, self).__init__(name=name, **kwargs) def do(self): return self.sh(['service', self.args['name'], 'stop']) def diff(self): res = self.sh(['service', self.args['name'], 'status'], check=False) if res['rc'] == 0: diff = self.lists_diff(['{0}\n'.format(self.args['name'])], []) else: diff = '' return dict(diff=diff)