mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-05-02 06:30:55 +08:00
Separate use of database sessions
This patch separates the use of database session in different MBS components and do not mix them together. In general, MBS components could be separated as the REST API (implemented based on Flask) and non-REST API including the backend build workflow (implemented as a fedmsg consumer on top of fedmsg-hub and running independently) and library shared by them. As a result, there are two kind of database session used in MBS, one is created and managed by Flask-SQLAlchemy, and another one is created from SQLAclhemy Session API directly. The goal of this patch is to make ensure session object is used properly in the right place. All the changes follow these rules: * REST API related code uses the session object db.session created and managed by Flask-SQLAlchemy. * Non-REST API related code uses the session object created with SQLAlchemy Session API. Function make_db_session does that. * Shared code does not created a new session object as much as possible. Instead, it accepts an argument db_session. The first two rules are applicable to tests as well. Major changes: * Switch tests back to run with a file-based SQLite database. * make_session is renamed to make_db_session and SQLAlchemy connection pool options are applied for PostgreSQL backend. * Frontend Flask related code uses db.session * Shared code by REST API and backend build workflow accepts SQLAlchemy session object as an argument. For example, resolver class is constructed with a database session, and some functions accepts an argument for database session. * Build workflow related code use session object returned from make_db_session and ensure db.session is not used. * Only tests for views use db.session, and other tests use db_session fixture to access database. * All argument name session, that is for database access, are renamed to db_session. * Functions model_tests_init_data, reuse_component_init_data and reuse_shared_userspace_init_data, which creates fixture data for tests, are converted into pytest fixtures from original function called inside setup_method or a test method. The reason of this conversion is to use fixture ``db_session`` rather than create a new one. That would also benefit the whole test suite to reduce the number of SQLAlchemy session objects. Signed-off-by: Chenxiong Qi <cqi@redhat.com>
This commit is contained in:
@@ -22,6 +22,8 @@
|
||||
# Written by Ralph Bean <rbean@redhat.com>
|
||||
# Matt Prahl <mprahl@redhat.com>
|
||||
# Jan Kaluza <jkaluza@redhat.com>
|
||||
|
||||
import threading
|
||||
import concurrent.futures
|
||||
|
||||
from module_build_service import conf, log, models
|
||||
@@ -29,12 +31,12 @@ import module_build_service.messaging
|
||||
from .reuse import get_reusable_components, reuse_component
|
||||
|
||||
|
||||
def at_concurrent_component_threshold(config, session):
|
||||
def at_concurrent_component_threshold(config, db_session):
|
||||
"""
|
||||
Determines if the number of concurrent component builds has reached
|
||||
the configured threshold
|
||||
:param config: Module Build Service configuration object
|
||||
:param session: SQLAlchemy database session
|
||||
:param db_session: SQLAlchemy database session
|
||||
:return: boolean representing if there are too many concurrent builds at
|
||||
this time
|
||||
"""
|
||||
@@ -57,7 +59,7 @@ def at_concurrent_component_threshold(config, session):
|
||||
# just internally in MBS to be handled by
|
||||
# scheduler.handlers.components.complete.
|
||||
if config.num_concurrent_builds:
|
||||
count = session.query(models.ComponentBuild).filter_by(
|
||||
count = db_session.query(models.ComponentBuild).filter_by(
|
||||
state=koji.BUILD_STATES["BUILDING"], reused_component_id=None).count()
|
||||
if config.num_concurrent_builds <= count:
|
||||
return True
|
||||
@@ -65,10 +67,17 @@ def at_concurrent_component_threshold(config, session):
|
||||
return False
|
||||
|
||||
|
||||
def start_build_component(builder, c):
|
||||
BUILD_COMPONENT_DB_SESSION_LOCK = threading.Lock()
|
||||
|
||||
|
||||
def start_build_component(db_session, builder, c):
|
||||
"""
|
||||
Submits single component build to builder. Called in thread
|
||||
by QueueBasedThreadPool in continue_batch_build.
|
||||
|
||||
This function runs inside separate threads that share one SQLAlchemy
|
||||
session object to update a module build state once there is something wrong
|
||||
when one of its components is submitted to Koji to build.
|
||||
"""
|
||||
import koji
|
||||
|
||||
@@ -79,17 +88,21 @@ def start_build_component(builder, c):
|
||||
c.state = koji.BUILD_STATES["FAILED"]
|
||||
c.state_reason = "Failed to build artifact %s: %s" % (c.package, str(e))
|
||||
log.exception(e)
|
||||
c.module_build.transition(conf, models.BUILD_STATES["failed"], failure_type="infra")
|
||||
with BUILD_COMPONENT_DB_SESSION_LOCK:
|
||||
c.module_build.transition(conf, models.BUILD_STATES["failed"], failure_type="infra")
|
||||
db_session.commit()
|
||||
return
|
||||
|
||||
if not c.task_id and c.state == koji.BUILD_STATES["BUILDING"]:
|
||||
c.state = koji.BUILD_STATES["FAILED"]
|
||||
c.state_reason = "Failed to build artifact %s: Builder did not return task ID" % (c.package)
|
||||
c.module_build.transition(conf, models.BUILD_STATES["failed"], failure_type="infra")
|
||||
with BUILD_COMPONENT_DB_SESSION_LOCK:
|
||||
c.module_build.transition(conf, models.BUILD_STATES["failed"], failure_type="infra")
|
||||
db_session.commit()
|
||||
return
|
||||
|
||||
|
||||
def continue_batch_build(config, module, session, builder, components=None):
|
||||
def continue_batch_build(config, module, db_session, builder, components=None):
|
||||
"""
|
||||
Continues building current batch. Submits next components in the batch
|
||||
until it hits concurrent builds limit.
|
||||
@@ -139,7 +152,7 @@ def continue_batch_build(config, module, session, builder, components=None):
|
||||
if c.state == koji.BUILD_STATES["COMPLETE"]:
|
||||
continue
|
||||
# Check the concurrent build threshold.
|
||||
if at_concurrent_component_threshold(config, session):
|
||||
if at_concurrent_component_threshold(config, db_session):
|
||||
log.info("Concurrent build threshold met")
|
||||
break
|
||||
|
||||
@@ -153,7 +166,8 @@ def continue_batch_build(config, module, session, builder, components=None):
|
||||
max_workers = config.num_threads_for_build_submissions
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
futures = {
|
||||
executor.submit(start_build_component, builder, c): c for c in components_to_build
|
||||
executor.submit(start_build_component, db_session, builder, c): c
|
||||
for c in components_to_build
|
||||
}
|
||||
concurrent.futures.wait(futures)
|
||||
# In case there has been an excepion generated directly in the
|
||||
@@ -162,11 +176,11 @@ def continue_batch_build(config, module, session, builder, components=None):
|
||||
for future in futures:
|
||||
future.result()
|
||||
|
||||
session.commit()
|
||||
db_session.commit()
|
||||
return further_work
|
||||
|
||||
|
||||
def start_next_batch_build(config, module, session, builder, components=None):
|
||||
def start_next_batch_build(config, module, db_session, builder, components=None):
|
||||
"""
|
||||
Tries to start the build of next batch. In case there are still unbuilt
|
||||
components in a batch, tries to submit more components until it hits
|
||||
@@ -211,7 +225,7 @@ def start_next_batch_build(config, module, session, builder, components=None):
|
||||
# the new one. If there is, continue building current batch.
|
||||
if has_unbuilt_components_in_batch:
|
||||
log.info("Continuing building batch %d", module.batch)
|
||||
return continue_batch_build(config, module, session, builder, components)
|
||||
return continue_batch_build(config, module, db_session, builder, components)
|
||||
|
||||
# Check that there are no components in BUILDING state in current batch.
|
||||
# If there are, wait until they are built.
|
||||
@@ -239,12 +253,13 @@ def start_next_batch_build(config, module, session, builder, components=None):
|
||||
", ".join([str(t["id"]) for t in active_tasks])
|
||||
)
|
||||
module.transition(
|
||||
db_session,
|
||||
config,
|
||||
state=models.BUILD_STATES["failed"],
|
||||
state_reason=state_reason,
|
||||
failure_type="infra",
|
||||
)
|
||||
session.commit()
|
||||
db_session.commit()
|
||||
return []
|
||||
|
||||
else:
|
||||
@@ -280,7 +295,7 @@ def start_next_batch_build(config, module, session, builder, components=None):
|
||||
# the new one. This can happen when resubmitting the failed module build.
|
||||
if not unbuilt_components and not components:
|
||||
log.info("Skipping build of batch %d, no component to build.", module.batch)
|
||||
return start_next_batch_build(config, module, session, builder)
|
||||
return start_next_batch_build(config, module, db_session, builder)
|
||||
|
||||
log.info("Starting build of next batch %d, %s" % (module.batch, unbuilt_components))
|
||||
|
||||
@@ -297,7 +312,7 @@ def start_next_batch_build(config, module, session, builder, components=None):
|
||||
should_try_reuse = all_reused_in_prev_batch or prev_batch == 1
|
||||
if should_try_reuse:
|
||||
component_names = [c.package for c in unbuilt_components]
|
||||
reusable_components = get_reusable_components(session, module, component_names)
|
||||
reusable_components = get_reusable_components(db_session, module, component_names)
|
||||
for c, reusable_c in zip(unbuilt_components, reusable_components):
|
||||
if reusable_c:
|
||||
components_reused = True
|
||||
@@ -306,7 +321,7 @@ def start_next_batch_build(config, module, session, builder, components=None):
|
||||
unbuilt_components_after_reuse.append(c)
|
||||
# Commit the changes done by reuse_component
|
||||
if components_reused:
|
||||
session.commit()
|
||||
db_session.commit()
|
||||
|
||||
# If all the components were reused in the batch then make a KojiRepoChange
|
||||
# message and return
|
||||
@@ -318,4 +333,4 @@ def start_next_batch_build(config, module, session, builder, components=None):
|
||||
return further_work
|
||||
|
||||
return further_work + continue_batch_build(
|
||||
config, module, session, builder, unbuilt_components_after_reuse)
|
||||
config, module, db_session, builder, unbuilt_components_after_reuse)
|
||||
|
||||
Reference in New Issue
Block a user