mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-04-04 03:08:21 +08:00
Use a fresh db session with every backend msg.
This fixes #93. See that issue for a description of the problem. This change will create a new session for every event handled by the backend and will force a commit and close the session at the end of each message. For bonus points, we should employ the ZopeTransactionManager extension in the future, but I'm not sure how it will play with the flask_sqlalchemy extension. Will check on that later.
This commit is contained in:
@@ -25,8 +25,12 @@
|
||||
|
||||
""" SQLAlchemy Database models for the Flask app
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
|
||||
from datetime import datetime
|
||||
from sqlalchemy.orm import validates
|
||||
from sqlalchemy import engine_from_config
|
||||
from sqlalchemy.orm import validates, scoped_session, sessionmaker
|
||||
import modulemd as _modulemd
|
||||
|
||||
from rida import db, log
|
||||
@@ -62,6 +66,26 @@ BUILD_STATES = {
|
||||
INVERSE_BUILD_STATES = {v: k for k, v in BUILD_STATES.items()}
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def make_session(conf):
|
||||
# TODO - we could use ZopeTransactionExtension() here some day for
|
||||
# improved safety on the backend.
|
||||
log.debug("Getting db session with uri %r" % conf.sqlalchemy_database_uri)
|
||||
engine = engine_from_config({
|
||||
'sqlalchemy.url': conf.sqlalchemy_database_uri,
|
||||
})
|
||||
Session = scoped_session(sessionmaker())
|
||||
Session.configure(bind=engine)
|
||||
session = Session()
|
||||
try:
|
||||
yield session
|
||||
session.commit()
|
||||
except:
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
class RidaBase(db.Model):
|
||||
# TODO -- we can implement functionality here common to all our model classes
|
||||
__abstract__ = True
|
||||
|
||||
@@ -44,7 +44,7 @@ import rida.scheduler.handlers.repos
|
||||
|
||||
import koji
|
||||
|
||||
from rida import conf, db, models, log
|
||||
from rida import conf, models, log
|
||||
|
||||
|
||||
class STOP_WORK(object):
|
||||
@@ -129,12 +129,13 @@ class MessageWorker(threading.Thread):
|
||||
break
|
||||
|
||||
try:
|
||||
self.process_message(msg)
|
||||
with models.make_session(conf) as session:
|
||||
self.process_message(session, msg)
|
||||
except Exception:
|
||||
log.exception("Failed while handling %r" % msg.msg_id)
|
||||
log.info(msg)
|
||||
|
||||
def process_message(self, msg):
|
||||
def process_message(self, session, msg):
|
||||
log.debug('Received a message with an ID of "{0}" and of type "{1}"'
|
||||
.format(msg.msg_id, type(msg).__name__))
|
||||
|
||||
@@ -156,7 +157,7 @@ class MessageWorker(threading.Thread):
|
||||
log.debug("Handler is NO_OP: %s" % idx)
|
||||
else:
|
||||
log.info("Calling %s" % idx)
|
||||
handler(conf, db.session, msg)
|
||||
handler(conf, session, msg)
|
||||
log.info("Done with %s" % idx)
|
||||
|
||||
|
||||
@@ -167,12 +168,13 @@ class Poller(threading.Thread):
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
self.log_summary(db.session)
|
||||
# XXX: detect whether it's really stucked first
|
||||
# self.process_waiting_module_builds(db.session)
|
||||
self.process_open_component_builds(db.session)
|
||||
self.process_lingering_module_builds(db.session)
|
||||
self.fail_lost_builds(db.session)
|
||||
with models.make_session(conf) as session:
|
||||
self.log_summary(session)
|
||||
# XXX: detect whether it's really stucked first
|
||||
# self.process_waiting_module_builds(session)
|
||||
self.process_open_component_builds(session)
|
||||
self.process_lingering_module_builds(session)
|
||||
self.fail_lost_builds(session)
|
||||
|
||||
log.info("Polling thread sleeping, %rs" % conf.polling_interval)
|
||||
time.sleep(conf.polling_interval)
|
||||
|
||||
Reference in New Issue
Block a user