diff --git a/cleanup-koji-stg.sh b/cleanup-koji-stg.sh new file mode 100755 index 00000000..306869ae --- /dev/null +++ b/cleanup-koji-stg.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +# default's *42* lkocman's *43 as well + +for mvr in testmodule-4.3.43-1 testmodule-4.3.42-1; do + koji --config /etc/rida/koji.conf remove-target $mvr + koji --config /etc/rida/koji.conf remove-tag $mvr + koji --config /etc/rida/koji.conf remove-tag $mvr-build +done diff --git a/requirements.txt b/requirements.txt index c7c2581a..d8815d8a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ fedmsg modulemd pyOpenSSL kobo +munch diff --git a/rida.conf b/rida.conf index 94e0a1e4..a50b61e9 100644 --- a/rida.conf +++ b/rida.conf @@ -2,9 +2,11 @@ system = koji messaging = fedmsg koji_config = /etc/rida/koji.conf +# See https://fedoraproject.org/wiki/Koji/WritingKojiCode#Profiles koji_profile = koji +koji_arches = ["x86_64"] db = sqlite:///rida.db -pdc_url = http://modularity.fedorainfracloud.org:8080/rest_api/v1 +pdc_url = http://modularity.fedorainfracloud.org:8080/rest_api/v1/ pdc_insecure = True pdc_develop = True scmurls = ["git://pkgs.stg.fedoraproject.org/modules/"] @@ -17,9 +19,9 @@ port = 5000 # Set to zero to disable polling polling_interval = 60 -rpms_default_repository = git://pkgs.fedoraproject.org/rpms/ +rpms_default_repository = git://pkgs.stg.fedoraproject.org/rpms/ rpms_allow_repository = False -rpms_default_cache = http://pkgs.fedoraproject.org/repo/pkgs/ +rpms_default_cache = http://pkgs.stg.fedoraproject.org/repo/pkgs/ rpms_allow_cache = False ssl_enabled = True @@ -30,7 +32,7 @@ ssl_ca_certificate_file = cacert.pem pkgdb_api_url = https://admin.stg.fedoraproject.org/pkgdb/api # Available backends are: console, file, journal. -log_backend = journal +log_backend = console # Path to log file when log_backend is set to "file". log_file = rida.log diff --git a/rida.py b/rida.py index f1c8c4c0..ac82aca3 100755 --- a/rida.py +++ b/rida.py @@ -1,7 +1,5 @@ #!/usr/bin/python3 # -*- coding: utf-8 -*- - - # Copyright (c) 2016 Red Hat, Inc. # # Permission is hereby granted, free of charge, to any person obtaining a copy @@ -30,6 +28,7 @@ This is the implementation of the orchestrator's public RESTful API. """ from flask import Flask, request +import flask import json import logging import modulemd @@ -38,9 +37,9 @@ import rida.auth import rida.config import rida.database import rida.logger -import rida.messaging import rida.scm import ssl +import shutil import tempfile app = Flask(__name__) @@ -53,6 +52,8 @@ else: conf = rida.config.from_file("rida.conf") rida.logger.init_logging(conf) +log = logging.getLogger(__name__) + db = rida.database.Database(conf) @app.route("/rida/module-builds/", methods=["POST"]) @@ -80,13 +81,12 @@ def submit_build(): return "The submitted scmurl isn't allowed", 403 yaml = str() try: + td = tempfile.mkdtemp() scm = rida.scm.SCM(url, conf.scmurls) - td = tempfile.TemporaryDirectory() - cod = scm.checkout(td.name) + cod = scm.checkout(td) cofn = os.path.join(cod, (scm.name + ".yaml")) with open(cofn, "r") as mmdfile: yaml = mmdfile.read() - td.cleanup() except Exception as e: if "is not in the list of allowed SCMs" in str(e): rc = 403 @@ -95,6 +95,8 @@ def submit_build(): else: rc = 500 return str(e), rc + finally: + shutil.rmtree(td) mmd = modulemd.ModuleMetadata() try: mmd.loads(yaml) @@ -103,15 +105,28 @@ def submit_build(): if db.session.query(rida.database.ModuleBuild).filter_by(name=mmd.name, version=mmd.version, release=mmd.release).first(): return "Module already exists", 409 - module = rida.database.ModuleBuild(name=mmd.name, version=mmd.version, - release=mmd.release, state="init", modulemd=yaml) - db.session.add(module) - db.session.commit() + module = rida.database.ModuleBuild.create( + db.session, + conf, + name=mmd.name, + version=mmd.version, + release=mmd.release, + modulemd=yaml, + ) + + def failure(message, code): + # TODO, we should make some note of why it failed in the db.. + log.exception(message) + module.transition(conf, rida.database.BUILD_STATES["failed"]) + db.session.add(module) + db.session.commit() + return message, code + for pkgname, pkg in mmd.components.rpms.packages.items(): if pkg.get("repository") and not conf.rpms_allow_repository: - return "Custom component repositories aren't allowed", 403 + return failure("Custom component repositories aren't allowed", 403) if pkg.get("cache") and not conf.rpms_allow_cache: - return "Custom component caches aren't allowed", 403 + return failure("Custom component caches aren't allowed", 403) if not pkg.get("repository"): pkg["repository"] = conf.rpms_default_repository + pkgname if not pkg.get("cache"): @@ -120,32 +135,30 @@ def submit_build(): try: pkg["commit"] = rida.scm.SCM(pkg["repository"]).get_latest() except Exception as e: - return "Failed to get the latest commit: %s" % pkgname, 422 - if not rida.scm.SCM(pkg["repository"] + "?#" + pkg["commit"]).is_available(): - return "Cannot checkout %s" % pkgname, 422 - build = rida.database.ComponentBuild(module_id=module.id, package=pkgname, format="rpms") + return failure("Failed to get the latest commit: %s" % pkgname, 422) + full_url = pkg["repository"] + "?#" + pkg["commit"] + if not rida.scm.SCM(full_url).is_available(): + return failure("Cannot checkout %s" % pkgname, 422) + build = rida.database.ComponentBuild( + module_id=module.id, + package=pkgname, + format="rpms", + scmurl=full_url, + ) db.session.add(build) module.modulemd = mmd.dumps() - module.state = rida.database.BUILD_STATES["wait"] + module.transition(conf, rida.database.BUILD_STATES["wait"]) db.session.add(module) db.session.commit() - # Publish to whatever bus we're configured to connect to. - # This should notify ridad to start doing the work we just scheduled. - rida.messaging.publish( - modname='rida', - topic='module.state.change', - msg=module.json(), - backend=conf.messaging, - ) logging.info("%s submitted build of %s-%s-%s", username, mmd.name, mmd.version, mmd.release) - return json.dumps(module.json()), 201 + return flask.jsonify(module.json()), 201 @app.route("/rida/module-builds/", methods=["GET"]) def query_builds(): """Lists all tracked module builds.""" - return json.dumps([{"id": x.id, "state": x.state} + return flask.jsonify([{"id": x.id, "state": x.state} for x in db.session.query(rida.database.ModuleBuild).all()]), 200 @@ -158,8 +171,8 @@ def query_build(id): if module.state != "init": for build in db.session.query(rida.database.ComponentBuild).filter_by(module_id=id).all(): tasks[build.format + "/" + build.package] = \ - str(build.task) + "/" + build.state - return json.dumps({ + str(build.task_id) + "/" + build.state + return flask.jsonify({ "id": module.id, "state": module.state, "tasks": tasks diff --git a/rida/builder.py b/rida/builder.py index 3d267df5..578d8175 100644 --- a/rida/builder.py +++ b/rida/builder.py @@ -1,6 +1,4 @@ # -*- coding: utf-8 -*- - - # Copyright (c) 2016 Red Hat, Inc. # # Permission is hereby granted, free of charge, to any person obtaining a copy @@ -30,9 +28,25 @@ # their tag names. # TODO: Ensure the RPM %dist tag is set according to the policy. -import koji from abc import ABCMeta, abstractmethod +import logging +import os + from kobo.shortcuts import run +import koji +import tempfile +import glob +import datetime +import time +import random +import string +import kobo.rpmlib + +import munch +from OpenSSL.SSL import SysCallError + +logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger(__name__) # TODO: read defaults from rida's config KOJI_DEFAULT_GROUPS = { @@ -93,6 +107,14 @@ class GenericBuilder: """ raise NotImplementedError() + @abstractmethod + def buildroot_ready(self, artifacts=None): + """ + :param artifacts=None : a list of artifacts supposed to be in buildroot + return when buildroot is ready (or contain specified artifact) + """ + raise NotImplementedError() + @abstractmethod def buildroot_add_dependency(self, dependencies): """ @@ -102,25 +124,21 @@ class GenericBuilder: raise NotImplementedError() @abstractmethod - def buildroot_add_artifacts(self, artifacts): + def buildroot_add_artifacts(self, artifacts, install=False): """ :param artifacts: list of artifacts to be available in buildroot + :param install=False: pre-install artifact in buildroot (otherwise "make it available for install") add artifacts into buildroot, can be used to override buildroot macros """ raise NotImplementedError() - @abstractmethod - def buildroot_ready(self, artifact=None): - """ - :param artifact=None: wait for specific artifact to be present - waits for buildroot to be ready and contain given artifact - """ - raise NotImplementedError() - @abstractmethod def build(self, artifact_name, source): """ - :param artifact_name : name of what are we building (used for whitelists) + :param artifact_name : a crucial, since we can't guess a valid srpm name + without having the exact buildroot (collections/macros) + used e.g. for whitelisting packages + artifact_name is used to distinguish from artifact (e.g. package x nvr) :param source : a scmurl to repository with receipt (e.g. spec) """ raise NotImplementedError() @@ -128,119 +146,303 @@ class GenericBuilder: class Builder: """Wrapper class""" - def __new__(cls, module, backend, config): + def __new__(cls, module, backend, config, **extra): """ :param module : a module string e.g. 'testmodule-1.0' :param backend: a string representing backend e.g. 'koji' :param config: instance of rida.config.Config + + Any additional arguments are optional extras which can be passed along + and are implementation-dependent. """ if backend == "koji": - return KojiModuleBuilder(module=module, config=config) + return KojiModuleBuilder(module=module, config=config, **extra) else: raise ValueError("Builder backend='%s' not recognized" % backend) +def retry(callback, **kwargs): + attempt = 0 + log.debug("retry() calling %r(kwargs=%r)" % (callback, kwargs)) + while True: + try: + callback(**kwargs) + break + except SysCallError: + attempt += 1 + log.warn("retry(attempt=%d) calling %r(kwargs=%r)" % (attempt, callback, kwargs)) + class KojiModuleBuilder(GenericBuilder): """ Koji specific builder class """ backend = "koji" - def __init__(self, module, config): + def __init__(self, module, config, tag_name): """ - :param koji_profile: koji profile to be used + :param module: string representing module + :param config: rida.config.Config instance + :param tag_name: name of tag for given module """ self.module_str = module + self.tag_name = tag_name self.__prep = False - self._koji_profile_name = config.koji_profile - self.koji_module = koji.get_profile_module(self._koji_profile_name) - opts = {} - - krbservice = getattr(self.koji_module.config, "krbservice", None) - if krbservice: - opts["krbservice"] = krbservice - - self.koji_session = koji.ClientSession(self.koji_module.config.server, opts=opts) - - if self.koji_module.config.authtype == "kerberos": - keytab = getattr(self.koji_module.config, "keytab", None) - principal = getattr(self.koji_module.config, "principal", None) - if keytab and principal: - self.koji_session.krb_login(principal=principal, keytab=keytab, proxyuser=None) - else: - self.koji_session.krb_login() - - elif self.koji_module.config.authtype == "ssl": - self.koji_session.ssl_login(self.koji_module.config.cert, None, self.koji_module.serverca, proxyuser=None) + log.debug("Using koji profile %r" % config.koji_profile) + log.debug ("Using koji_config: %s" % config.koji_config) + self.koji_session, self.koji_module = self.get_session_from_config(config) self.arches = config.koji_arches + if not self.arches: + raise ValueError("No koji_arches specified in the config.") - self.module_tag = self._get_module_tag_name() - self.module_build_tag = self._get_module_build_tag_name() - self.module_target = self._get_module_target_name() + # These eventually get populated when buildroot_{prep,resume} is called + self.module_tag = None # string + self.module_build_tag = None # string + self.module_target = None # A koji target dict + + def __repr__(self): + return "" % ( + self.module_str, self.tag_name) + + def buildroot_ready(self, artifacts=None): + assert self.module_target, "Invalid build target" + + timeout = 120 # minutes see * 60 + tag_id = self.module_target['build_tag'] + start = time.time() + last_repo = None + repo = self.koji_session.getRepo(tag_id) + builds = [ self.koji_session.getBuild(a) for a in artifacts or []] + + while True: + if builds and repo and repo != last_repo: + if koji.util.checkForBuilds(self.koji_session, tag_id, builds, repo['create_event'], latest=True): + return + + if (time.time() - start) >= (timeout * 60.0): + return 1 + + time.sleep(60) + last_repo = repo + repo = self.koji_session.getRepo(tag_id) + + if not builds: + if repo != last_repo: + return + + @staticmethod + def get_disttag_srpm(disttag): + + #Taken from Karsten's create-distmacro-pkg.sh + # - however removed any provides to system-release/redhat-release + + name = 'module-build-macros' + version = "0.1" + release = "1" + today = datetime.date.today().strftime('%a %b %d %Y') + + spec_content = """%global dist {disttag} +Name: {name} +Version: {version} +Release: {release}%dist +Summary: Package containing macros required to build generic module +BuildArch: noarch + +Group: System Environment/Base +License: MIT +URL: http://fedoraproject.org + +%description +This package is used for building modules with a different dist tag. +It provides a file /usr/lib/rpm/macros.d/macro.modules and gets read +after macro.dist, thus overwriting macros of macro.dist like %%dist +It should NEVER be installed on any system as it will really mess up + updates, builds, .... + + +%build + +%install +mkdir -p %buildroot/%_rpmconfigdir/macros.d 2>/dev/null |: +echo %%dist %dist > %buildroot/%_rpmconfigdir/macros.d/macros.modules +chmod 644 %buildroot/%_rpmconfigdir/macros.d/macros.modules + + +%files +%_rpmconfigdir/macros.d/macros.modules + + + +%changelog +* {today} Fedora-Modularity - {version}-{release}{disttag} +- autogenerated macro by Rida "The Orchestrator" +""".format(disttag=disttag, today=today, name=name, version=version, release=release) + td = tempfile.mkdtemp(prefix="rida-build-macros") + fd = open(os.path.join(td, "%s.spec" % name), "w") + fd.write(spec_content) + fd.close() + log.debug("Building %s.spec" % name) + ret, out = run('rpmbuild -bs %s.spec --define "_topdir %s"' % (name, td), workdir=td) + sdir = os.path.join(td, "SRPMS") + srpm_paths = glob.glob("%s/*.src.rpm" % sdir) + assert len(srpm_paths) == 1, "Expected exactly 1 srpm in %s. Got %s" % (sdir, srpm_paths) + + log.debug("Wrote srpm into %s" % srpm_paths[0]) + return srpm_paths[0] + + @staticmethod + def get_session_from_config(config): + koji_config = munch.Munch(koji.read_config( + profile_name=config.koji_profile, + user_config=config.koji_config, + )) + koji_module = koji.get_profile_module( + config.koji_profile, + config=koji_config, + ) + + krbservice = getattr(koji_config, "krbservice", None) + if krbservice: + koji_config.krbservice = krbservice + + address = koji_config.server + log.info("Connecting to koji %r" % address) + koji_session = koji.ClientSession(address, opts=vars(koji_config)) + + authtype = koji_config.authtype + if authtype == "kerberos": + keytab = getattr(koji_config, "keytab", None) + principal = getattr(koji_config, "principal", None) + if keytab and principal: + koji_session.krb_login( + principal=principal, + keytab=keytab, + proxyuser=None, + ) + else: + koji_session.krb_login() + elif authtype == "ssl": + koji_session.ssl_login( + os.path.expanduser(koji_config.cert), + None, + os.path.expanduser(koji_config.serverca), + proxyuser=None, + ) + else: + raise ValueError("Unrecognized koji authtype %r" % authtype) + return (koji_session, koji_module) def buildroot_resume(self): # XXX: experimental """ Resume existing buildroot. Sets __prep=True """ - chktag = self.koji_session.getTag(self._get_module_tag_name()) + chktag = self.koji_session.getTag(self.tag_name) if not chktag: - raise SystemError("Tag %s doesn't exist" % self._get_module_tag_name()) - chkbuildtag = self.koji_session.getTag(self._get_module_build_tag_name()) + raise SystemError("Tag %s doesn't exist" % self.tag_name) + chkbuildtag = self.koji_session.getTag(self.tag_name + "-build") if not chkbuildtag: - raise SystemError("Build Tag %s doesn't exist" % self._get_module_build_tag_name()) - chktarget = self.koji_session.getBuildTarget(self._get_module_target_name()) + raise SystemError("Build Tag %s doesn't exist" % self.tag_name + "-build") + chktarget = self.koji_session.getBuildTarget(self.tag_name) if not chktarget: - raise SystemError("Target %s doesn't exist" % self._get_module_target_name()) + raise SystemError("Target %s doesn't exist" % self.tag_name) self.module_tag = chktag self.module_build_tag = chkbuildtag self.module_target = chktarget self.__prep = True + log.info("%r buildroot resumed." % self) def buildroot_prep(self): """ :param module_deps_tags: a tag names of our build requires :param module_deps_tags: a tag names of our build requires """ - self.module_tag = self._koji_create_tag(self._get_module_tag_name(), perm="admin") # returns tag obj - self.module_build_tag = self._koji_create_tag(self._get_module_build_tag_name(), self.arches, perm="admin") + self.module_tag = self._koji_create_tag( + self.tag_name, perm="admin") # returns tag obj + self.module_build_tag = self._koji_create_tag( + self.tag_name + "-build", self.arches, perm="admin") groups = KOJI_DEFAULT_GROUPS # TODO: read from config if groups: - self._koji_add_groups_to_tag(self.module_build_tag, groups) + retry(self._koji_add_groups_to_tag, dest_tag=self.module_build_tag, groups=groups) - self.module_target = self._koji_add_target(self._get_module_target_name(), self.module_build_tag, self.module_tag) + self.module_target = self._koji_add_target(self.tag_name, self.module_build_tag, self.module_tag) self.__prep = True + log.info("%r buildroot prepared." % self) def buildroot_add_dependency(self, dependencies): tags = [self._get_tag(d)['name'] for d in dependencies] + log.info("%r adding deps for %r" % (self, tags)) self._koji_add_many_tag_inheritance(self.module_build_tag, tags) - def buildroot_add_artifacts(self, artifacts): + def buildroot_add_artifacts(self, artifacts, install=False): + """ + :param artifacts - list of artifacts to add to buildroot + :param install=False - force install artifact (if it's not dragged in as dependency) + """ # TODO: import /usr/bin/koji's TaskWatcher() + log.info("%r adding artifacts %r" % (self, artifacts)) + dest_tag = self._get_tag(self.module_build_tag)['id'] for nvr in artifacts: - self.koji_session.tagBuild(self.module_build_tag, nvr, force=True) + self.koji_session.tagBuild(dest_tag, nvr, force=True) + if install: + for group in ('srpm-build', 'build'): + pkg_info = kobo.rpmlib.parse_nvr(nvr) + log.info("%r adding %s to group %s" % (self, pkg_info['name'], group)) + self.koji_session.groupPackageListAdd(dest_tag, group, pkg_info['name']) - def buildroot_ready(self, artifact=None): - # XXX: steal code from /usr/bin/koji - cmd = "koji -p %s wait-repo %s " % (self._koji_profile_name, self.module_build_tag['name']) - if artifact: - cmd += " --build %s" % artifact - print ("Waiting for buildroot(%s) to be ready" % (self.module_build_tag['name'])) - run(cmd) # wait till repo is current + + def wait_task(self, task_id): + """ + :param task_id + :return - task result object + """ + start = time.time() + timeout = 60 # minutes + + log.info("Waiting for task_id=%s to finish" % task_id) + while True: + if (time.time() - start) >= (timeout * 60.0): + break + try: + log.debug("Waiting for task_id=%s to finish" % task_id) + return self.koji_session.getTaskResult(task_id) + + except koji.GenericError: + time.sleep(30) + log.info("Done waiting for task_id=%s to finish" % task_id) + return 1 def build(self, artifact_name, source): """ :param source : scmurl to spec repository - :return koji taskid + : param artifact_name: name of artifact (which we couldn't get from spec due involved macros) + :return koji build task id """ + # Taken from /usr/bin/koji + def _unique_path(prefix): + """Create a unique path fragment by appending a path component + to prefix. The path component will consist of a string of letter and numbers + that is unlikely to be a duplicate, but is not guaranteed to be unique.""" + # Use time() in the dirname to provide a little more information when + # browsing the filesystem. + # For some reason repr(time.time()) includes 4 or 5 + # more digits of precision than str(time.time()) + return '%s/%r.%s' % (prefix, time.time(), + ''.join([random.choice(string.ascii_letters) for i in range(8)])) + if not self.__prep: raise RuntimeError("Buildroot is not prep-ed") - if '://' not in source: - raise NotImplementedError("Only scm url is currently supported, got source='%s'" % source) self._koji_whitelist_packages([artifact_name,]) + if '://' not in source: + #treat source as an srpm and upload it + serverdir = _unique_path('cli-build') + callback =None + self.koji_session.uploadWrapper(source, serverdir, callback=callback) + source = "%s/%s" % (serverdir, os.path.basename(source)) + task_id = self.koji_session.build(source, self.module_target['name']) - print("Building %s (taskid=%s)." % (source, task_id)) + log.info("submitted build of %s (task_id=%s), via %s" % ( + source, task_id, self)) return task_id def _get_tag(self, tag, strict=True): @@ -276,23 +478,30 @@ class KojiModuleBuilder(GenericBuilder): :param build_tag_name :param groups: A dict {'group' : [package, ...]} """ + log.debug("Adding groups=%s to tag=%s" % (groups.keys(), dest_tag)) if groups and not isinstance(groups, dict): raise ValueError("Expected dict {'group' : [str(package1), ...]") dest_tag = self._get_tag(dest_tag)['name'] - groups = dict([(p['name'], p['group_id']) for p in self.koji_session.getTagGroups(dest_tag, inherit=False)]) + existing_groups = dict([ + (p['name'], p['group_id']) + for p in self.koji_session.getTagGroups(dest_tag, inherit=False) + ]) + for group, packages in groups.iteritems(): - group_id = groups.get(group, None) + group_id = existing_groups.get(group, None) if group_id is not None: - print("Group %s already exists for tag %s" % (group, dest_tag)) - return 1 + log.warning("Group %s already exists for tag %s" % (group, dest_tag)) + continue self.koji_session.groupListAdd(dest_tag, group) + log.debug("Adding %d packages into group=%s tag=%s" % (len(packages), group, dest_tag)) for pkg in packages: self.koji_session.groupPackageListAdd(dest_tag, group, pkg) def _koji_create_tag(self, tag_name, arches=None, fail_if_exists=True, perm=None): + log.debug("Creating tag %s" % tag_name) chktag = self.koji_session.getTag(tag_name) if chktag and fail_if_exists: raise SystemError("Tag %s already exist" % tag_name) @@ -312,15 +521,6 @@ class KojiModuleBuilder(GenericBuilder): self._lock_tag(tag_name, perm) return self._get_tag(tag_name) - def _get_module_target_name(self): - return self.module_str - - def _get_module_tag_name(self): - return self.module_str - - def _get_module_build_tag_name(self): - return "%s-build" % self._get_module_tag_name() - def _get_component_owner(self, package): user = self.koji_session.getLoggedInUser()['name'] return user @@ -332,7 +532,7 @@ class KojiModuleBuilder(GenericBuilder): for package in packages: package_id = pkglist.get(package, None) if not package_id is None: - print ("Package %s already exists in tag %s" % (package, self.module_tag['name'])) + log.warn("Package %s already exists in tag %s" % (package, self.module_tag['name'])) continue to_add.append(package) diff --git a/rida/config.py b/rida/config.py index d643a744..bc28c41b 100644 --- a/rida/config.py +++ b/rida/config.py @@ -64,11 +64,13 @@ def from_file(filename=None): conf.db = default.get("db") conf.system = default.get("system") conf.messaging = default.get("messaging") + conf.polling_interval = int(default.get("polling_interval")) conf.pdc_url = default.get("pdc_url") conf.pdc_insecure = default.get("pdc_insecure") conf.pdc_develop = default.get("pdc_develop") conf.koji_config = default.get("koji_config") conf.koji_profile = default.get("koji_profile") + conf.koji_arches = json.loads(default.get("koji_arches")) conf.scmurls = json.loads(default.get("scmurls")) conf.rpms_default_repository = default.get("rpms_default_repository") conf.rpms_allow_repository = asbool(default.get("rpms_allow_repository")) @@ -329,4 +331,3 @@ class Config(object): def log_level(self, s): level = str(s).lower() self._log_level = logger.str_to_log_level(level) - diff --git a/rida/database.py b/rida/database.py index 8af9efea..3e16782a 100644 --- a/rida/database.py +++ b/rida/database.py @@ -38,6 +38,13 @@ from sqlalchemy.orm import ( ) from sqlalchemy.ext.declarative import declarative_base +import modulemd as _modulemd + +import rida.messaging + +import logging +log = logging.getLogger(__name__) + # Just like koji.BUILD_STATES, except our own codes for modules. BUILD_STATES = { @@ -65,6 +72,8 @@ BUILD_STATES = { "ready": 5, } +INVERSE_BUILD_STATES = {v: k for k, v in BUILD_STATES.items()} + class RidaBase(object): # TODO -- we can implement functionality here common to all our model @@ -126,9 +135,18 @@ class ModuleBuild(Base): release = Column(String, nullable=False) state = Column(Integer, nullable=False) modulemd = Column(String, nullable=False) + koji_tag = Column(String) # This gets set after 'wait' module = relationship('Module', backref='module_builds', lazy=False) + def mmd(self): + mmd = _modulemd.ModuleMetadata() + try: + mmd.loads(self.modulemd) + except: + raise ValueError("Invalid modulemd") + return mmd + @validates('state') def validate_state(self, key, field): if field in BUILD_STATES.values(): @@ -138,10 +156,64 @@ class ModuleBuild(Base): raise ValueError("%s: %s, not in %r" % (key, field, BUILD_STATES)) @classmethod - def from_fedmsg(cls, session, msg): - if '.module.' not in msg['topic']: - raise ValueError("%r is not a module message." % msg['topic']) - return session.query(cls).filter(cls.id==msg['msg']['id']).one() + def from_module_event(cls, session, event): + if '.module.' not in event['topic']: + raise ValueError("%r is not a module message." % event['topic']) + return session.query(cls).filter(cls.id==event['msg']['id']).first() + + @classmethod + def create(cls, session, conf, name, version, release, modulemd): + module = cls( + name=name, + version=version, + release=release, + state="init", + modulemd=modulemd, + ) + session.add(module) + session.commit() + rida.messaging.publish( + modname='rida', + topic='module.state.change', + msg=module.json(), # Note the state is "init" here... + backend=conf.messaging, + ) + return module + + def transition(self, conf, state): + """ Record that a build has transitioned state. """ + old_state = self.state + self.state = state + log.debug("%r, state %r->%r" % (self, old_state, self.state)) + rida.messaging.publish( + modname='rida', + topic='module.state.change', + msg=self.json(), # Note the state is "init" here... + backend=conf.messaging, + ) + + @classmethod + def by_state(cls, session, state): + return session.query(rida.database.ModuleBuild)\ + .filter_by(state=BUILD_STATES[state]).all() + + @classmethod + def from_repo_done_event(cls, session, event): + """ Find the ModuleBuilds in our database that should be in-flight... + ... for a given koji tag. + + There should be at most one. + """ + tag = event['msg']['tag'].strip('-build') + query = session.query(cls)\ + .filter(cls.koji_tag==tag)\ + .filter(cls.state==BUILD_STATES["build"]) + + count = query.count() + if count > 1: + raise RuntimeError("%r module builds in flight for %r" % (count, tag)) + + return query.first() def json(self): return { @@ -150,6 +222,7 @@ class ModuleBuild(Base): 'version': self.version, 'release': self.release, 'state': self.state, + 'state_name': INVERSE_BUILD_STATES[self.state], # This is too spammy.. #'modulemd': self.modulemd, @@ -158,26 +231,54 @@ class ModuleBuild(Base): 'component_builds': [build.id for build in self.component_builds], } + def __repr__(self): + return "" % ( + self.name, self.version, self.release, + INVERSE_BUILD_STATES[self.state]) + class ComponentBuild(Base): __tablename__ = "component_builds" id = Column(Integer, primary_key=True) package = Column(String, nullable=False) + scmurl = Column(String, nullable=False) # XXX: Consider making this a proper ENUM format = Column(String, nullable=False) - task = Column(Integer) + task_id = Column(Integer) # This is the id of the build in koji # XXX: Consider making this a proper ENUM (or an int) - state = Column(String) + state = Column(Integer) module_id = Column(Integer, ForeignKey('module_builds.id'), nullable=False) module_build = relationship('ModuleBuild', backref='component_builds', lazy=False) + @classmethod + def from_component_event(cls, session, event): + if 'component.state.change' not in event['topic'] and '.buildsys.build.state.change' not in event['topic']: + raise ValueError("%r is not a koji message." % event['topic']) + return session.query(cls).filter(cls.task_id==event['msg']['task_id']).first() + def json(self): - return { + retval = { 'id': self.id, 'package': self.package, 'format': self.format, - 'task': self.task, + 'task_id': self.task_id, 'state': self.state, - 'module_build': self.module_build.id, + 'module_build': self.module_id, } + + try: + # Koji is py2 only, so this fails if the main web process is + # running on py3. + import koji + retval['state_name'] = koji.BUILD_STATES.get(self.state) + except ImportError: + pass + + return retval + + + + def __repr__(self): + return "" % ( + self.package, self.module_id, self.state, self.task_id) diff --git a/rida/logger.py b/rida/logger.py index b1596672..eb48cf8f 100644 --- a/rida/logger.py +++ b/rida/logger.py @@ -38,7 +38,7 @@ import logging logging.debug("Phasers are set to stun.") logging.info("%s tried to build something", username) -logging.warn("%s failed to build", build_id) +logging.warn("%s failed to build", task_id) """ diff --git a/rida/pdc.py b/rida/pdc.py index 6d85e0e8..f15d0331 100644 --- a/rida/pdc.py +++ b/rida/pdc.py @@ -68,7 +68,7 @@ def get_variant_dict(data): return isinstance(data, modulemd.ModuleMetadata) def is_module_str(data): - return isinstance(data, str) + return isinstance(data, str) or isinstance(data, unicode) result = None @@ -88,7 +88,7 @@ def get_variant_dict(data): result['variant_release'] = '0' elif is_module_dict(data): - result = {'variant_name': data['name'], 'variant_version': data['version']} + result = {'variant_name': data['name'], 'variant_version': data['version'], 'variant_release': data['release']} if not result: raise ValueError("Couldn't get variant_dict from %s" % data) @@ -96,7 +96,6 @@ def get_variant_dict(data): return result - def variant_dict_from_str(module_str): """ :param module_str: a string to match in PDC @@ -108,38 +107,58 @@ def variant_dict_from_str(module_str): module_info = {} - module_info['variant_name'] = module_str[:module_str.find('-')] - module_info['variant_version'] = module_str[module_str.find('-')+1:] + + release_start = module_str.rfind('-') + version_start = module_str.rfind('-', 0, release_start) + module_info['variant_release'] = module_str[release_start+1:] + module_info['variant_version'] = module_str[version_start+1:release_start] + module_info['variant_name'] = module_str[:version_start] module_info['variant_type'] = 'module' return module_info -def get_module(session, module_info): +def get_module(session, module_info, strict=False): """ :param session : PDCClient instance :param module_info: pdc variant_dict, str, mmd or module dict + :param strict: Normally this function returns None if no module can be + found. If strict=True, then a ValueError is raised. + :return final list of module_info which pass repoclosure """ module_info = get_variant_dict(module_info) + retval = session['unreleasedvariants'](page_size=-1, + variant_name=module_info['variant_name'], + variant_version=module_info['variant_version'], + variant_release=module_info['variant_release']) + assert len(retval) <= 1 - module_info = session['unreleasedvariants'](page_size=-1, **module_info) - assert len(module_info) <= 1 + # Error handling + if not retval: + if strict: + raise ValueError("Failed to find module in PDC %r" % module_info) + else: + return None - if not module_info: - return None + return retval[0] - return module_info[0] - -def get_module_tag(session, module_info): +def get_module_tag(session, module_info, strict=False): """ :param session : PDCClient instance :param module_info: list of module_info dicts + :param strict: Normally this function returns None if no module can be + found. If strict=True, then a ValueError is raised. :return: koji tag string """ - return get_module(session, module_info)['koji_tag'] + # TODO -- get this from PDC some day... for now, we're just going to + # construct the module tag name from the module attrs we already know + # about. + #return get_module(session, module_info, strict=strict)['koji_tag'] + variant_data = get_variant_dict(module_info) + return "{variant_name}-{variant_version}-{variant_release}".format(**variant_data) -def module_depsolving_wrapper(session, module_list): +def module_depsolving_wrapper(session, module_list, strict=True): """ :param session : PDCClient instance :param module_list: list of module_info dicts @@ -148,31 +167,49 @@ def module_depsolving_wrapper(session, module_list): # TODO: implement this # Make sure that these are dicts from PDC ... ensures all values - module_infos = [get_module(session, module) for module in module_list] + module_list = set([get_module_tag(session, x, strict) for x in module_list]) + seen = set() # don't query pdc for the same items all over again - return module_infos + while True: + if seen == module_list: + break -def get_module_dependencies(session, module_info): + for module in module_list: + if module in seen: + continue + info = get_module(session, module, strict) + assert info, "Module '%s' not found in PDC" % module + module_list.update([x['dependency'] for x in info['build_deps']]) + seen.add(module) + module_list.update(info['build_deps']) + + return list(module_list) + +def get_module_runtime_dependencies(session, module_info, strict=False): """ :param session : PDCClient instance :param module_infos : a dict containing filters for pdc + :param strict: Normally this function returns None if no module can be + found. If strict=True, then a ValueError is raised. Example minimal module_info {'variant_name': module_name, 'variant_version': module_version, 'variant_type': 'module'} """ # XXX get definitive list of modules deps = [] - module_info = get_module(session, module_info) - if module_info.get('runtime_deps'): + module_info = get_module(session, module_info, strict=strict) + if module_info and module_info.get('runtime_deps', None): deps = [x['dependency'] for x in module_info['runtime_deps']] - deps = module_depsolving_wrapper(session, deps) + deps = module_depsolving_wrapper(session, deps, strict=strict) return deps -def get_module_build_dependencies(session, module_info): +def get_module_build_dependencies(session, module_info, strict=False): """ :param session : PDCClient instance :param module_info : a dict containing filters for pdc + :param strict: Normally this function returns None if no module can be + found. If strict=True, then a ValueError is raised. :return final list of module_infos which pass repoclosure Example minimal module_info {'variant_name': module_name, 'variant_version': module_version, 'variant_type': 'module'} @@ -180,9 +217,9 @@ def get_module_build_dependencies(session, module_info): # XXX get definitive list of modules deps = [] - module_info = get_module(session, module_info) - if module_info.get('build_deps'): + module_info = get_module(session, module_info, strict=strict) + if module_info and module_info.get('build_deps', None): deps = [x['dependency'] for x in module_info['build_deps']] - deps = module_depsolving_wrapper(session, deps) + deps = module_depsolving_wrapper(session, deps, strict=strict) return deps diff --git a/rida/scheduler/handlers/components.py b/rida/scheduler/handlers/components.py new file mode 100644 index 00000000..d81b5224 --- /dev/null +++ b/rida/scheduler/handlers/components.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2016 Red Hat, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# Written by Ralph Bean + +""" Handlers for koji component build events on the message bus. """ + +import logging + +import rida.builder +import rida.database +import rida.pdc + +import koji + +logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger(__name__) + +def _finalize(config, session, msg, state): + """ Called whenever a koji build completes or fails. """ + + # First, find our ModuleBuild associated with this repo, if any. + component_build = rida.database.ComponentBuild.from_component_event(session, msg) + if not component_build: + template = "We have no record of {name}-{version}-{release}" + log.debug(template.format(**msg['msg'])) + return + + # Mark the state in the db. + component_build.state = state + session.commit() + + # Find all of the sibling builds of this particular build. + parent = component_build.module_build + siblings = parent.component_builds + + # Are any of them still executing? + if any([c.state == koji.BUILD_STATES['BUILDING'] for c in siblings]): + # Then they're not all done yet... continue to wait + return + + # Otherwise, check to see if any failed. + if any([c.state != koji.BUILD_STATES['COMPLETE'] for c in siblings]): + # They didn't all succeed.. so mark this module build as a failure. + parent.transition(config, rida.BUILD_STATES['failed']) + session.commit() + return + + # Otherwise.. if all of the builds succeeded, then mark the module as good. + parent.transition(config, rida.BUILD_STATES['done']) + + +def complete(config, session, msg): + return _finalize(config, session, msg, state=koji.BUILD_STATES['COMPLETE']) + +def failed(config, session, msg): + return _finalize(config, session, msg, state=koji.BUILD_STATES['FAILED']) + +def canceled(config, session, msg): + return _finalize(config, session, msg, state=koji.BUILD_STATES['CANCELED']) diff --git a/rida/scheduler/handlers/modules.py b/rida/scheduler/handlers/modules.py index b3c7bb2a..34b94642 100644 --- a/rida/scheduler/handlers/modules.py +++ b/rida/scheduler/handlers/modules.py @@ -26,62 +26,67 @@ import rida.builder import rida.database import rida.pdc -import time import logging -import koji +import os -import logging +logging.basicConfig(level=logging.DEBUG) log = logging.getLogger(__name__) -def init(config, session, msg): - """ Called whenever a module enters the 'init' state. +def get_rpm_release_from_tag(tag): + return tag.replace("-", "_") - We usually transition to this state when the modulebuild is first requested. +def get_artifact_from_srpm(srpm_path): + return os.path.basename(srpm_path).replace(".src.rpm", "") + +def wait(config, session, msg): + """ Called whenever a module enters the 'wait' state. + + We transition to this state shortly after a modulebuild is first requested. All we do here is request preparation of the buildroot. + The kicking off of individual component builds is handled elsewhere, + in rida.schedulers.handlers.repos. """ - build = rida.database.ModuleBuild.from_fedmsg(session, msg) - pdc = rida.pdc.get_pdc_client_session(config) - # TODO do some periodical polling of variant_info since it's being created based on the same message - #log.warn("HACK: waiting 10s for pdc") - #time.sleep(10) - log.debug("Getting module from pdc with following input_data=%s" % build.json()) - module_info = pdc.get_module(build.json()) + build = rida.database.ModuleBuild.from_module_event(session, msg) + log.info("Found build=%r from message" % build) - log.debug("Received module_info=%s from pdc" % module_info) + module_info = build.json() + if module_info['state'] != msg['msg']['state']: + log.warn("Note that retrieved module state %r " + "doesn't match message module state %r" % ( + module_info['state'], msg['msg']['state'])) + # This is ok.. it's a race condition we can ignore. + pass - tag = rida.pdc.get_module_tag(pdc, module_info) - log.info("Found tag=%s for module %s-%s-%s" % (tag, build.name, build.version, build.release)) + pdc_session = rida.pdc.get_pdc_client_session(config) + tag = rida.pdc.get_module_tag(pdc_session, module_info, strict=True) + log.info("Found tag=%s for module %r" % (tag, build)) - dependencies = rida.pdc.get_module_dependencies(pdc, module_info) - builder = rida.builder.KojiModuleBuilder(build.name, config) - builder.buildroot_add_dependency(dependencies) + # Hang on to this information for later. We need to know which build is + # associated with which koji tag, so that when their repos are regenerated + # in koji we can figure out which for which module build that event is + # relevant. + log.debug("Assigning koji tag=%s to module build" % tag) + build.koji_tag = tag + + dependencies = rida.pdc.get_module_build_dependencies(pdc_session, module_info, strict=True) + builder = rida.builder.KojiModuleBuilder(build.name, config, tag_name=tag) build.buildroot_task_id = builder.buildroot_prep() - # TODO: build srpm with dist_tag macros - # TODO submit build from srpm to koji - # TODO: buildroot.add_artifact(build_with_dist_tags) - # TODO: buildroot.ready(artifact=$artifact) - build.state = "wait" # Wait for the buildroot to be ready. - log.debug("Done with init") + log.debug("Adding dependencies %s into buildroot for module %s" % (dependencies, module_info)) + builder.buildroot_add_dependency(dependencies) + # inject dist-tag into buildroot + srpm = builder.get_disttag_srpm(disttag=".%s" % get_rpm_release_from_tag(tag)) + task_id = builder.build(artifact_name="module-build-macros", source=srpm) + # TODO -- this has to go eventually.. otherwise, we can only build one + # module at a time and that just won't scale. + builder.wait_task(task_id) + # TODO -- do cleanup if this fails -def build(config, session, msg): - """ Called whenever a module enters the "build" state. + artifact = get_artifact_from_srpm(srpm) + builder.buildroot_add_artifacts([artifact,], install=True) # tag && add to srpm-build group + builder.buildroot_ready(artifacts=[artifact,]) - We usually transition to this state once the buildroot is ready. - - All we do here is kick off builds of all our components. - """ - module_build = rida.database.ModuleBuild.from_fedmsg(session, msg) - for component_build in module_build.component_builds: - scmurl = "{dist_git}/rpms/{package}?#{gitref}".format( - dist_git=config.dist_git_url, - package=component_build.package, - gitref=component_build.gitref, # This is the update stream - ) - artifact_name = 'TODO' - component_build.task = builder.build(artifact_name, scmurl) - component_build.state = koji.BUILD_STATES['BUILDING'] - - build.state = "build" # Now wait for all of those to finish. + build.transition(config, state="build") # Wait for the buildroot to be ready. + session.commit() diff --git a/rida/scheduler/handlers/repos.py b/rida/scheduler/handlers/repos.py new file mode 100644 index 00000000..2c76b947 --- /dev/null +++ b/rida/scheduler/handlers/repos.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2016 Red Hat, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# Written by Ralph Bean + +""" Handlers for repo change events on the message bus. """ + +import rida.builder +import rida.database +import rida.pdc +import logging +import koji + +logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger(__name__) + + +def done(config, session, msg): + """ Called whenever koji rebuilds a repo, any repo. """ + + # First, find our ModuleBuild associated with this repo, if any. + tag = msg['msg']['tag'].strip('-build') + module_build = rida.database.ModuleBuild.from_repo_done_event(session, msg) + if not module_build: + log.info("No module build found associated with koji tag %r" % tag) + return + + unbuilt_components = ( + component_build for component_build in module_build.component_builds + if component_build.state is None + ) + + builder = rida.builder.KojiModuleBuilder(module_build.name, config, tag_name=tag) + builder.buildroot_resume() + + for component_build in unbuilt_components: + component_build.state = koji.BUILD_STATES['BUILDING'] + log.debug("Using scmurl=%s for package=%s" % ( + component_build.scmurl, + component_build.package, + )) + log.info("Building artifact_name=%s from source=%s" % (component_build.package, component_build.scmurl)) + component_build.task_id = builder.build( + artifact_name=component_build.package, + source=component_build.scmurl, + ) + session.commit() diff --git a/rida/scheduler/main.py b/rida/scheduler/main.py index 73208dbc..67a706a5 100644 --- a/rida/scheduler/main.py +++ b/rida/scheduler/main.py @@ -32,14 +32,26 @@ proper scheduling component builds in the supported build systems. import inspect import logging +import operator import os +import pprint import threading +import time + +try: + # Py3 + import queue +except ImportError: + # Py2 + import Queue as queue + import rida.config import rida.logger import rida.messaging +import rida.scheduler.handlers.components import rida.scheduler.handlers.modules -#import rida.scheduler.handlers.builds +import rida.scheduler.handlers.repos import sys import koji @@ -56,89 +68,234 @@ else: # production config = rida.config.from_file() -# TODO: Utilized rida.builder to prepare the buildroots and build components. -# TODO: Set the build state to build once the module build is started. -# TODO: Set the build state to done once the module build is done. -# TODO: Set the build state to failed if the module build fails. + +class STOP_WORK(object): + """ A sentinel value, indicating that work should be stopped. """ + pass + def module_build_state_from_msg(msg): - state = int(msg['msg']['state']) # TODO better handling assert state in rida.BUILD_STATES.values(), "state=%s(%s) is not in %s" % (state, type(state), rida.BUILD_STATES.values()) return state -class Messaging(threading.Thread): - # These are our main lookup tables for figuring out what to run in response - # to what messaging events. - on_build_change = { - koji.BUILD_STATES["BUILDING"]: lambda x: x - } - on_module_change = { - rida.BUILD_STATES["init"]: rida.scheduler.handlers.modules.init, - } +class MessageIngest(threading.Thread): + def __init__(self, outgoing_work_queue, *args, **kwargs): + self.outgoing_work_queue = outgoing_work_queue + super(MessageIngest, self).__init__(*args, **kwargs) + + + def run(self): + for msg in rida.messaging.listen(backend=config.messaging): + self.outgoing_work_queue.put(msg) + + +class MessageWorker(threading.Thread): + + def __init__(self, incoming_work_queue, *args, **kwargs): + self.incoming_work_queue = incoming_work_queue + super(MessageWorker, self).__init__(*args, **kwargs) + + # These are our main lookup tables for figuring out what to run in response + # to what messaging events. + NO_OP = lambda config, session, msg: True + self.on_build_change = { + koji.BUILD_STATES["BUILDING"]: NO_OP, + koji.BUILD_STATES["COMPLETE"]: rida.scheduler.handlers.components.complete, + koji.BUILD_STATES["FAILED"]: rida.scheduler.handlers.components.failed, + koji.BUILD_STATES["CANCELED"]: rida.scheduler.handlers.components.canceled, + koji.BUILD_STATES["DELETED"]: NO_OP, + } + self.on_module_change = { + rida.BUILD_STATES["init"]: NO_OP, + rida.BUILD_STATES["wait"]: rida.scheduler.handlers.modules.wait, + rida.BUILD_STATES["build"]: NO_OP, + rida.BUILD_STATES["failed"]: NO_OP, + rida.BUILD_STATES["done"]: NO_OP, + rida.BUILD_STATES["ready"]: NO_OP, + } + # Only one kind of repo change event, though... + self.on_repo_change = rida.scheduler.handlers.repos.done def sanity_check(self): """ On startup, make sure our implementation is sane. """ # Ensure we have every state covered for state in rida.BUILD_STATES: - if state not in self.on_module_change: + if rida.BUILD_STATES[state] not in self.on_module_change: raise KeyError("Module build states %r not handled." % state) for state in koji.BUILD_STATES: - if state not in self.on_build_change: + if koji.BUILD_STATES[state] not in self.on_build_change: raise KeyError("Koji build states %r not handled." % state) all_fns = self.on_build_change.items() + self.on_module_change.items() for key, callback in all_fns: - expected = ['conf', 'db', 'msg'] - argspec = inspect.getargspec(callback) + expected = ['config', 'session', 'msg'] + argspec = inspect.getargspec(callback)[0] if argspec != expected: raise ValueError("Callback %r, state %r has argspec %r!=%r" % ( callback, key, argspec, expected)) def run(self): - #self.sanity_check() - # TODO: Check for modules that can be set to done/failed - # TODO: Act on these things somehow - # TODO: Emit messages about doing so - for msg in rida.messaging.listen(backend=config.messaging): - log.debug("Saw %r, %r" % (msg['msg_id'], msg['topic'])) - log.debug(msg) + self.sanity_check() - # Choose a handler for this message - if '.buildsys.build.state.change' in msg['topic']: - handler = self.on_build_change[msg['msg']['init']] - elif '.rida.module.state.change' in msg['topic']: - handler = self.on_module_change[module_build_state_from_msg(msg)] - else: - log.debug("Unhandled message...") - continue + while True: + msg = self.incoming_work_queue.get() - # Execute our chosen handler - with rida.database.Database(config) as session: - handler(config, session, msg) + if msg is STOP_WORK: + log.info("Worker thread received STOP_WORK, shutting down...") + break + + try: + self.process_message(msg) + except Exception: + log.exception("Failed while handling %r" % msg['msg_id']) + # Log the body of the message too, but clear out some spammy + # fields that are of no use to a human reader. + msg.pop('certificate', None) + msg.pop('signature', None) + log.info(pprint.pformat(msg)) + + def process_message(self, msg): + log.debug("received %r, %r" % (msg['msg_id'], msg['topic'])) + + # Choose a handler for this message + if '.buildsys.repo.done' in msg['topic']: + handler = self.on_repo_change + elif '.buildsys.build.state.change' in msg['topic']: + handler = self.on_build_change[msg['msg']['new']] + elif '.rida.module.state.change' in msg['topic']: + handler = self.on_module_change[module_build_state_from_msg(msg)] + else: + log.debug("Unhandled message...") + return + + # Execute our chosen handler + with rida.database.Database(config) as session: + log.info(" %s: %s, %s" % (handler.__name__, msg['topic'], msg['msg_id'])) + handler(config, session, msg) + + +class Poller(threading.Thread): + def __init__(self, outgoing_work_queue, *args, **kwargs): + self.outgoing_work_queue = outgoing_work_queue + super(Poller, self).__init__(*args, **kwargs) -class Polling(threading.Thread): def run(self): while True: - # TODO: Check for module builds in the wait state - # TODO: Check component builds in the open state - # TODO: Check for modules that can be set to done/failed - # TODO: Act on these things somehow - # TODO: Emit messages about doing so - # TODO: Sleep for a configuration-determined interval - pass + with rida.database.Database(config) as session: + self.log_summary(session) + # XXX: detect whether it's really stucked first + #with rida.database.Database(config) as session: + # self.process_waiting_module_builds(session) + with rida.database.Database(config) as session: + self.process_open_component_builds(session) + with rida.database.Database(config) as session: + self.process_lingering_module_builds(session) + with rida.database.Database(config) as session: + self.fail_lost_builds(session) + + log.info("Polling thread sleeping, %rs" % config.polling_interval) + time.sleep(config.polling_interval) + + def fail_lost_builds(self, session): + # This function is supposed to be handling only + # the part which can't be updated trough messaging (srpm-build failures). + # Please keep it fit `n` slim. We do want rest to be processed elsewhere + + # TODO re-use + + if config.system == "koji": + koji_session, _ = rida.builder.KojiModuleBuilder.get_session_from_config(config) + state = koji.BUILD_STATES['BUILDING'] # Check tasks that we track as BUILDING + log.info("Querying tasks for statuses:") + query = session.query(rida.database.ComponentBuild) + res = query.filter(state==koji.BUILD_STATES['BUILDING']).all() + + log.info("Checking status for %d tasks." % len(res)) + for component_build in res: + log.debug(component_build.json()) + if not component_build.task_id: # Don't check tasks which has not been triggered yet + continue + + log.info("Checking status of task_id=%s" % component_build.task_id) + task_info = koji_session.getTaskInfo(component_build.task_id) + + dead_states = ( + koji.TASK_STATES['CANCELED'], + koji.TASK_STATES['FAILED'], + ) + log.info(" task %r is in state %r" % (component_build.task_id, task_info['state'])) + if task_info['state'] in dead_states: + # Fake a fedmsg message on our internal queue + self.outgoing_work_queue.put({ + 'msg_id': 'a faked internal message', + 'topic': 'org.fedoraproject.prod.buildsys.build.state.change', + 'msg': { + 'msg_id': 'a faked internal message', + 'task_id': component_build.task_id, + 'new': koji.BUILD_STATES['FAILED'], + }, + }) + + else: + raise NotImplementedError("Buildsystem %r is not supported." % config.system) + + def log_summary(self, session): + log.info("Current status:") + backlog = self.outgoing_work_queue.qsize() + log.info(" * internal queue backlog is %i." % backlog) + states = sorted(rida.BUILD_STATES.items(), key=operator.itemgetter(1)) + for name, code in states: + query = session.query(rida.database.ModuleBuild) + count = query.filter_by(state=code).count() + if count: + log.info(" * %i module builds in the %s state." % (count, name)) + if name == 'build': + for module_build in query.all(): + log.info(" * %r" % module_build) + for component_build in module_build.component_builds: + log.info(" * %r" % component_build) + + + def process_waiting_module_builds(self, session): + log.info("Looking for module builds stuck in the wait state.") + builds = rida.database.ModuleBuild.by_state(session, "wait") + # TODO -- do throttling calculation here... + log.info(" %r module builds in the wait state..." % len(builds)) + for build in builds: + # Fake a message to kickstart the build anew + msg = { + 'topic': '.module.build.state.change', + 'msg': build.json(), + } + rida.scheduler.handlers.modules.wait(config, session, msg) + + def process_open_component_builds(self, session): + log.warning("process_open_component_builds is not yet implemented...") + + def process_lingering_module_builds(self, session): + log.warning("process_lingering_module_builds is not yet implemented...") def main(): rida.logger.init_logging(config) log.info("Starting ridad.") try: - messaging_thread = Messaging() - polling_thread = Polling() + work_queue = queue.Queue() + + # This ingest thread puts work on the queue + messaging_thread = MessageIngest(work_queue) + # This poller does other work, but also sometimes puts work in queue. + polling_thread = Poller(work_queue) + # This worker takes work off the queue and handles it. + worker_thread = MessageWorker(work_queue) + messaging_thread.start() polling_thread.start() + worker_thread.start() + except KeyboardInterrupt: # FIXME: Make this less brutal os._exit() diff --git a/submit-build.json b/submit-build.json index 7873ee98..96098098 100644 --- a/submit-build.json +++ b/submit-build.json @@ -1,3 +1,3 @@ { - "scmurl": "git://pkgs.stg.fedoraproject.org/modules/testmodule.git?#020ea37251df5019fde9e7899d2f7d7a987dfbf5" + "scmurl": "git://pkgs.stg.fedoraproject.org/modules/testmodule.git?#48932b90de214d9d13feefbd35246a81b6cb8d49" } diff --git a/test-pdc.py b/test-pdc.py index c5156377..b4f66bc2 100755 --- a/test-pdc.py +++ b/test-pdc.py @@ -6,17 +6,17 @@ from rida.config import Config cfg = Config() -cfg.pdc_url = "http://localhost:8000/rest_api/v1" +cfg.pdc_url = "http://modularity.fedorainfracloud.org:8080/rest_api/v1" cfg.pdc_insecure = True cfg.pdc_develop = True pdc_session = get_pdc_client_session(cfg) -module = get_module(pdc_session, {'name': 'testmodule', 'version': '4.3.42', 'release': '0'}) +module = get_module(pdc_session, {'name': 'testmodule', 'version': '4.3.43', 'release': '1'}) if module: print ("pdc_data=%s" % str(module)) - print ("deps=[%s]" % ", ".join(get_module_dependencies(pdc_session, module))) - print ("build_deps=[%s]" % ", ".join(get_module_build_dependencies(pdc_session, module))) + print ("deps=%s" % get_module_runtime_dependencies(pdc_session, module)) + print ("build_deps=%s" % get_module_build_dependencies(pdc_session, module)) print ("tag=%s" % get_module_tag(pdc_session, module)) else: print ('module was not found') diff --git a/tests/test_scheduler/test_modules/test_init.py b/tests/test_scheduler/test_module_wait.py similarity index 78% rename from tests/test_scheduler/test_modules/test_init.py rename to tests/test_scheduler/test_module_wait.py index 9ca15c7b..dbf38afc 100644 --- a/tests/test_scheduler/test_modules/test_init.py +++ b/tests/test_scheduler/test_module_wait.py @@ -26,35 +26,34 @@ import mock import rida.scheduler.handlers.modules -class TestInit(unittest.TestCase): +class TestModuleWait(unittest.TestCase): def setUp(self): self.config = mock.Mock() self.session = mock.Mock() - self.fn = rida.scheduler.handlers.modules.init + self.fn = rida.scheduler.handlers.modules.wait @mock.patch('rida.builder.KojiModuleBuilder') - @mock.patch('rida.database.ModuleBuild.from_fedmsg') + @mock.patch('rida.database.ModuleBuild.from_module_event') @mock.patch('rida.pdc') - def test_init_basic(self, pdc, from_fedmsg, KojiModuleBuilder): + def test_init_basic(self, pdc, from_module_event, KojiModuleBuilder): builder = mock.Mock() + builder.get_disttag_srpm.return_value = 'some srpm disttag' KojiModuleBuilder.return_value = builder mocked_module_build = mock.Mock() - mocked_module_build.to_pdc_module_info.return_value = { - 'name': 'foo', - 'version': 1, - } - from_fedmsg.return_value = mocked_module_build - pdc.get_module.return_value = { + mocked_module_build.json.return_value = { 'name': 'foo', 'version': 1, 'release': 1, + 'state': 'some state', } + from_module_event.return_value = mocked_module_build msg = { 'topic': 'org.fedoraproject.prod.rida.module.state.change', 'msg': { 'id': 1, + 'state': 'some state', }, } self.fn(config=self.config, session=self.session, msg=msg) diff --git a/tests/test_scheduler/test_modules/__init__.py b/tests/test_scheduler/test_modules/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/test_scheduler/test_repo_done.py b/tests/test_scheduler/test_repo_done.py new file mode 100644 index 00000000..3e9abb7e --- /dev/null +++ b/tests/test_scheduler/test_repo_done.py @@ -0,0 +1,74 @@ +# Copyright (c) 2016 Red Hat, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# Written by Ralph Bean + +import unittest +import mock + +import rida.scheduler.handlers.repos + + +class TestRepoDone(unittest.TestCase): + + def setUp(self): + self.config = mock.Mock() + self.config.rpms_default_repository = 'dist_git_url' + self.config.koji_profile = 'staging' # TODO - point at a fake test config + + + self.session = mock.Mock() + self.fn = rida.scheduler.handlers.repos.done + + @mock.patch('rida.database.ModuleBuild.from_repo_done_event') + def test_no_match(self, from_repo_done_event): + """ Test that when a repo msg hits us and we have no match, + that we do nothing gracefully. + """ + from_repo_done_event.return_value = None + msg = { + 'topic': 'org.fedoraproject.prod.buildsys.repo.done', + 'msg': {'tag': 'no matches for this...'}, + } + self.fn(config=self.config, session=self.session, msg=msg) + + @mock.patch('rida.builder.KojiModuleBuilder.get_session_from_config') + @mock.patch('rida.builder.KojiModuleBuilder.build') + @mock.patch('rida.builder.KojiModuleBuilder.buildroot_resume') + @mock.patch('rida.database.ModuleBuild.from_repo_done_event') + def test_a_single_match(self, from_repo_done_event, resume, build_fn, config): + """ Test that when a repo msg hits us and we have no match, + that we do nothing gracefully. + """ + config.return_value = mock.Mock(), "development" + component_build = mock.Mock() + component_build.package = 'foo' + component_build.scmurl = 'full_scm_url' + component_build.state = None + module_build = mock.Mock() + module_build.component_builds = [component_build] + + from_repo_done_event.return_value = module_build + msg = { + 'topic': 'org.fedoraproject.prod.buildsys.repo.done', + 'msg': {'tag': 'no matches for this...'}, + } + self.fn(config=self.config, session=self.session, msg=msg) + build_fn.assert_called_once_with(artifact_name='foo', source='full_scm_url')