From 5e93f60b9e4c0a284121c80c49575c6b310543a0 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Wed, 4 Feb 2026 15:28:34 -0500 Subject: [PATCH] AAP-41776 Enable new fancy asyncio metrics for dispatcherd (#16233) * Enable new fancy asyncio metrics for dispatcherd Remove old dispatcher metrics and patch in new data from local whatever Update test fixture to new dispatcherd version * Update dispatcherd again * Handle node filter in URL, and catch more errors * Add test for metric filter * Split module for dispatcherd metrics --- awx/main/analytics/dispatcherd_metrics.py | 41 ++++++++++++++ awx/main/analytics/subsystem_metrics.py | 21 +++---- awx/main/dispatch/config.py | 10 +++- .../functional/analytics/test_metrics.py | 55 +++++++++++++++++++ requirements/requirements.txt | 2 +- 5 files changed, 112 insertions(+), 17 deletions(-) create mode 100644 awx/main/analytics/dispatcherd_metrics.py diff --git a/awx/main/analytics/dispatcherd_metrics.py b/awx/main/analytics/dispatcherd_metrics.py new file mode 100644 index 0000000000..c131220d77 --- /dev/null +++ b/awx/main/analytics/dispatcherd_metrics.py @@ -0,0 +1,41 @@ +import http.client +import socket +import urllib.error +import urllib.request +import logging + +from django.conf import settings + +logger = logging.getLogger(__name__) + + +def get_dispatcherd_metrics(request): + metrics_cfg = settings.METRICS_SUBSYSTEM_CONFIG.get('server', {}).get(settings.METRICS_SERVICE_DISPATCHER, {}) + host = metrics_cfg.get('host', 'localhost') + port = metrics_cfg.get('port', 8015) + metrics_filter = [] + if request is not None and hasattr(request, "query_params"): + try: + nodes_filter = request.query_params.getlist("node") + except Exception: + nodes_filter = [] + if nodes_filter and settings.CLUSTER_HOST_ID not in nodes_filter: + return '' + try: + metrics_filter = request.query_params.getlist("metric") + except Exception: + metrics_filter = [] + if metrics_filter: + # Right now we have no way of filtering the dispatcherd metrics + # so just avoid getting in the way if another metric is filtered for + return '' + url = f"http://{host}:{port}/metrics" + try: + with urllib.request.urlopen(url, timeout=1.0) as response: + payload = response.read() + if not payload: + return '' + return payload.decode('utf-8') + except (urllib.error.URLError, UnicodeError, socket.timeout, TimeoutError, http.client.HTTPException) as exc: + logger.debug(f"Failed to collect dispatcherd metrics from {url}: {exc}") + return '' diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 0b79439285..07f141d266 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -15,6 +15,7 @@ from rest_framework.request import Request from awx.main.consumers import emit_channel_notification from awx.main.utils import is_testing from awx.main.utils.redis import get_redis_client +from .dispatcherd_metrics import get_dispatcherd_metrics root_key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX logger = logging.getLogger('awx.main.analytics') @@ -398,11 +399,6 @@ class DispatcherMetrics(Metrics): SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow tasks'), SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent loading workflow tasks from db'), - # dispatcher subsystem metrics - SetIntM('dispatcher_pool_scale_up_events', 'Number of times local dispatcher scaled up a worker since startup'), - SetIntM('dispatcher_pool_active_task_count', 'Number of active tasks in the worker pool when last task was submitted'), - SetIntM('dispatcher_pool_max_worker_count', 'Highest number of workers in worker pool in last collection interval, about 20s'), - SetFloatM('dispatcher_availability', 'Fraction of time (in last collection interval) dispatcher was able to receive messages'), ] def __init__(self, *args, **kwargs): @@ -430,8 +426,12 @@ class CallbackReceiverMetrics(Metrics): def metrics(request): output_text = '' - for m in [DispatcherMetrics(), CallbackReceiverMetrics()]: - output_text += m.generate_metrics(request) + output_text += DispatcherMetrics().generate_metrics(request) + output_text += CallbackReceiverMetrics().generate_metrics(request) + + dispatcherd_metrics = get_dispatcherd_metrics(request) + if dispatcherd_metrics: + output_text += dispatcherd_metrics return output_text @@ -481,13 +481,6 @@ class CallbackReceiverMetricsServer(MetricsServer): super().__init__(settings.METRICS_SERVICE_CALLBACK_RECEIVER, registry) -class DispatcherMetricsServer(MetricsServer): - def __init__(self): - registry = CollectorRegistry(auto_describe=True) - registry.register(CustomToPrometheusMetricsCollector(DispatcherMetrics(metrics_have_changed=False))) - super().__init__(settings.METRICS_SERVICE_DISPATCHER, registry) - - class WebsocketsMetricsServer(MetricsServer): def __init__(self): registry = CollectorRegistry(auto_describe=True) diff --git a/awx/main/dispatch/config.py b/awx/main/dispatch/config.py index 9c6ce3735d..e00f69c059 100644 --- a/awx/main/dispatch/config.py +++ b/awx/main/dispatch/config.py @@ -38,8 +38,8 @@ def get_dispatcherd_config(for_service: bool = False, mock_publish: bool = False } if mock_publish: - config["brokers"]["noop"] = {} - config["publish"]["default_broker"] = "noop" + config["brokers"]["dispatcherd.testing.brokers.noop"] = {} + config["publish"]["default_broker"] = "dispatcherd.testing.brokers.noop" else: config["brokers"]["pg_notify"] = { "config": get_pg_notify_params(), @@ -56,5 +56,11 @@ def get_dispatcherd_config(for_service: bool = False, mock_publish: bool = False } config["brokers"]["pg_notify"]["channels"] = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()] + metrics_cfg = settings.METRICS_SUBSYSTEM_CONFIG.get('server', {}).get(settings.METRICS_SERVICE_DISPATCHER) + if metrics_cfg: + config["service"]["metrics_kwargs"] = { + "host": metrics_cfg.get("host", "localhost"), + "port": metrics_cfg.get("port", 8015), + } return config diff --git a/awx/main/tests/functional/analytics/test_metrics.py b/awx/main/tests/functional/analytics/test_metrics.py index 88696e2edf..2652db85b7 100644 --- a/awx/main/tests/functional/analytics/test_metrics.py +++ b/awx/main/tests/functional/analytics/test_metrics.py @@ -1,8 +1,11 @@ import pytest +from django.test import RequestFactory from prometheus_client.parser import text_string_to_metric_families +from rest_framework.request import Request from awx.main import models from awx.main.analytics.metrics import metrics +from awx.main.analytics.dispatcherd_metrics import get_dispatcherd_metrics from awx.api.versioning import reverse EXPECTED_VALUES = { @@ -77,3 +80,55 @@ def test_metrics_http_methods(get, post, patch, put, options, admin): assert patch(get_metrics_view_db_only(), user=admin).status_code == 405 assert post(get_metrics_view_db_only(), user=admin).status_code == 405 assert options(get_metrics_view_db_only(), user=admin).status_code == 200 + + +class DummyMetricsResponse: + def __init__(self, payload): + self._payload = payload + + def read(self): + return self._payload + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + +def test_dispatcherd_metrics_node_filter_match(mocker, settings): + settings.CLUSTER_HOST_ID = "awx-1" + payload = b'# HELP test_metric A test metric\n# TYPE test_metric gauge\ntest_metric 1\n' + + def fake_urlopen(url, timeout=1.0): + return DummyMetricsResponse(payload) + + mocker.patch('urllib.request.urlopen', fake_urlopen) + + request = Request(RequestFactory().get('/api/v2/metrics/', {'node': 'awx-1'})) + + assert get_dispatcherd_metrics(request) == payload.decode('utf-8') + + +def test_dispatcherd_metrics_node_filter_excludes_local(mocker, settings): + settings.CLUSTER_HOST_ID = "awx-1" + + def fake_urlopen(*args, **kwargs): + raise AssertionError("urlopen should not be called when node filter excludes local node") + + mocker.patch('urllib.request.urlopen', fake_urlopen) + + request = Request(RequestFactory().get('/api/v2/metrics/', {'node': 'awx-2'})) + + assert get_dispatcherd_metrics(request) == '' + + +def test_dispatcherd_metrics_metric_filter_excludes_unrelated(mocker): + def fake_urlopen(*args, **kwargs): + raise AssertionError("urlopen should not be called when metric filter excludes dispatcherd metrics") + + mocker.patch('urllib.request.urlopen', fake_urlopen) + + request = Request(RequestFactory().get('/api/v2/metrics/', {'metric': 'awx_system_info'})) + + assert get_dispatcherd_metrics(request) == '' diff --git a/requirements/requirements.txt b/requirements/requirements.txt index c63729bfaf..75a09bde7b 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -116,7 +116,7 @@ cython==3.1.3 # via -r /awx_devel/requirements/requirements.in daphne==4.2.1 # via -r /awx_devel/requirements/requirements.in -dispatcherd[pg-notify]==2025.12.12 +dispatcherd[pg-notify]==2026.01.27 # via -r /awx_devel/requirements/requirements.in distro==1.9.0 # via -r /awx_devel/requirements/requirements.in