mirror of
https://pagure.io/fm-orchestrator.git
synced 2026-02-02 20:59:06 +08:00
Merge #1672 Don't pass SQLAlchemy objects between threads
This commit is contained in:
@@ -1107,6 +1107,10 @@ class ComponentBuild(MBSBase):
|
|||||||
Index("idx_component_builds_build_id_nvr", "module_id", "nvr", unique=True),
|
Index("idx_component_builds_build_id_nvr", "module_id", "nvr", unique=True),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_id(cls, db_session, component_build_id):
|
||||||
|
return db_session.query(cls).filter(cls.id == component_build_id).first()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_component_event(cls, db_session, task_id, module_id=None):
|
def from_component_event(cls, db_session, task_id, module_id=None):
|
||||||
_filter = db_session.query(cls).filter
|
_filter = db_session.query(cls).filter
|
||||||
|
|||||||
@@ -2,7 +2,6 @@
|
|||||||
# SPDX-License-Identifier: MIT
|
# SPDX-License-Identifier: MIT
|
||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
import threading
|
|
||||||
|
|
||||||
from module_build_service.common import conf, log, models
|
from module_build_service.common import conf, log, models
|
||||||
from module_build_service.scheduler import events
|
from module_build_service.scheduler import events
|
||||||
@@ -45,20 +44,16 @@ def at_concurrent_component_threshold(config):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
BUILD_COMPONENT_DB_SESSION_LOCK = threading.Lock()
|
def start_build_component(db_session, builder, component_build_id):
|
||||||
|
|
||||||
|
|
||||||
def start_build_component(db_session, builder, c):
|
|
||||||
"""
|
"""
|
||||||
Submits single component build to builder. Called in thread
|
Submits single component build to builder. Called in thread
|
||||||
by QueueBasedThreadPool in continue_batch_build.
|
by QueueBasedThreadPool in continue_batch_build.
|
||||||
|
|
||||||
This function runs inside separate threads that share one SQLAlchemy
|
|
||||||
session object to update a module build state once there is something wrong
|
|
||||||
when one of its components is submitted to Koji to build.
|
|
||||||
"""
|
"""
|
||||||
import koji
|
import koji
|
||||||
|
|
||||||
|
# Get an object valid for this thread
|
||||||
|
c = models.ComponentBuild.from_id(db_session, component_build_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
c.task_id, c.state, c.state_reason, c.nvr = builder.build(
|
c.task_id, c.state, c.state_reason, c.nvr = builder.build(
|
||||||
artifact_name=c.package, source=c.scmurl)
|
artifact_name=c.package, source=c.scmurl)
|
||||||
@@ -66,18 +61,23 @@ def start_build_component(db_session, builder, c):
|
|||||||
c.state = koji.BUILD_STATES["FAILED"]
|
c.state = koji.BUILD_STATES["FAILED"]
|
||||||
c.state_reason = "Failed to build artifact %s: %s" % (c.package, str(e))
|
c.state_reason = "Failed to build artifact %s: %s" % (c.package, str(e))
|
||||||
log.exception(e)
|
log.exception(e)
|
||||||
with BUILD_COMPONENT_DB_SESSION_LOCK:
|
|
||||||
c.module_build.transition(conf, models.BUILD_STATES["failed"], failure_type="infra")
|
c.module_build.transition(
|
||||||
db_session.commit()
|
db_session, conf, models.BUILD_STATES["failed"], failure_type="infra"
|
||||||
|
)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
if not c.task_id and c.is_building:
|
if not c.task_id and c.is_building:
|
||||||
c.state = koji.BUILD_STATES["FAILED"]
|
c.state = koji.BUILD_STATES["FAILED"]
|
||||||
c.state_reason = "Failed to build artifact %s: Builder did not return task ID" % (c.package)
|
c.state_reason = "Failed to build artifact %s: Builder did not return task ID" % (c.package)
|
||||||
with BUILD_COMPONENT_DB_SESSION_LOCK:
|
|
||||||
c.module_build.transition(conf, models.BUILD_STATES["failed"], failure_type="infra")
|
c.module_build.transition(
|
||||||
db_session.commit()
|
db_session, conf, models.BUILD_STATES["failed"], failure_type="infra"
|
||||||
return
|
)
|
||||||
|
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
|
||||||
def continue_batch_build(config, module, builder, components=None):
|
def continue_batch_build(config, module, builder, components=None):
|
||||||
@@ -133,21 +133,31 @@ def continue_batch_build(config, module, builder, components=None):
|
|||||||
c.state = koji.BUILD_STATES["BUILDING"]
|
c.state = koji.BUILD_STATES["BUILDING"]
|
||||||
components_to_build.append(c)
|
components_to_build.append(c)
|
||||||
|
|
||||||
|
# Commit to ensure threads see the most recent version of ComponentBuilds
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
# Start build of components in this batch.
|
# Start build of components in this batch.
|
||||||
max_workers = config.num_threads_for_build_submissions
|
max_workers = config.num_threads_for_build_submissions
|
||||||
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||||
futures = {
|
futures = {
|
||||||
executor.submit(start_build_component, db_session, builder, c): c
|
executor.submit(start_build_component, db_session, builder, c.id): c
|
||||||
for c in components_to_build
|
for c in components_to_build
|
||||||
}
|
}
|
||||||
concurrent.futures.wait(futures)
|
concurrent.futures.wait(futures)
|
||||||
# In case there has been an excepion generated directly in the
|
# In case there has been an exception generated directly in the
|
||||||
# start_build_component, the future.result() will re-raise it in the
|
# start_build_component, the future.result() will re-raise it in the
|
||||||
# main thread so it is not lost.
|
# main thread so it is not lost.
|
||||||
|
#
|
||||||
|
# We get 'SQLite objects created in a thread can only be used in that same thread'
|
||||||
|
# errors in this case, because the finalizer for the connection object
|
||||||
|
# runs in a different thread, but the original exception is still visible.
|
||||||
|
#
|
||||||
for future in futures:
|
for future in futures:
|
||||||
future.result()
|
future.result()
|
||||||
|
|
||||||
db_session.commit()
|
# We need to start a new session here, or SQLite isolation keeps us from seeing
|
||||||
|
# changes that were done in the other threads
|
||||||
|
db_session.close()
|
||||||
|
|
||||||
|
|
||||||
def start_next_batch_build(config, module, builder, components=None):
|
def start_next_batch_build(config, module, builder, components=None):
|
||||||
|
|||||||
@@ -39,14 +39,6 @@ def apply_engine_options(conf):
|
|||||||
}
|
}
|
||||||
if conf.sqlalchemy_database_uri.startswith("sqlite://"):
|
if conf.sqlalchemy_database_uri.startswith("sqlite://"):
|
||||||
options.update({
|
options.update({
|
||||||
# For local module build, MBS is actually a multi-threaded
|
|
||||||
# application. The command submitting a module build runs in its
|
|
||||||
# own thread, and the backend build workflow, implemented as a
|
|
||||||
# fedmsg consumer on top of fedmsg-hub, runs in separate threads.
|
|
||||||
# So, disable this option in order to allow accessing data which
|
|
||||||
# was written from another thread.
|
|
||||||
"connect_args": {"check_same_thread": False},
|
|
||||||
|
|
||||||
# Both local module build and running tests requires a file-based
|
# Both local module build and running tests requires a file-based
|
||||||
# SQLite database, we do not use a connection pool for these two
|
# SQLite database, we do not use a connection pool for these two
|
||||||
# scenarios.
|
# scenarios.
|
||||||
|
|||||||
@@ -172,6 +172,7 @@ class TestBatches:
|
|||||||
start_next_batch_build(conf, module_build, builder)
|
start_next_batch_build(conf, module_build, builder)
|
||||||
|
|
||||||
# Batch number should increase.
|
# Batch number should increase.
|
||||||
|
module_build = models.ModuleBuild.get_by_id(db_session, 3)
|
||||||
assert module_build.batch == 2
|
assert module_build.batch == 2
|
||||||
|
|
||||||
# Make sure we only have one message returned for the one reused component
|
# Make sure we only have one message returned for the one reused component
|
||||||
@@ -218,6 +219,8 @@ class TestBatches:
|
|||||||
builder.recover_orphaned_artifact.return_value = []
|
builder.recover_orphaned_artifact.return_value = []
|
||||||
start_next_batch_build(conf, module_build, builder)
|
start_next_batch_build(conf, module_build, builder)
|
||||||
|
|
||||||
|
module_build = models.ModuleBuild.get_by_id(db_session, 3)
|
||||||
|
|
||||||
# Batch number should increase.
|
# Batch number should increase.
|
||||||
assert module_build.batch == 2
|
assert module_build.batch == 2
|
||||||
# No component reuse messages should be returned
|
# No component reuse messages should be returned
|
||||||
@@ -233,7 +236,15 @@ class TestBatches:
|
|||||||
builder.build.side_effect = Exception("Something have gone terribly wrong")
|
builder.build.side_effect = Exception("Something have gone terribly wrong")
|
||||||
component = mock.MagicMock()
|
component = mock.MagicMock()
|
||||||
|
|
||||||
start_build_component(db_session, builder, component)
|
component = models.ComponentBuild.from_component_name(
|
||||||
|
db_session, "perl-List-Compare", 3)
|
||||||
|
|
||||||
|
assert component.state != koji.BUILD_STATES["FAILED"]
|
||||||
|
|
||||||
|
start_build_component(db_session, builder, component.id)
|
||||||
|
|
||||||
|
component = models.ComponentBuild.from_component_name(
|
||||||
|
db_session, "perl-List-Compare", 3)
|
||||||
|
|
||||||
assert component.state == koji.BUILD_STATES["FAILED"]
|
assert component.state == koji.BUILD_STATES["FAILED"]
|
||||||
|
|
||||||
@@ -264,6 +275,8 @@ class TestBatches:
|
|||||||
builder.recover_orphaned_artifact.return_value = []
|
builder.recover_orphaned_artifact.return_value = []
|
||||||
start_next_batch_build(conf, module_build, builder)
|
start_next_batch_build(conf, module_build, builder)
|
||||||
|
|
||||||
|
module_build = models.ModuleBuild.get_by_id(db_session, 3)
|
||||||
|
|
||||||
# Batch number should increase
|
# Batch number should increase
|
||||||
assert module_build.batch == 2
|
assert module_build.batch == 2
|
||||||
|
|
||||||
@@ -292,6 +305,8 @@ class TestBatches:
|
|||||||
mock_sbc.reset_mock()
|
mock_sbc.reset_mock()
|
||||||
|
|
||||||
# Complete the build
|
# Complete the build
|
||||||
|
plc_component = models.ComponentBuild.from_component_name(
|
||||||
|
db_session, "perl-List-Compare", 3)
|
||||||
plc_component.state = koji.BUILD_STATES["COMPLETE"]
|
plc_component.state = koji.BUILD_STATES["COMPLETE"]
|
||||||
pt_component = models.ComponentBuild.from_component_name(
|
pt_component = models.ComponentBuild.from_component_name(
|
||||||
db_session, "perl-Tangerine", 3)
|
db_session, "perl-Tangerine", 3)
|
||||||
@@ -301,6 +316,9 @@ class TestBatches:
|
|||||||
|
|
||||||
# Start the next build batch
|
# Start the next build batch
|
||||||
start_next_batch_build(conf, module_build, builder)
|
start_next_batch_build(conf, module_build, builder)
|
||||||
|
|
||||||
|
module_build = models.ModuleBuild.get_by_id(db_session, 3)
|
||||||
|
|
||||||
# Batch number should increase
|
# Batch number should increase
|
||||||
assert module_build.batch == 3
|
assert module_build.batch == 3
|
||||||
# Verify that tangerine was reused even though perl-Tangerine was rebuilt in the previous
|
# Verify that tangerine was reused even though perl-Tangerine was rebuilt in the previous
|
||||||
@@ -344,6 +362,8 @@ class TestBatches:
|
|||||||
builder.recover_orphaned_artifact.return_value = []
|
builder.recover_orphaned_artifact.return_value = []
|
||||||
start_next_batch_build(conf, module_build, builder)
|
start_next_batch_build(conf, module_build, builder)
|
||||||
|
|
||||||
|
module_build = models.ModuleBuild.get_by_id(db_session, 3)
|
||||||
|
|
||||||
# Batch number should increase.
|
# Batch number should increase.
|
||||||
assert module_build.batch == 2
|
assert module_build.batch == 2
|
||||||
|
|
||||||
@@ -357,8 +377,8 @@ class TestBatches:
|
|||||||
|
|
||||||
# Test the order of the scheduling
|
# Test the order of the scheduling
|
||||||
expected_calls = [
|
expected_calls = [
|
||||||
mock.call(db_session, builder, plc_component),
|
mock.call(db_session, builder, plc_component.id),
|
||||||
mock.call(db_session, builder, pt_component)
|
mock.call(db_session, builder, pt_component.id)
|
||||||
]
|
]
|
||||||
assert mock_sbc.mock_calls == expected_calls
|
assert mock_sbc.mock_calls == expected_calls
|
||||||
|
|
||||||
@@ -379,6 +399,8 @@ class TestBatches:
|
|||||||
builder = mock.MagicMock()
|
builder = mock.MagicMock()
|
||||||
start_next_batch_build(conf, module_build, builder)
|
start_next_batch_build(conf, module_build, builder)
|
||||||
|
|
||||||
|
module_build = models.ModuleBuild.get_by_id(db_session, 3)
|
||||||
|
|
||||||
# Batch number should not increase.
|
# Batch number should not increase.
|
||||||
assert module_build.batch == 2
|
assert module_build.batch == 2
|
||||||
# Make sure start build was called for the second component which wasn't reused
|
# Make sure start build was called for the second component which wasn't reused
|
||||||
@@ -399,5 +421,7 @@ class TestBatches:
|
|||||||
builder = mock.MagicMock()
|
builder = mock.MagicMock()
|
||||||
builder.buildroot_ready.return_value = False
|
builder.buildroot_ready.return_value = False
|
||||||
|
|
||||||
|
module_build = models.ModuleBuild.get_by_id(db_session, 3)
|
||||||
|
|
||||||
# Batch number should not increase.
|
# Batch number should not increase.
|
||||||
assert module_build.batch == 1
|
assert module_build.batch == 1
|
||||||
|
|||||||
Reference in New Issue
Block a user