1
0
mirror of https://github.com/coreos/prometheus-operator.git synced 2026-02-05 06:45:27 +01:00
Files
2025-09-25 14:11:04 +02:00

275 lines
7.3 KiB
Go

// Copyright 2017 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package framework
import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
)
// SourceToIOReader returns a reader for the content designated by source.
//
// A source starting with "http://" or "https://" is fetched with
// URLToIOReader; any other value is treated as a file path and opened with
// PathToOSFile.
func SourceToIOReader(source string) (io.Reader, error) {
	// Match the full scheme instead of the bare "http" prefix so that a local
	// file such as "httpd.yaml" is not mistaken for a URL.
	if strings.HasPrefix(source, "http://") || strings.HasPrefix(source, "https://") {
		return URLToIOReader(source)
	}

	file, err := PathToOSFile(source)
	if err != nil {
		// Return an untyped nil: forwarding the nil *os.File directly would
		// produce a non-nil io.Reader interface value.
		return nil, err
	}

	return file, nil
}
// PathToOSFile converts relativePath into an absolute path and opens the file
// found there for reading.
//
// It returns the opened file, or a nil file together with an error when either
// the path resolution or the open call fails. The caller is responsible for
// closing the returned file.
func PathToOSFile(relativePath string) (*os.File, error) {
	var (
		absPath string
		file    *os.File
		err     error
	)

	if absPath, err = filepath.Abs(relativePath); err != nil {
		return nil, fmt.Errorf("failed generate absolute file path of %s: %w", relativePath, err)
	}

	if file, err = os.Open(absPath); err != nil {
		return nil, fmt.Errorf("failed to open file %s: %w", absPath, err)
	}

	return file, nil
}
// URLToIOReader issues HTTP GET requests against url until the server answers
// with status 200 and returns the body of that response.
//
// The request is attempted once per second for up to 30 seconds. Transport
// errors and non-200 responses are retried. If no attempt succeeds within that
// window, an error wrapping the polling error is returned.
//
// The returned reader is the response body itself; callers that want to
// release the underlying connection can type-assert it to io.Closer.
func URLToIOReader(url string) (io.Reader, error) {
	var resp *http.Response

	timeout := 30 * time.Second
	err := wait.PollUntilContextTimeout(context.Background(), time.Second, timeout, false, func(_ context.Context) (bool, error) {
		var err error
		resp, err = http.Get(url)
		if err != nil {
			// Transport-level failures are retried on the next tick.
			return false, nil
		}

		if resp.StatusCode == http.StatusOK {
			// Leave the body open: it is handed to the caller below.
			return true, nil
		}

		// This response is thrown away, so close its body before retrying;
		// otherwise every failed attempt leaks a connection. The close error
		// is irrelevant because the response is never read.
		_ = resp.Body.Close()

		return false, nil
	})
	if err != nil {
		return nil, fmt.Errorf(
			"waiting for %v to return a successful status code timed out. Last response from server was: %v: %w",
			url,
			resp,
			err,
		)
	}

	return resp.Body, nil
}
// WaitForPodsReady polls once per second, for at most timeout, until exactly
// expectedReplicas of the Pods selected by opts in namespace are reported as
// running and ready by k8sutil.PodRunningAndReady.
//
// A failure to list the Pods is returned from the poll condition as an error.
// An error from the readiness check of any Pod makes the current attempt
// report "not done" without an error, so polling continues.
func (f *Framework) WaitForPodsReady(ctx context.Context, namespace string, timeout time.Duration, expectedReplicas int, opts metav1.ListOptions) error {
	allReady := func(ctx context.Context) (bool, error) {
		podList, listErr := f.KubeClient.CoreV1().Pods(namespace).List(ctx, opts)
		if listErr != nil {
			return false, listErr
		}

		readyCount := 0
		for _, pod := range podList.Items {
			switch ready, checkErr := k8sutil.PodRunningAndReady(pod); {
			case checkErr != nil:
				// Give up on this attempt only; the next tick re-evaluates.
				return false, nil
			case ready:
				readyCount++
			}
		}

		return readyCount == expectedReplicas, nil
	}

	return wait.PollUntilContextTimeout(ctx, time.Second, timeout, false, allReady)
}
// WaitForPodsRunImage polls once per second, for at most five minutes, until
// exactly expectedReplicas of the Pods selected by opts in namespace have a
// container whose image equals image (as decided by podRunsImage).
//
// A failure to list the Pods is returned from the poll condition as an error.
func (f *Framework) WaitForPodsRunImage(ctx context.Context, namespace string, expectedReplicas int, image string, opts metav1.ListOptions) error {
	const pollTimeout = 5 * time.Minute

	imageRolledOut := func(ctx context.Context) (bool, error) {
		podList, listErr := f.KubeClient.CoreV1().Pods(namespace).List(ctx, opts)
		if listErr != nil {
			return false, listErr
		}

		matching := 0
		for _, pod := range podList.Items {
			if !podRunsImage(pod, image) {
				continue
			}
			matching++
		}

		return matching == expectedReplicas, nil
	}

	return wait.PollUntilContextTimeout(ctx, time.Second, pollTimeout, false, imageRolledOut)
}
// WaitForHTTPSuccessStatusCode issues HTTP GET requests against url once per
// second, for at most timeout, until the server answers with status 200.
//
// Transport errors and non-200 responses are retried. If no attempt succeeds
// within timeout, an error wrapping the polling error is returned; otherwise
// the result is nil.
func WaitForHTTPSuccessStatusCode(timeout time.Duration, url string) error {
	var resp *http.Response

	err := wait.PollUntilContextTimeout(context.Background(), time.Second, timeout, false, func(_ context.Context) (bool, error) {
		var err error
		resp, err = http.Get(url)
		if err != nil {
			// Transport-level failures are retried on the next tick.
			return false, nil
		}

		// Only the status code matters here, so the body is closed on every
		// attempt to avoid leaking the connection. The close error is
		// irrelevant because the body is never read.
		_ = resp.Body.Close()

		return resp.StatusCode == http.StatusOK, nil
	})
	if err != nil {
		return fmt.Errorf(
			"waiting for %v to return a successful status code timed out. Last response from server was: %v: %w",
			url,
			resp,
			err,
		)
	}

	return nil
}
// podRunsImage reports whether at least one entry of the Pod's
// Spec.Containers uses exactly the given image string.
func podRunsImage(p v1.Pod, image string) bool {
	containers := p.Spec.Containers
	for i := range containers {
		if containers[i].Image == image {
			return true
		}
	}

	return false
}
// ProxyGetPod sends an HTTP(S) GET request for path to the default port of the
// given pod through the Proxy API and returns the raw response bytes.
//
// When the request fails, the returned byte slice is nil and the error is
// passed through unchanged.
func (f *Framework) ProxyGetPod(ctx context.Context, scheme, namespace, pod, path string) ([]byte, error) {
	// The empty port argument leaves the port unspecified.
	request := f.KubeClient.CoreV1().Pods(namespace).ProxyGet(scheme, pod, "", path, nil)

	payload, err := request.DoRaw(ctx)
	if err == nil {
		return payload, nil
	}

	return nil, err
}
// ProxyPostPod builds a POST request against the "proxy" subresource of a pod.
// The request is returned without being executed.
//
// resourceName has the form "[protocol:]podName[:portNameOrNumber]":
//   - protocol is optional; valid values are "http" and "https", and "http" is
//     used when it is omitted.
//   - podName is mandatory.
//   - portNameOrNumber is optional; the default port is used when it is
//     omitted.
//
// body is sent as the request payload with a "Content-Type" header of
// "application/json", and path is appended as the request suffix.
func (f *Framework) ProxyPostPod(namespace, resourceName, path, body string) *rest.Request {
	payload := []byte(body)

	req := f.KubeClient.CoreV1().RESTClient().Post()
	req = req.Namespace(namespace).Resource("pods").SubResource("proxy")
	req = req.Name(resourceName).Suffix(path)
	req = req.Body(payload)

	return req.SetHeader("Content-Type", "application/json")
}
// GetMetricValueFromPod sends an HTTP(S) request to the /metrics endpoint of
// the pod through the Proxy API, parses the response and returns the float64
// value of the first series matching metricName.
//
// If protocol is empty, HTTP is used.
// If portNumberOrName is empty, the pod's default port is used.
//
// A failed request is reported as an error wrapping the underlying cause, with
// a zero value; any error from parsing the payload is returned as is.
func (f *Framework) GetMetricValueFromPod(ctx context.Context, protocol, ns, podName, portNumberOrName, metricName string) (float64, error) {
	request := f.KubeClient.CoreV1().Pods(ns).ProxyGet(protocol, podName, portNumberOrName, "/metrics", nil)

	payload, reqErr := request.DoRaw(ctx)
	if reqErr != nil {
		return 0, fmt.Errorf("error reading /metrics: %w", reqErr)
	}

	return getMetricValue(payload, metricName)
}
// EnsureMetricsFromService sends an HTTP(S) request to the /metrics endpoint
// of the service through the Proxy API, parses the response and verifies that
// a series can be found for every name listed in metrics.
//
// If protocol is empty, HTTP is used.
// If portNumberOrName is empty, the service's default port is used.
//
// It returns an error when metrics is empty, when the request fails, or for
// the first metric name that cannot be resolved in the payload; it returns nil
// when all of them are found.
func (f *Framework) EnsureMetricsFromService(ctx context.Context, protocol, ns, service, portNumberOrName string, metrics ...string) error {
	if len(metrics) < 1 {
		return fmt.Errorf("need to provide at least 1 metric to check")
	}

	request := f.KubeClient.CoreV1().Services(ns).ProxyGet(protocol, service, portNumberOrName, "/metrics", nil)

	payload, reqErr := request.DoRaw(ctx)
	if reqErr != nil {
		return fmt.Errorf("error reading /metrics: %w", reqErr)
	}

	// The payload is fetched once and scanned separately for each name.
	for i := range metrics {
		if _, lookupErr := getMetricValue(payload, metrics[i]); lookupErr != nil {
			return fmt.Errorf("metric %s: %w", metrics[i], lookupErr)
		}
	}

	return nil
}
// getMetricValue parses b as Prometheus text exposition data and returns the
// value of the first series whose "__name__" label equals metricName.
//
// An error is returned when the payload cannot be parsed, when the parser
// reports an invalid entry, or when the input is exhausted without a matching
// series. In the last case the error names the missing metric and wraps
// io.EOF, so callers checking for io.EOF with errors.Is keep working.
func getMetricValue(b []byte, metricName string) (float64, error) {
	parser := textparse.NewPromParser(b, labels.NewSymbolTable(), false)

	for {
		entry, err := parser.Next()
		if err == io.EOF {
			// End of input: every entry was inspected and none matched.
			// Returning the bare io.EOF would hide which metric was missing.
			return 0, fmt.Errorf("no series found for metric %q: %w", metricName, err)
		}
		if err != nil {
			return 0, fmt.Errorf("parsing metrics payload: %w", err)
		}

		if entry == textparse.EntryInvalid {
			return 0, fmt.Errorf("invalid prometheus metric entry")
		}

		// Skip HELP/TYPE/comment entries; only series carry values.
		if entry != textparse.EntrySeries {
			continue
		}

		seriesLabels := labels.Labels{}
		parser.Labels(&seriesLabels)

		if seriesLabels.Get("__name__") != metricName {
			continue
		}

		_, _, val := parser.Series()

		return val, nil
	}
}