diff --git a/docker/Dockerfile-tests b/docker/Dockerfile-tests index cbfb766e..c014e8ff 100644 --- a/docker/Dockerfile-tests +++ b/docker/Dockerfile-tests @@ -32,6 +32,7 @@ RUN yum -y install \ python-mock \ python-munch \ python-pip \ + python-prometheus_client \ python-requests \ python-six \ python-solv \ diff --git a/docker/Dockerfile-tests-py3 b/docker/Dockerfile-tests-py3 index ea86b62e..566c9e50 100644 --- a/docker/Dockerfile-tests-py3 +++ b/docker/Dockerfile-tests-py3 @@ -21,6 +21,7 @@ RUN dnf -y install \ python3-ldap3 \ python3-munch \ python3-pip \ + python3-prometheus_client \ python3-requests \ python3-six \ python3-solv \ diff --git a/module_build_service/messaging.py b/module_build_service/messaging.py index 09e8d23f..dfb1091c 100644 --- a/module_build_service/messaging.py +++ b/module_build_service/messaging.py @@ -260,7 +260,18 @@ def publish(topic, msg, conf, service): except KeyError: raise KeyError("No messaging backend found for %r in %r" % ( conf.messaging, _messaging_backends.keys())) - return handler(topic, msg, conf, service) + + from module_build_service.monitor import ( + messaging_tx_to_send_counter, messaging_tx_sent_ok_counter, + messaging_tx_failed_counter) + messaging_tx_to_send_counter.inc() + try: + rv = handler(topic, msg, conf, service) + messaging_tx_sent_ok_counter.inc() + return rv + except Exception: + messaging_tx_failed_counter.inc() + raise def _fedmsg_publish(topic, msg, conf, service): diff --git a/module_build_service/models.py b/module_build_service/models.py index 1b0ac621..a860f597 100644 --- a/module_build_service/models.py +++ b/module_build_service/models.py @@ -118,6 +118,10 @@ def _setup_event_listeners(session): sqlalchemy.event.listen(session, 'before_commit', session_before_commit_handlers) + # initialize DB event listeners from the monitor module + from module_build_service.monitor import db_hook_event_listeners + db_hook_event_listeners(session.bind.engine) + @contextlib.contextmanager def make_session(conf): diff --git a/module_build_service/monitor.py b/module_build_service/monitor.py new file mode 100644 index 00000000..70d6afcf --- /dev/null +++ b/module_build_service/monitor.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2019 Red Hat, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +# For an up-to-date version of this module, see: +# https://pagure.io/monitor-flask-sqlalchemy + +import os +import tempfile + +from flask import Blueprint, Response +from prometheus_client import ( # noqa: F401 + ProcessCollector, CollectorRegistry, Counter, multiprocess, + Histogram, generate_latest, start_http_server, CONTENT_TYPE_LATEST) +from sqlalchemy import event + +# Service-specific imports +from module_build_service.utils import cors_header, validate_api_version + + +if not os.environ.get('prometheus_multiproc_dir'): + os.environ.setdefault('prometheus_multiproc_dir', tempfile.mkdtemp()) +registry = CollectorRegistry() +ProcessCollector(registry=registry) +multiprocess.MultiProcessCollector(registry) +if os.getenv('MONITOR_STANDALONE_METRICS_SERVER_ENABLE', 'false') == 'true': + port = os.getenv('MONITOR_STANDALONE_METRICS_SERVER_PORT', '10040') + start_http_server(int(port), registry=registry) + + +# Generic metrics +messaging_rx_counter = Counter( + 'messaging_rx', + 'Total number of messages received', + registry=registry) +messaging_rx_processed_ok_counter = Counter( + 'messaging_rx_processed_ok', + 'Number of received messages, which were processed successfully', + registry=registry) +messaging_rx_failed_counter = Counter( + 'messaging_rx_failed', + 'Number of received messages, which failed during processing', + registry=registry) + +messaging_tx_to_send_counter = Counter( + 'messaging_tx_to_send', + 'Total number of messages to send', + registry=registry) +messaging_tx_sent_ok_counter = Counter( + 'messaging_tx_sent_ok', + 'Number of messages, which were sent successfully', + registry=registry) +messaging_tx_failed_counter = Counter( + 'messaging_tx_failed', + 'Number of messages, for which the sender failed', + registry=registry) + +db_dbapi_error_counter = Counter( + 'db_dbapi_error', + 'Number of DBAPI errors', + registry=registry) +db_engine_connect_counter = Counter( + 'db_engine_connect', + 'Number of \'engine_connect\' events', + registry=registry) +db_handle_error_counter = Counter( + 'db_handle_error', + 'Number of exceptions during connection', + registry=registry) +db_transaction_rollback_counter = Counter( + 'db_transaction_rollback', + 'Number of transactions, which were rolled back', + registry=registry) + +# Service-specific metrics +# XXX: TODO + + +def db_hook_event_listeners(target=None): + # Service-specific import of db + from module_build_service import db + + if not target: + target = db.engine + + @event.listens_for(target, 'dbapi_error', named=True) + def receive_dbapi_error(**kw): + db_dbapi_error_counter.inc() + + @event.listens_for(target, 'engine_connect') + def receive_engine_connect(conn, branch): + db_engine_connect_counter.inc() + + @event.listens_for(target, 'handle_error') + def receive_handle_error(exception_context): + db_handle_error_counter.inc() + + @event.listens_for(target, 'rollback') + def receive_rollback(conn): + db_transaction_rollback_counter.inc() + + +monitor_api = Blueprint( + 'monitor', __name__, + url_prefix='/module-build-service//monitor') + + +@cors_header() +@validate_api_version() +@monitor_api.route('/metrics') +def metrics(api_version): + return Response(generate_latest(registry), + content_type=CONTENT_TYPE_LATEST) diff --git a/module_build_service/scheduler/consumer.py b/module_build_service/scheduler/consumer.py index e33621e8..6f5afd29 100644 --- a/module_build_service/scheduler/consumer.py +++ b/module_build_service/scheduler/consumer.py @@ -47,6 +47,7 @@ import module_build_service.scheduler.handlers.repos import module_build_service.scheduler.handlers.components import module_build_service.scheduler.handlers.modules import module_build_service.scheduler.handlers.tags +import module_build_service.monitor as monitor from module_build_service import models, log, conf @@ -147,6 +148,8 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): super(MBSConsumer, self).validate(message) def consume(self, message): + monitor.messaging_rx_counter.inc() + # Sometimes, the messages put into our queue are artificially put there # by other parts of our own codebase. If they are already abstracted # messages, then just use them as-is. If they are not already @@ -161,7 +164,9 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): try: with models.make_session(conf) as session: self.process_message(session, msg) + monitor.messaging_rx_processed_ok_counter.inc() except sqlalchemy.exc.OperationalError as error: + monitor.messaging_rx_failed_counter.inc() if 'could not translate host name' in str(error): log.exception( "SQLAlchemy can't resolve DNS records. Scheduling fedmsg-hub to shutdown.") @@ -169,6 +174,7 @@ class MBSConsumer(fedmsg.consumers.FedmsgConsumer): else: raise except Exception: + monitor.messaging_rx_failed_counter.inc() log.exception('Failed while handling {0!r}'.format(msg)) if self.stop_condition and self.stop_condition(message): diff --git a/module_build_service/views.py b/module_build_service/views.py index edcbe84e..eeb118eb 100644 --- a/module_build_service/views.py +++ b/module_build_service/views.py @@ -41,6 +41,7 @@ from module_build_service.utils import ( from module_build_service.errors import ( ValidationError, Forbidden, NotFound, ProgrammingError) from module_build_service.backports import jsonify +from module_build_service.monitor import monitor_api api_routes = { @@ -461,5 +462,7 @@ def register_api(): else: raise NotImplementedError("Unhandled api key.") + app.register_blueprint(monitor_api) + register_api() diff --git a/requirements.txt b/requirements.txt index a5a732a0..1fc783c5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,7 @@ koji ldap3 moksha.hub munch +prometheus_client pyOpenSSL pygobject requests diff --git a/tests/test_monitor.py b/tests/test_monitor.py new file mode 100644 index 00000000..6d89ef0b --- /dev/null +++ b/tests/test_monitor.py @@ -0,0 +1,58 @@ +# Copyright (c) 2019 Red Hat, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# Written by Filip Valder + +import os +import pytest +import requests +import module_build_service.monitor + +from six.moves import reload_module +from tests import app, init_data + +num_of_metrics = 16 + + +class TestViews: + def setup_method(self, test_method): + self.client = app.test_client() + init_data(2) + + def test_metrics(self): + rv = self.client.get('/module-build-service/1/monitor/metrics') + + assert len([l for l in rv.get_data(as_text=True).splitlines() + if (l.startswith('# TYPE') and '_created ' not in l)]) == num_of_metrics + + +def test_standalone_metrics_server_disabled_by_default(): + with pytest.raises(requests.exceptions.ConnectionError): + requests.get('http://127.0.0.1:10040/metrics') + + +def test_standalone_metrics_server(): + os.environ['MONITOR_STANDALONE_METRICS_SERVER_ENABLE'] = 'true' + reload_module(module_build_service.monitor) + + r = requests.get('http://127.0.0.1:10040/metrics') + + assert len([l for l in r.text.splitlines() + if (l.startswith('# TYPE') and '_created ' not in l)]) == num_of_metrics