Source code for kozmic.builds.tasks

# coding: utf-8
"""
kozmic.builds.tasks
~~~~~~~~~~~~~~~~~~~

.. autofunction:: do_job(hook_call_id)
.. autofunction:: restart_job(id)
"""
import os
import sys
import tempfile
import shutil
import contextlib
import threading
import pipes
import subprocess
import fcntl
import select
import Queue
import socket

import redis
from flask import current_app
from celery.utils.log import get_task_logger
from docker import APIError as DockerAPIError

from kozmic import db, celery, docker
from kozmic.models import Job, HookCall
from kozmic.docker_utils import does_docker_image_exist
from . import get_ansi_to_html_converter


logger = get_task_logger(__name__)


@contextlib.contextmanager
def create_temp_dir():
    build_dir = tempfile.mkdtemp()
    yield build_dir
    shutil.rmtree(build_dir)


[docs]class Publisher(object): """ :param redis_client: Redis client :type log_path: redis.Redis :param channel: pub/sub channel name :type channel: str """ def __init__(self, redis_client, channel): self._redis_client = redis_client self._channel = channel self._ansi_converter = get_ansi_to_html_converter() def publish(self, lines): if isinstance(lines, basestring): lines = [lines] for line in lines: try: line = self._ansi_converter.convert(line, full=False) + '\n' except: pass self._redis_client.publish(self._channel, line) self._redis_client.rpush(self._channel, line) def finish(self): # Remove `channel` key to let `tailer` module # stop listening pubsub channel self._redis_client.delete(self._channel)
[docs]class Tailer(threading.Thread): """A daemon thread that waits for additional lines to be appended to a specified log file. Once there is a new line, it does the following: 1. Translates ANSI sequences to HTML tags; 2. Sends the line to a Redis pub/sub channel; 3. Pushes it to Redis list of the same name. If the log file does not change for ``kill_timeout`` seconds, specified Docker container will be killed and corresponding message will be appended to the log file. Once the thread has finished, :attr:`has_killed_container` tells whether the :param:`container` has stopped by itself or been killed by a timeout. :param log_path: path to the log file to watch :type log_path: str :param publisher: publisher :type publisher: :class:`Publisher` :param container: container to kill :type container: dictionary returned by :meth:`docker.Client.create_container` :param kill_timeout: number of seconds since the last log append after which kill the container :type kill_timeout: int """ daemon = True def __init__(self, log_path, publisher, container, kill_timeout=600): threading.Thread.__init__(self) self._stop = threading.Event() self._log_path = log_path self._publisher = publisher self._container = container self._kill_timeout = kill_timeout self._read_timeout = 0.5 self.has_killed_container = False def stop(self): self._stop.set() def is_stopped(self): return self._stop.isSet() def _kill_container(self): logger.info('Tailer is killing %s', self._container) docker.kill(self._container) self.has_killed_container = True logger.info('%s has been killed.', self._container) def run(self): logger.info('Tailer has started. Log path: %s', self._log_path) tailf = subprocess.Popen(['/usr/bin/tail', '-f', self._log_path], stdout=subprocess.PIPE) try: fl = fcntl.fcntl(tailf.stdout, fcntl.F_GETFL) fcntl.fcntl(tailf.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) buf = '' iterations_without_read = 0 while True: if self.is_stopped(): break reads, _, _ = select.select([tailf.stdout], [], [], self._read_timeout) if not reads: iterations_without_read += 1 if iterations_without_read * self._read_timeout < self._kill_timeout: continue else: self._kill_container() return else: iterations_without_read = 0 buf += tailf.stdout.read() lines = buf.split('\n') if lines[-1] == '': buf = '' else: buf = lines[-1] lines = lines[:-1] self._publisher.publish(lines) finally: tailf.terminate() tailf.wait()
SCRIPT_STARTER_SH = ''' set -x set -e function cleanup {{ # escape # Files created during the build in /kozmic/ folder are owned by root # from the host point of view, because the Docker daemon runs from root. # As a result, Celery worker, which runs as normal user, can not delete # it's temporary folder that is mapped to /kozmic/ container's folder. # To work around this we give everyone write permissions to the # all /kozmic subfolders. # Note: `|| true` to be sure that we will not change # ./script.sh return code by running `chmod` chmod -Rf a+w $(find /kozmic -type d) || true }} # escape trap cleanup EXIT # Add GitHub to known hosts ssh-keyscan -H github.com >> /etc/ssh/ssh_known_hosts if [ -f /kozmic/id_rsa ] && [ -f /kozmic/askpass.sh ]; then # Start ssh-agent service... eval `ssh-agent -s` # ...and add private key to the agent, so we won't be asked # for passphrase during git clone. Let ssh-add read passphrase # by running askpass.sh for the security's sake. SSH_ASKPASS=/kozmic/askpass.sh DISPLAY=:0.0 nohup ssh-add /kozmic/id_rsa rm /kozmic/askpass.sh /kozmic/id_rsa fi git clone {clone_url} /kozmic/src cd /kozmic/src && git checkout -q {commit_sha} chown -R kozmic /kozmic # Redirect stdout to the file being translated to the redis pubsub channel TERM=xterm su kozmic -c "/kozmic/script.sh" &>> /kozmic/script.log '''.strip() ASKPASS_SH = ''' #!/bin/bash if [[ "$1" == *"Bad passphrase, try again"* ]]; then # If we don't exit on "bad passphrase", ssh-add will # never stop calling this script. exit 1 fi echo {passphrase} '''.strip()
[docs]class Builder(threading.Thread): """A thread that starts a script in a container and waits for it to complete. One of the following attributes is not ``None`` once the thread has finished: .. attribute:: return_code Integer, a build script's return code if everything went well. .. attribute:: exc_info ``exc_info`` triple ``(type, value, traceback)`` if something went wrong. :param docker: Docker client :type docker: :class:`docker.Client` :param message_queue: a queue to which put an identifier of the started Docker container. Identifier is a dictionary returned by :meth:`docker.Client.create_container`. Builder will block until the message is acknowledged by calling :meth:`Queue.Queue.task_done`. :type message_queue: :class:`Queue.Queue` :param deploy_key: a pair of strings (private key, passphrase) :type deploy_key: 2-tuple of strings :param docker_image: a name of Docker image to be used for :attr:`build_script` execution. The image has to be already pulled from the registry. :type docker_image: str :param working_dir: path of the directory to be mounted in container's `/kozmic` path :type working_dir: str :param clone_url: SSH clone URL :type clone_url: str :param commit_sha: SHA of the commit to be checked out :type commit_sha: str """ def __init__(self, docker, message_queue, docker_image, script, working_dir, clone_url, commit_sha, deploy_key=None): threading.Thread.__init__(self) self._docker = docker self._message_queue = message_queue self._docker_image = docker_image self._build_script = script self._working_dir = working_dir self._clone_url = clone_url self._commit_sha = commit_sha self._rsa_private_key = None self._passphrase = None if deploy_key: self._rsa_private_key, self._passphrase = deploy_key self.return_code = None self.exc_info = None self.container = None def run(self): try: self.return_code = self._run() except: self.exc_info = sys.exc_info() def _run(self): logger.info('Builder has started.') working_dir_path = lambda f: os.path.join(self._working_dir, f) script_starter_sh_path = working_dir_path('script-starter.sh') script_starter_sh_content = SCRIPT_STARTER_SH.format( clone_url=pipes.quote(self._clone_url), commit_sha=pipes.quote(self._commit_sha)) with open(script_starter_sh_path, 'w') as script_starter_sh: script_starter_sh.write(script_starter_sh_content) script_path = working_dir_path('script.sh') with open(script_path, 'w') as script: script.write(self._build_script) os.chmod(script_path, 0o755) log_path = working_dir_path('script.log') with open(log_path, 'w') as log: log.write('') os.chmod(log_path, 0o664) if self._rsa_private_key and self._passphrase: askpass_sh_path = working_dir_path('askpass.sh') askpass_sh_content = ASKPASS_SH.format( passphrase=pipes.quote(self._passphrase)) with open(askpass_sh_path, 'w') as askpass_sh: askpass_sh.write(askpass_sh_content) os.chmod(askpass_sh_path, 0o100) id_rsa_path = working_dir_path('id_rsa') with open(id_rsa_path, 'w') as id_rsa: id_rsa.write(self._rsa_private_key) os.chmod(id_rsa_path, 0o400) logger.info('Starting Docker process...') self.container = self._docker.create_container( self._docker_image, command='bash /kozmic/script-starter.sh', volumes={'/kozmic': {}}) self._message_queue.put(self.container, block=True, timeout=60) self._message_queue.join() self._docker.start(self.container, binds={self._working_dir: '/kozmic'}) logger.info('Docker process %s has started.', self.container) return_code = self._docker.wait(self.container) try: logs = self._docker.logs(self.container) except socket.timeout: pass else: logger.info('Docker process log: %s', logs) logger.info('Docker process %s has finished with return code %i.', self.container, return_code) logger.info('Builder has finished.') return return_code
@contextlib.contextmanager def _run(publisher, stall_timeout, clone_url, commit_sha, docker_image, script, deploy_key=None, remove_container=True): yielded = False stdout = '' try: with create_temp_dir() as working_dir: message_queue = Queue.Queue() builder = Builder( docker=docker._get_current_object(), # `docker` is a local proxy deploy_key=deploy_key, clone_url=clone_url, commit_sha=commit_sha, docker_image=docker_image, script=script, working_dir=working_dir, message_queue=message_queue) log_path = os.path.join(working_dir, 'script.log') stop_reason = '' try: # Start Builder and wait until it will create the container builder.start() container = message_queue.get(block=True, timeout=60) # Now the container id is known and we can pass it to Tailer tailer = Tailer( log_path=log_path, publisher=publisher, container=container, kill_timeout=stall_timeout) tailer.start() try: # Tell Builder to continue and wait for it to finish message_queue.task_done() builder.join() finally: tailer.stop() if tailer.has_killed_container: stop_reason = '\nSorry, your script has stalled and been killed.\n' finally: if builder.container and remove_container: docker.remove_container(builder.container) if os.path.exists(log_path): with open(log_path, 'r') as log: stdout = log.read() assert ((builder.return_code is not None) ^ (builder.exc_info is not None)) if builder.exc_info: # Re-raise exception happened in builder # (it will be catched in the outer try-except) raise builder.exc_info[1], None, builder.exc_info[2] else: try: yield (builder.return_code, stdout + stop_reason, builder.container) except: raise finally: yielded = True # otherwise we get "generator didn't # stop after throw()" error if nested # code raised exception except: stdout += ('\nSorry, something went wrong. We are notified of ' 'the issue and will fix it soon.') if not yielded: yield 1, stdout, None raise class RestartError(Exception): pass @celery.task def restart_job(id): """A Celery task that restarts a job. :param id: int, :class:`Job` identifier """ job = Job.query.get(id) assert 'Job#{} does not exist.'.format(id) if not job.is_finished(): raise RestartError('Tried to restart %r which is not finished.', job) db.session.delete(job) # Run do_job task synchronously: do_job.apply(args=(job.hook_call_id,)) @celery.task def do_job(hook_call_id): """A Celery task that does a job specified by a hook call. Creates a :class:`Job` instance and executes a build script prescribed by a triggered :class:`Hook`. Also sends job output to :attr:`Job.task_uuid` Redis pub-sub channel and updates build status. :param hook_call_id: int, :class:`HookCall` identifier """ hook_call = HookCall.query.get(hook_call_id) assert hook_call, 'HookCall#{} does not exist.'.format(hook_call_id) job = Job( build=hook_call.build, hook_call=hook_call, task_uuid=do_job.request.id) db.session.add(job) job.started() db.session.commit() hook = hook_call.hook project = hook.project config = current_app.config redis_client = redis.StrictRedis(host=config['KOZMIC_REDIS_HOST'], port=config['KOZMIC_REDIS_PORT'], db=config['KOZMIC_REDIS_DATABASE']) publisher = Publisher(redis_client=redis_client, channel=job.task_uuid) stdout = '' try: kwargs = dict( publisher=publisher, stall_timeout=config['KOZMIC_STALL_TIMEOUT'], clone_url=(project.gh_https_clone_url if project.is_public else project.gh_ssh_clone_url), commit_sha=hook_call.build.gh_commit_sha) message = 'Pulling "{}" Docker image...'.format(hook.docker_image) logger.info(message) publisher.publish(message) stdout = message + '\n' try: docker.pull(hook.docker_image) # Make sure that image has been successfully pulled by calling # `inspect_image` on it: docker.inspect_image(hook.docker_image) except DockerAPIError as e: logger.info('Failed to pull %s: %s.', hook.docker_image, e) job.finished(1) job.stdout = str(e) db.session.commit() return else: logger.info('%s image has been pulled.', hook.docker_image) if not project.is_public: project.deploy_key.ensure() if not project.is_public: kwargs['deploy_key'] = ( project.deploy_key.rsa_private_key, project.passphrase) if job.hook_call.hook.install_script: cached_image = 'kozmic-cache/{}'.format(job.get_cache_id()) cached_image_tag = str(project.id) if does_docker_image_exist(cached_image, cached_image_tag): install_stdout = ('Skipping install script as tracked files ' 'did not change...') publisher.publish(install_stdout) stdout += install_stdout + '\n' else: with _run(docker_image=hook.docker_image, script=hook.install_script, remove_container=False, **kwargs) as (return_code, install_stdout, container): stdout += install_stdout if return_code == 0: # Install script has finished successfully. So we # promote the resulting container to an image that # we will use for running the build script in # this and consequent jobs docker.commit(container['Id'], repository=cached_image, tag=cached_image_tag) docker.remove_container(container) else: job.finished(return_code) job.stdout = stdout db.session.commit() return assert docker.images(cached_image) docker_image = cached_image + ':' + cached_image_tag else: docker_image = job.hook_call.hook.docker_image with _run(docker_image=docker_image, script=hook.build_script, remove_container=True, **kwargs) as (return_code, build_stdout, container): job.finished(return_code) job.stdout = stdout + build_stdout db.session.commit() return finally: publisher.finish()