mirror of
https://github.com/prometheus/alertmanager.git
synced 2026-02-05 15:45:34 +01:00
Add tracing support using otel to the following components: - api: extract trace and span IDs from request context - provider: mem put - dispatch: split logic and use better naming - inhibit: source and target traces, mutes, etc. drop metrics - silence: query, expire, mutes - notify: add distributed tracing support to stages and all http requests Note: inhibitor metrics are dropped since we have tracing now and they are not needed. We have not released any version with these metrics so we can drop them safely, this is not a breaking change. This change borrows part of the implementation from #3673 Fixes #3670 Signed-off-by: Dave Henderson <dhenderson@gmail.com> Signed-off-by: Siavash Safi <siavash@cloudflare.com> Co-authored-by: Dave Henderson <dhenderson@gmail.com>
250 lines
8.3 KiB
Go
250 lines
8.3 KiB
Go
// Copyright 2019 Prometheus Team
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package api
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"runtime"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/common/promslog"
|
|
"github.com/prometheus/common/route"
|
|
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
|
|
|
apiv2 "github.com/prometheus/alertmanager/api/v2"
|
|
"github.com/prometheus/alertmanager/cluster"
|
|
"github.com/prometheus/alertmanager/config"
|
|
"github.com/prometheus/alertmanager/dispatch"
|
|
"github.com/prometheus/alertmanager/provider"
|
|
"github.com/prometheus/alertmanager/silence"
|
|
"github.com/prometheus/alertmanager/types"
|
|
)
|
|
|
|
// API represents all APIs of Alertmanager.
|
|
type API struct {
|
|
v2 *apiv2.API
|
|
deprecationRouter *V1DeprecationRouter
|
|
|
|
requestDuration *prometheus.HistogramVec
|
|
requestsInFlight prometheus.Gauge
|
|
concurrencyLimitExceeded prometheus.Counter
|
|
timeout time.Duration
|
|
inFlightSem chan struct{}
|
|
}
|
|
|
|
// Options for the creation of an API object. Alerts, Silences, AlertStatusFunc
|
|
// and GroupMutedFunc are mandatory. The zero value for everything else is a safe
|
|
// default.
|
|
type Options struct {
|
|
// Alerts to be used by the API. Mandatory.
|
|
Alerts provider.Alerts
|
|
// Silences to be used by the API. Mandatory.
|
|
Silences *silence.Silences
|
|
// AlertStatusFunc is used be the API to retrieve the AlertStatus of an
|
|
// alert. Mandatory.
|
|
AlertStatusFunc func(model.Fingerprint) types.AlertStatus
|
|
// GroupMutedFunc is used be the API to know if an alert is muted.
|
|
// Mandatory.
|
|
GroupMutedFunc func(routeID, groupKey string) ([]string, bool)
|
|
// Peer from the gossip cluster. If nil, no clustering will be used.
|
|
Peer cluster.ClusterPeer
|
|
// Timeout for all HTTP connections. The zero value (and negative
|
|
// values) result in no timeout.
|
|
Timeout time.Duration
|
|
// Concurrency limit for GET requests. The zero value (and negative
|
|
// values) result in a limit of GOMAXPROCS or 8, whichever is
|
|
// larger. Status code 503 is served for GET requests that would exceed
|
|
// the concurrency limit.
|
|
Concurrency int
|
|
// Logger is used for logging, if nil, no logging will happen.
|
|
Logger *slog.Logger
|
|
// Registry is used to register Prometheus metrics. If nil, no metrics
|
|
// registration will happen.
|
|
Registry prometheus.Registerer
|
|
// RequestDuration is used to measure the duration of HTTP requests.
|
|
RequestDuration *prometheus.HistogramVec
|
|
// GroupFunc returns a list of alert groups. The alerts are grouped
|
|
// according to the current active configuration. Alerts returned are
|
|
// filtered by the arguments provided to the function.
|
|
GroupFunc func(context.Context, func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string, error)
|
|
}
|
|
|
|
func (o Options) validate() error {
|
|
if o.Alerts == nil {
|
|
return errors.New("mandatory field Alerts not set")
|
|
}
|
|
if o.Silences == nil {
|
|
return errors.New("mandatory field Silences not set")
|
|
}
|
|
if o.AlertStatusFunc == nil {
|
|
return errors.New("mandatory field AlertStatusFunc not set")
|
|
}
|
|
if o.GroupMutedFunc == nil {
|
|
return errors.New("mandatory field GroupMutedFunc not set")
|
|
}
|
|
if o.GroupFunc == nil {
|
|
return errors.New("mandatory field GroupFunc not set")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// New creates a new API object combining all API versions. Note that an Update
|
|
// call is also needed to get the APIs into an operational state.
|
|
func New(opts Options) (*API, error) {
|
|
if err := opts.validate(); err != nil {
|
|
return nil, fmt.Errorf("invalid API options: %w", err)
|
|
}
|
|
l := opts.Logger
|
|
if l == nil {
|
|
l = promslog.NewNopLogger()
|
|
}
|
|
concurrency := opts.Concurrency
|
|
if concurrency < 1 {
|
|
concurrency = max(runtime.GOMAXPROCS(0), 8)
|
|
}
|
|
|
|
v2, err := apiv2.NewAPI(
|
|
opts.Alerts,
|
|
opts.GroupFunc,
|
|
opts.AlertStatusFunc,
|
|
opts.GroupMutedFunc,
|
|
opts.Silences,
|
|
opts.Peer,
|
|
l.With("version", "v2"),
|
|
opts.Registry,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
requestsInFlight := prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Name: "alertmanager_http_requests_in_flight",
|
|
Help: "Current number of HTTP requests being processed.",
|
|
ConstLabels: prometheus.Labels{"method": "get"},
|
|
})
|
|
concurrencyLimitExceeded := prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "alertmanager_http_concurrency_limit_exceeded_total",
|
|
Help: "Total number of times an HTTP request failed because the concurrency limit was reached.",
|
|
ConstLabels: prometheus.Labels{"method": "get"},
|
|
})
|
|
if opts.Registry != nil {
|
|
if err := opts.Registry.Register(requestsInFlight); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := opts.Registry.Register(concurrencyLimitExceeded); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return &API{
|
|
deprecationRouter: NewV1DeprecationRouter(l.With("version", "v1")),
|
|
v2: v2,
|
|
requestDuration: opts.RequestDuration,
|
|
requestsInFlight: requestsInFlight,
|
|
concurrencyLimitExceeded: concurrencyLimitExceeded,
|
|
timeout: opts.Timeout,
|
|
inFlightSem: make(chan struct{}, concurrency),
|
|
}, nil
|
|
}
|
|
|
|
// Register API. As APIv2 works on the http.Handler level, this method also creates a new
|
|
// http.ServeMux and then uses it to register both the provided router (to
|
|
// handle "/") and APIv2 (to handle "<routePrefix>/api/v2"). The method returns
|
|
// the newly created http.ServeMux. If a timeout has been set on construction of
|
|
// API, it is enforced for all HTTP request going through this mux. The same is
|
|
// true for the concurrency limit, with the exception that it is only applied to
|
|
// GET requests.
|
|
func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {
|
|
// TODO(gotjosh) API V1 was removed as of version 0.27, when we reach 1.0.0 we should removed these deprecation warnings.
|
|
api.deprecationRouter.Register(r.WithPrefix("/api/v1"))
|
|
|
|
mux := http.NewServeMux()
|
|
mux.Handle("/", api.limitHandler(r))
|
|
|
|
apiPrefix := ""
|
|
if routePrefix != "/" {
|
|
apiPrefix = routePrefix
|
|
}
|
|
mux.Handle(
|
|
apiPrefix+"/api/v2/",
|
|
api.instrumentHandler(
|
|
apiPrefix,
|
|
api.limitHandler(
|
|
http.StripPrefix(
|
|
apiPrefix,
|
|
api.v2.Handler,
|
|
),
|
|
),
|
|
),
|
|
)
|
|
|
|
return mux
|
|
}
|
|
|
|
// Update config and resolve timeout of each API. APIv2 also needs
|
|
// setAlertStatus to be updated.
|
|
func (api *API) Update(cfg *config.Config, setAlertStatus func(ctx context.Context, labels model.LabelSet)) {
|
|
api.v2.Update(cfg, setAlertStatus)
|
|
}
|
|
|
|
func (api *API) limitHandler(h http.Handler) http.Handler {
|
|
concLimiter := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
|
|
if req.Method == http.MethodGet { // Only limit concurrency of GETs.
|
|
select {
|
|
case api.inFlightSem <- struct{}{}: // All good, carry on.
|
|
api.requestsInFlight.Inc()
|
|
defer func() {
|
|
<-api.inFlightSem
|
|
api.requestsInFlight.Dec()
|
|
}()
|
|
default:
|
|
api.concurrencyLimitExceeded.Inc()
|
|
http.Error(rsp, fmt.Sprintf(
|
|
"Limit of concurrent GET requests reached (%d), try again later.\n", cap(api.inFlightSem),
|
|
), http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
}
|
|
h.ServeHTTP(rsp, req)
|
|
})
|
|
if api.timeout <= 0 {
|
|
return concLimiter
|
|
}
|
|
return http.TimeoutHandler(concLimiter, api.timeout, fmt.Sprintf(
|
|
"Exceeded configured timeout of %v.\n", api.timeout,
|
|
))
|
|
}
|
|
|
|
func (api *API) instrumentHandler(prefix string, h http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
path, _ := strings.CutPrefix(r.URL.Path, prefix)
|
|
// avoid high cardinality label values by replacing the actual silence IDs with a placeholder
|
|
if strings.HasPrefix(path, "/api/v2/silence/") {
|
|
path = "/api/v2/silence/{silenceID}"
|
|
}
|
|
promhttp.InstrumentHandlerDuration(
|
|
api.requestDuration.MustCurryWith(prometheus.Labels{"handler": path}),
|
|
otelhttp.WithRouteTag(path, h),
|
|
).ServeHTTP(w, r)
|
|
})
|
|
}
|