diff --git a/rida/models.py b/rida/models.py index e571a468..9ef994b2 100644 --- a/rida/models.py +++ b/rida/models.py @@ -25,8 +25,12 @@ """ SQLAlchemy Database models for the Flask app """ + +import contextlib + from datetime import datetime -from sqlalchemy.orm import validates +from sqlalchemy import engine_from_config +from sqlalchemy.orm import validates, scoped_session, sessionmaker import modulemd as _modulemd from rida import db, log @@ -62,6 +66,26 @@ BUILD_STATES = { INVERSE_BUILD_STATES = {v: k for k, v in BUILD_STATES.items()} +@contextlib.contextmanager +def make_session(conf): + # TODO - we could use ZopeTransactionExtension() here some day for + # improved safety on the backend. + log.debug("Getting db session with uri %r" % conf.sqlalchemy_database_uri) + engine = engine_from_config({ + 'sqlalchemy.url': conf.sqlalchemy_database_uri, + }) + Session = scoped_session(sessionmaker()) + Session.configure(bind=engine) + session = Session() + try: + yield session + session.commit() + except: + raise + finally: + session.close() + + class RidaBase(db.Model): # TODO -- we can implement functionality here common to all our model classes __abstract__ = True diff --git a/rida/scheduler/main.py b/rida/scheduler/main.py index 47e6558e..3fa9c2dd 100644 --- a/rida/scheduler/main.py +++ b/rida/scheduler/main.py @@ -44,7 +44,7 @@ import rida.scheduler.handlers.repos import koji -from rida import conf, db, models, log +from rida import conf, models, log class STOP_WORK(object): @@ -129,12 +129,13 @@ class MessageWorker(threading.Thread): break try: - self.process_message(msg) + with models.make_session(conf) as session: + self.process_message(session, msg) except Exception: log.exception("Failed while handling %r" % msg.msg_id) log.info(msg) - def process_message(self, msg): + def process_message(self, session, msg): log.debug('Received a message with an ID of "{0}" and of type "{1}"' .format(msg.msg_id, type(msg).__name__)) @@ -156,7 +157,7 @@ class MessageWorker(threading.Thread): log.debug("Handler is NO_OP: %s" % idx) else: log.info("Calling %s" % idx) - handler(conf, db.session, msg) + handler(conf, session, msg) log.info("Done with %s" % idx) @@ -167,12 +168,13 @@ class Poller(threading.Thread): def run(self): while True: - self.log_summary(db.session) - # XXX: detect whether it's really stucked first - # self.process_waiting_module_builds(db.session) - self.process_open_component_builds(db.session) - self.process_lingering_module_builds(db.session) - self.fail_lost_builds(db.session) + with models.make_session(conf) as session: + self.log_summary(session) + # XXX: detect whether it's really stucked first + # self.process_waiting_module_builds(session) + self.process_open_component_builds(session) + self.process_lingering_module_builds(session) + self.fail_lost_builds(session) log.info("Polling thread sleeping, %rs" % conf.polling_interval) time.sleep(conf.polling_interval)