Merge #100 Use a fresh db session with every backend msg.

This commit is contained in:
Matt Prahl
2016-10-19 17:06:20 +00:00
2 changed files with 36 additions and 11 deletions

View File

@@ -25,8 +25,12 @@
""" SQLAlchemy Database models for the Flask app
"""
import contextlib
from datetime import datetime
from sqlalchemy.orm import validates
from sqlalchemy import engine_from_config
from sqlalchemy.orm import validates, scoped_session, sessionmaker
import modulemd as _modulemd
from rida import db, log
@@ -62,6 +66,25 @@ BUILD_STATES = {
INVERSE_BUILD_STATES = {v: k for k, v in BUILD_STATES.items()}
@contextlib.contextmanager
def make_session(conf):
# TODO - we could use ZopeTransactionExtension() here some day for
# improved safety on the backend.
engine = engine_from_config({
'sqlalchemy.url': conf.sqlalchemy_database_uri,
})
session = scoped_session(sessionmaker(bind=engine))()
try:
yield session
session.commit()
except:
# This is a no-op if no transaction is in progress.
session.rollback()
raise
finally:
session.close()
class RidaBase(db.Model):
# TODO -- we can implement functionality here common to all our model classes
__abstract__ = True

View File

@@ -44,7 +44,7 @@ import rida.scheduler.handlers.repos
import koji
from rida import conf, db, models, log
from rida import conf, models, log
class STOP_WORK(object):
@@ -129,12 +129,13 @@ class MessageWorker(threading.Thread):
break
try:
self.process_message(msg)
with models.make_session(conf) as session:
self.process_message(session, msg)
except Exception:
log.exception("Failed while handling %r" % msg.msg_id)
log.info(msg)
def process_message(self, msg):
def process_message(self, session, msg):
log.debug('Received a message with an ID of "{0}" and of type "{1}"'
.format(msg.msg_id, type(msg).__name__))
@@ -156,7 +157,7 @@ class MessageWorker(threading.Thread):
log.debug("Handler is NO_OP: %s" % idx)
else:
log.info("Calling %s" % idx)
handler(conf, db.session, msg)
handler(conf, session, msg)
log.info("Done with %s" % idx)
@@ -167,12 +168,13 @@ class Poller(threading.Thread):
def run(self):
while True:
self.log_summary(db.session)
# XXX: detect whether it's really stucked first
# self.process_waiting_module_builds(db.session)
self.process_open_component_builds(db.session)
self.process_lingering_module_builds(db.session)
self.fail_lost_builds(db.session)
with models.make_session(conf) as session:
self.log_summary(session)
# XXX: detect whether it's really stucked first
# self.process_waiting_module_builds(session)
self.process_open_component_builds(session)
self.process_lingering_module_builds(session)
self.fail_lost_builds(session)
log.info("Polling thread sleeping, %rs" % conf.polling_interval)
time.sleep(conf.polling_interval)