From e35f7af7a0ccbc2b7dd1b1177a5b32506aa52868 Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Tue, 18 Oct 2016 15:11:57 -0400 Subject: [PATCH] Use a fresh db session with every backend msg. This fixes #93. See that issue for a description of the problem. This change will create a new session for every event handled by the backend and will force a commit and close the session at the end of each message. For bonus points, we should employ the ZopeTransactionManager extension in the future, but I'm not sure how it will play with the flask_sqlalchemy extension. Will check on that later. --- rida/models.py | 26 +++++++++++++++++++++++++- rida/scheduler/main.py | 22 ++++++++++++---------- 2 files changed, 37 insertions(+), 11 deletions(-) diff --git a/rida/models.py b/rida/models.py index e571a468..9ef994b2 100644 --- a/rida/models.py +++ b/rida/models.py @@ -25,8 +25,12 @@ """ SQLAlchemy Database models for the Flask app """ + +import contextlib + from datetime import datetime -from sqlalchemy.orm import validates +from sqlalchemy import engine_from_config +from sqlalchemy.orm import validates, scoped_session, sessionmaker import modulemd as _modulemd from rida import db, log @@ -62,6 +66,26 @@ BUILD_STATES = { INVERSE_BUILD_STATES = {v: k for k, v in BUILD_STATES.items()} +@contextlib.contextmanager +def make_session(conf): + # TODO - we could use ZopeTransactionExtension() here some day for + # improved safety on the backend. + log.debug("Getting db session with uri %r" % conf.sqlalchemy_database_uri) + engine = engine_from_config({ + 'sqlalchemy.url': conf.sqlalchemy_database_uri, + }) + Session = scoped_session(sessionmaker()) + Session.configure(bind=engine) + session = Session() + try: + yield session + session.commit() + except: + raise + finally: + session.close() + + class RidaBase(db.Model): # TODO -- we can implement functionality here common to all our model classes __abstract__ = True diff --git a/rida/scheduler/main.py b/rida/scheduler/main.py index 47e6558e..3fa9c2dd 100644 --- a/rida/scheduler/main.py +++ b/rida/scheduler/main.py @@ -44,7 +44,7 @@ import rida.scheduler.handlers.repos import koji -from rida import conf, db, models, log +from rida import conf, models, log class STOP_WORK(object): @@ -129,12 +129,13 @@ class MessageWorker(threading.Thread): break try: - self.process_message(msg) + with models.make_session(conf) as session: + self.process_message(session, msg) except Exception: log.exception("Failed while handling %r" % msg.msg_id) log.info(msg) - def process_message(self, msg): + def process_message(self, session, msg): log.debug('Received a message with an ID of "{0}" and of type "{1}"' .format(msg.msg_id, type(msg).__name__)) @@ -156,7 +157,7 @@ class MessageWorker(threading.Thread): log.debug("Handler is NO_OP: %s" % idx) else: log.info("Calling %s" % idx) - handler(conf, db.session, msg) + handler(conf, session, msg) log.info("Done with %s" % idx) @@ -167,12 +168,13 @@ class Poller(threading.Thread): def run(self): while True: - self.log_summary(db.session) - # XXX: detect whether it's really stucked first - # self.process_waiting_module_builds(db.session) - self.process_open_component_builds(db.session) - self.process_lingering_module_builds(db.session) - self.fail_lost_builds(db.session) + with models.make_session(conf) as session: + self.log_summary(session) + # XXX: detect whether it's really stucked first + # self.process_waiting_module_builds(session) + self.process_open_component_builds(session) + self.process_lingering_module_builds(session) + self.fail_lost_builds(session) log.info("Polling thread sleeping, %rs" % conf.polling_interval) time.sleep(conf.polling_interval)