mirror of
https://github.com/ansible/awx.git
synced 2026-02-05 09:45:21 +01:00
AAP-41776 Enable new fancy asyncio metrics for dispatcherd (#16233)
* Enable new fancy asyncio metrics for dispatcherd Remove old dispatcher metrics and patch in new data from local whatever Update test fixture to new dispatcherd version * Update dispatcherd again * Handle node filter in URL, and catch more errors * Add test for metric filter * Split module for dispatcherd metrics
This commit is contained in:
41
awx/main/analytics/dispatcherd_metrics.py
Normal file
41
awx/main/analytics/dispatcherd_metrics.py
Normal file
@@ -0,0 +1,41 @@
|
||||
import http.client
|
||||
import socket
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
import logging
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_dispatcherd_metrics(request):
|
||||
metrics_cfg = settings.METRICS_SUBSYSTEM_CONFIG.get('server', {}).get(settings.METRICS_SERVICE_DISPATCHER, {})
|
||||
host = metrics_cfg.get('host', 'localhost')
|
||||
port = metrics_cfg.get('port', 8015)
|
||||
metrics_filter = []
|
||||
if request is not None and hasattr(request, "query_params"):
|
||||
try:
|
||||
nodes_filter = request.query_params.getlist("node")
|
||||
except Exception:
|
||||
nodes_filter = []
|
||||
if nodes_filter and settings.CLUSTER_HOST_ID not in nodes_filter:
|
||||
return ''
|
||||
try:
|
||||
metrics_filter = request.query_params.getlist("metric")
|
||||
except Exception:
|
||||
metrics_filter = []
|
||||
if metrics_filter:
|
||||
# Right now we have no way of filtering the dispatcherd metrics
|
||||
# so just avoid getting in the way if another metric is filtered for
|
||||
return ''
|
||||
url = f"http://{host}:{port}/metrics"
|
||||
try:
|
||||
with urllib.request.urlopen(url, timeout=1.0) as response:
|
||||
payload = response.read()
|
||||
if not payload:
|
||||
return ''
|
||||
return payload.decode('utf-8')
|
||||
except (urllib.error.URLError, UnicodeError, socket.timeout, TimeoutError, http.client.HTTPException) as exc:
|
||||
logger.debug(f"Failed to collect dispatcherd metrics from {url}: {exc}")
|
||||
return ''
|
||||
@@ -15,6 +15,7 @@ from rest_framework.request import Request
|
||||
from awx.main.consumers import emit_channel_notification
|
||||
from awx.main.utils import is_testing
|
||||
from awx.main.utils.redis import get_redis_client
|
||||
from .dispatcherd_metrics import get_dispatcherd_metrics
|
||||
|
||||
root_key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX
|
||||
logger = logging.getLogger('awx.main.analytics')
|
||||
@@ -398,11 +399,6 @@ class DispatcherMetrics(Metrics):
|
||||
SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
|
||||
SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow tasks'),
|
||||
SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent loading workflow tasks from db'),
|
||||
# dispatcher subsystem metrics
|
||||
SetIntM('dispatcher_pool_scale_up_events', 'Number of times local dispatcher scaled up a worker since startup'),
|
||||
SetIntM('dispatcher_pool_active_task_count', 'Number of active tasks in the worker pool when last task was submitted'),
|
||||
SetIntM('dispatcher_pool_max_worker_count', 'Highest number of workers in worker pool in last collection interval, about 20s'),
|
||||
SetFloatM('dispatcher_availability', 'Fraction of time (in last collection interval) dispatcher was able to receive messages'),
|
||||
]
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
@@ -430,8 +426,12 @@ class CallbackReceiverMetrics(Metrics):
|
||||
|
||||
def metrics(request):
|
||||
output_text = ''
|
||||
for m in [DispatcherMetrics(), CallbackReceiverMetrics()]:
|
||||
output_text += m.generate_metrics(request)
|
||||
output_text += DispatcherMetrics().generate_metrics(request)
|
||||
output_text += CallbackReceiverMetrics().generate_metrics(request)
|
||||
|
||||
dispatcherd_metrics = get_dispatcherd_metrics(request)
|
||||
if dispatcherd_metrics:
|
||||
output_text += dispatcherd_metrics
|
||||
return output_text
|
||||
|
||||
|
||||
@@ -481,13 +481,6 @@ class CallbackReceiverMetricsServer(MetricsServer):
|
||||
super().__init__(settings.METRICS_SERVICE_CALLBACK_RECEIVER, registry)
|
||||
|
||||
|
||||
class DispatcherMetricsServer(MetricsServer):
|
||||
def __init__(self):
|
||||
registry = CollectorRegistry(auto_describe=True)
|
||||
registry.register(CustomToPrometheusMetricsCollector(DispatcherMetrics(metrics_have_changed=False)))
|
||||
super().__init__(settings.METRICS_SERVICE_DISPATCHER, registry)
|
||||
|
||||
|
||||
class WebsocketsMetricsServer(MetricsServer):
|
||||
def __init__(self):
|
||||
registry = CollectorRegistry(auto_describe=True)
|
||||
|
||||
@@ -38,8 +38,8 @@ def get_dispatcherd_config(for_service: bool = False, mock_publish: bool = False
|
||||
}
|
||||
|
||||
if mock_publish:
|
||||
config["brokers"]["noop"] = {}
|
||||
config["publish"]["default_broker"] = "noop"
|
||||
config["brokers"]["dispatcherd.testing.brokers.noop"] = {}
|
||||
config["publish"]["default_broker"] = "dispatcherd.testing.brokers.noop"
|
||||
else:
|
||||
config["brokers"]["pg_notify"] = {
|
||||
"config": get_pg_notify_params(),
|
||||
@@ -56,5 +56,11 @@ def get_dispatcherd_config(for_service: bool = False, mock_publish: bool = False
|
||||
}
|
||||
|
||||
config["brokers"]["pg_notify"]["channels"] = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
|
||||
metrics_cfg = settings.METRICS_SUBSYSTEM_CONFIG.get('server', {}).get(settings.METRICS_SERVICE_DISPATCHER)
|
||||
if metrics_cfg:
|
||||
config["service"]["metrics_kwargs"] = {
|
||||
"host": metrics_cfg.get("host", "localhost"),
|
||||
"port": metrics_cfg.get("port", 8015),
|
||||
}
|
||||
|
||||
return config
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
import pytest
|
||||
|
||||
from django.test import RequestFactory
|
||||
from prometheus_client.parser import text_string_to_metric_families
|
||||
from rest_framework.request import Request
|
||||
from awx.main import models
|
||||
from awx.main.analytics.metrics import metrics
|
||||
from awx.main.analytics.dispatcherd_metrics import get_dispatcherd_metrics
|
||||
from awx.api.versioning import reverse
|
||||
|
||||
EXPECTED_VALUES = {
|
||||
@@ -77,3 +80,55 @@ def test_metrics_http_methods(get, post, patch, put, options, admin):
|
||||
assert patch(get_metrics_view_db_only(), user=admin).status_code == 405
|
||||
assert post(get_metrics_view_db_only(), user=admin).status_code == 405
|
||||
assert options(get_metrics_view_db_only(), user=admin).status_code == 200
|
||||
|
||||
|
||||
class DummyMetricsResponse:
|
||||
def __init__(self, payload):
|
||||
self._payload = payload
|
||||
|
||||
def read(self):
|
||||
return self._payload
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
|
||||
def test_dispatcherd_metrics_node_filter_match(mocker, settings):
|
||||
settings.CLUSTER_HOST_ID = "awx-1"
|
||||
payload = b'# HELP test_metric A test metric\n# TYPE test_metric gauge\ntest_metric 1\n'
|
||||
|
||||
def fake_urlopen(url, timeout=1.0):
|
||||
return DummyMetricsResponse(payload)
|
||||
|
||||
mocker.patch('urllib.request.urlopen', fake_urlopen)
|
||||
|
||||
request = Request(RequestFactory().get('/api/v2/metrics/', {'node': 'awx-1'}))
|
||||
|
||||
assert get_dispatcherd_metrics(request) == payload.decode('utf-8')
|
||||
|
||||
|
||||
def test_dispatcherd_metrics_node_filter_excludes_local(mocker, settings):
|
||||
settings.CLUSTER_HOST_ID = "awx-1"
|
||||
|
||||
def fake_urlopen(*args, **kwargs):
|
||||
raise AssertionError("urlopen should not be called when node filter excludes local node")
|
||||
|
||||
mocker.patch('urllib.request.urlopen', fake_urlopen)
|
||||
|
||||
request = Request(RequestFactory().get('/api/v2/metrics/', {'node': 'awx-2'}))
|
||||
|
||||
assert get_dispatcherd_metrics(request) == ''
|
||||
|
||||
|
||||
def test_dispatcherd_metrics_metric_filter_excludes_unrelated(mocker):
|
||||
def fake_urlopen(*args, **kwargs):
|
||||
raise AssertionError("urlopen should not be called when metric filter excludes dispatcherd metrics")
|
||||
|
||||
mocker.patch('urllib.request.urlopen', fake_urlopen)
|
||||
|
||||
request = Request(RequestFactory().get('/api/v2/metrics/', {'metric': 'awx_system_info'}))
|
||||
|
||||
assert get_dispatcherd_metrics(request) == ''
|
||||
|
||||
@@ -116,7 +116,7 @@ cython==3.1.3
|
||||
# via -r /awx_devel/requirements/requirements.in
|
||||
daphne==4.2.1
|
||||
# via -r /awx_devel/requirements/requirements.in
|
||||
dispatcherd[pg-notify]==2025.12.12
|
||||
dispatcherd[pg-notify]==2026.01.27
|
||||
# via -r /awx_devel/requirements/requirements.in
|
||||
distro==1.9.0
|
||||
# via -r /awx_devel/requirements/requirements.in
|
||||
|
||||
Reference in New Issue
Block a user