diff --git a/module_build_service/scheduler/producer.py b/module_build_service/scheduler/producer.py index d43ce2cd..2650d66d 100644 --- a/module_build_service/scheduler/producer.py +++ b/module_build_service/scheduler/producer.py @@ -44,8 +44,7 @@ class MBSProducer(PollingProducer): with models.make_session(conf) as session: try: self.log_summary(session) - # XXX: detect whether it's actually stuck first - # self.process_waiting_module_builds(session) + self.process_waiting_module_builds(session) self.process_open_component_builds(session) self.fail_lost_builds(session) self.process_paused_module_builds(conf, session) @@ -166,18 +165,24 @@ class MBSProducer(PollingProducer): builds = models.ModuleBuild.by_state(session, 'wait') log.info(' {0!r} module builds in the wait state...' .format(len(builds))) + now = datetime.utcnow() + ten_minutes = timedelta(minutes=10) for build in builds: - # Fake a message to kickstart the build anew + + # Only give builds a nudge if stuck for more than ten minutes + if (now - build.time_modified) < ten_minutes: + continue + + # Pretend the build is modified, so we don't tight spin. + build.time_modified = now + session.commit() + + # Fake a message to kickstart the build anew in the consumer + state = module_build_service.models.BUILD_STATES['wait'] msg = module_build_service.messaging.MBSModule( - 'fake message', - build.id, - module_build_service.models.BUILD_STATES['wait'] - ) - further_work = module_build_service.scheduler.handlers.modules.wait( - conf, session, msg) or [] - for event in further_work: - log.info(" Scheduling faked event %r" % event) - module_build_service.scheduler.consumer.work_queue_put(event) + 'fake message', build.id, state) + log.info(" Scheduling faked event %r" % msg) + module_build_service.scheduler.consumer.work_queue_put(msg) def process_open_component_builds(self, session): log.warning('process_open_component_builds is not yet implemented...') diff --git a/tests/test_scheduler/test_poller.py b/tests/test_scheduler/test_poller.py index b8383b58..d8ae55b3 100644 --- a/tests/test_scheduler/test_poller.py +++ b/tests/test_scheduler/test_poller.py @@ -255,3 +255,84 @@ class TestPoller(unittest.TestCase): if state_name in ["done", "ready", "failed"]: koji_session.deleteBuildTarget.assert_called_once_with(852) + + def test_process_waiting_module_build( + self, create_builder, koji_get_session, global_consumer, dbg): + """ Test that processing old waiting module builds works. """ + + consumer = mock.MagicMock() + consumer.incoming = queue.Queue() + global_consumer.return_value = consumer + + hub = mock.MagicMock() + poller = MBSProducer(hub) + + # Change the batch to 2, so the module build is in state where + # it is not building anything, but the state is "build". + module_build = models.ModuleBuild.query.filter_by(id=2).one() + module_build.state = 1 + original = datetime.utcnow() - timedelta(minutes=11) + module_build.time_modified = original + db.session.commit() + db.session.refresh(module_build) + + # Ensure the queue is empty before we start. + self.assertEquals(consumer.incoming.qsize(), 0) + + # Poll :) + poller.process_waiting_module_builds(db.session) + + self.assertEquals(consumer.incoming.qsize(), 1) + module_build = models.ModuleBuild.query.filter_by(id=2).one() + # ensure the time_modified was changed. + self.assertGreater(module_build.time_modified, original) + + + def test_process_waiting_module_build_not_old_enough( + self, create_builder, koji_get_session, global_consumer, dbg): + """ Test that we do not process young waiting builds. """ + + consumer = mock.MagicMock() + consumer.incoming = queue.Queue() + global_consumer.return_value = consumer + + hub = mock.MagicMock() + poller = MBSProducer(hub) + + # Change the batch to 2, so the module build is in state where + # it is not building anything, but the state is "build". + module_build = models.ModuleBuild.query.filter_by(id=2).one() + module_build.state = 1 + original = datetime.utcnow() - timedelta(minutes=9) + module_build.time_modified = original + db.session.commit() + db.session.refresh(module_build) + + # Ensure the queue is empty before we start. + self.assertEquals(consumer.incoming.qsize(), 0) + + # Poll :) + poller.process_waiting_module_builds(db.session) + + # Ensure we did *not* process the 9 minute-old build. + self.assertEquals(consumer.incoming.qsize(), 0) + + def test_process_waiting_module_build_none_found( + self, create_builder, koji_get_session, global_consumer, dbg): + """ Test nothing happens when no module builds are waiting. """ + + consumer = mock.MagicMock() + consumer.incoming = queue.Queue() + global_consumer.return_value = consumer + + hub = mock.MagicMock() + poller = MBSProducer(hub) + + # Ensure the queue is empty before we start. + self.assertEquals(consumer.incoming.qsize(), 0) + + # Poll :) + poller.process_waiting_module_builds(db.session) + + # Ensure we did *not* process any of the non-waiting builds. + self.assertEquals(consumer.incoming.qsize(), 0)