
CORS-2852: Introduce pkg/clusterapi, system, and local control plane

Signed-off-by: Vince Prignano <vincepri@redhat.com>
Vince Prignano
2023-10-31 08:02:14 -07:00
parent c440672ed9
commit 17a7a4c2ea
14 changed files with 1149 additions and 2 deletions

docs/dev/cluster-api.md Normal file

@@ -0,0 +1,35 @@
# Cluster API
The installer uses Cluster API controllers through a local control plane powered by `kube-apiserver` and `etcd` running as local processes.
### Local control plane
The local control plane is set up using existing work from Controller Runtime, namely [envtest](https://github.com/kubernetes-sigs/controller-runtime/tree/main/tools/setup-envtest). Envtest was created out of the need to run integration tests for controllers against a real API server, register webhooks (conversion, admission, validation), and manage the lifecycle of Custom Resource Definitions.
Over time, `envtest` matured to the point that it can now be used to run controllers in a local environment, reducing or eliminating the need for a full Kubernetes cluster to run controllers.
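For illustration, here is a minimal sketch of how `envtest` is driven; the binary directory path is an assumption for the example (the installer unpacks its own bundled binaries, as described below):

```go
package main

import (
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/envtest"
)

func main() {
	env := &envtest.Environment{
		// Directory containing the kube-apiserver and etcd binaries
		// (the path here is an assumption for the example).
		BinaryAssetsDirectory: "bin/cluster-api",
	}
	cfg, err := env.Start() // starts etcd and kube-apiserver as local processes
	if err != nil {
		panic(err)
	}
	defer env.Stop() //nolint:errcheck
	fmt.Println("local API server available at", cfg.Host)
}
```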
At a high level, the local control plane is responsible for:
- Setting up certificates for the apiserver and etcd.
- Running (and cleaning up, on shutdown) the local control plane components.
- Installing any required components, such as Custom Resource Definitions (CRDs):
- For Cluster API core the CRDs are stored in `data/data/cluster-api/core-components.yaml`.
- Infrastructure providers are expected to store their components in `data/data/cluster-api/<name>-infrastructure-components.yaml`.
- Upon install, the local control plane takes care of modifying any webhook (conversion, admission, validation) to point to the `host:port` combination assigned.
- Each controller manager will have its own `host:port` combination assigned.
- Certificates are generated and injected into the server, and the client certificates into the API server webhook configuration.
- For each process that the local control plane manages, a health check (a ping to `/healthz`) is required to pass, similar to how a health probe is configured when running in a Deployment.
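As a rough sketch (not the installer's exact wiring), the `System` type introduced by this commit is driven as shown below; `runLocalCAPI` and its parameters are hypothetical:

```go
package example

import (
	"context"

	"github.com/openshift/installer/pkg/asset/installconfig"
	"github.com/openshift/installer/pkg/clusterapi"
)

// runLocalCAPI is a hypothetical helper: Run starts the local control plane
// plus the platform controllers, and Teardown shuts everything down when the
// caller is done.
func runLocalCAPI(ctx context.Context, ic *installconfig.InstallConfig) error {
	sys := &clusterapi.System{}
	if err := sys.Run(ctx, ic); err != nil {
		return err
	}
	defer sys.Teardown()

	// sys.Client is a controller-runtime client pointed at the local control
	// plane; Cluster API objects can be applied through it from here.
	return nil
}
```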
### Build and packaging
The Cluster API system is composed of a set of binaries. The core Cluster API manager and the infrastructure providers are built using Go modules in the `cluster-api` folder.
The binaries are built and packaged during the standard installer build process, `hack/build.sh`. The Cluster API-specific build flow is contained in the `hack/build-cluster-api.sh` script:
- Only enabled if the `OPENSHIFT_INSTALL_CLUSTER_API` environment variable is set.
- Builds (as needed) every binary listed as a Go Module in the `cluster-api` folder.
- Downloads (as needed) the specified version of `envtest` to package `kube-apiserver` and `etcd`.
- Produces a single `cluster-api.zip` file, which is then copied into `pkg/clusterapi/mirror`.
To build an `openshift-install` binary with Cluster API bundled:
- Set `export OPENSHIFT_INSTALL_CLUSTER_API=y`
- Optionally `export SKIP_TERRAFORM=y` if you don't need to use Terraform.
- Run `./hack/build.sh`; the binary is produced in `bin/openshift-install`.


@@ -9,7 +9,7 @@ fi
TARGET_OS_ARCH=$(go env GOOS)_$(go env GOARCH)
CLUSTER_API_BIN_DIR="${PWD}/cluster-api/bin/${TARGET_OS_ARCH}"
CLUSTER_API_MIRROR_DIR="${PWD}/pkg/cluster-api/mirror/"
CLUSTER_API_MIRROR_DIR="${PWD}/pkg/clusterapi/mirror/"
ENVTEST_K8S_VERSION="1.28.0"
ENVTEST_ARCH=$(go env GOOS)-$(go env GOARCH)


@@ -0,0 +1,124 @@
package addr
import (
"errors"
"fmt"
"io/fs"
"net"
"os"
"path/filepath"
"strings"
"time"
"github.com/openshift/installer/pkg/clusterapi/internal/process/flock"
)
const (
portReserveTime = 2 * time.Minute
portConflictRetry = 100
portFilePrefix = "port-"
)
var (
cacheDir string
)
func init() {
baseDir, err := os.UserCacheDir()
if err == nil {
cacheDir = filepath.Join(baseDir, "kubebuilder-envtest")
err = os.MkdirAll(cacheDir, 0o750)
}
if err != nil {
// Either we didn't get a cache directory, or we can't use it
baseDir = os.TempDir()
cacheDir = filepath.Join(baseDir, "kubebuilder-envtest")
err = os.MkdirAll(cacheDir, 0o750)
}
if err != nil {
panic(err)
}
}
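// portCache reserves recently suggested ports by creating lock files in the
// user cache directory, so concurrent processes do not hand out the same port.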
type portCache struct{}
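// add reserves the given port by acquiring a lock file for it, pruning
// reservations older than portReserveTime first. It returns false if the
// port is already reserved by another process.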
func (c *portCache) add(port int) (bool, error) {
// Remove outdated ports.
if err := fs.WalkDir(os.DirFS(cacheDir), ".", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() || !d.Type().IsRegular() || !strings.HasPrefix(path, portFilePrefix) {
return nil
}
info, err := d.Info()
if err != nil {
// No-op if file no longer exists; may have been deleted by another
// process/thread trying to allocate ports.
if errors.Is(err, fs.ErrNotExist) {
return nil
}
return err
}
if time.Since(info.ModTime()) > portReserveTime {
if err := os.Remove(filepath.Join(cacheDir, path)); err != nil {
// No-op if file no longer exists; may have been deleted by another
// process/thread trying to allocate ports.
if os.IsNotExist(err) {
return nil
}
return err
}
}
return nil
}); err != nil {
return false, err
}
// Try allocating new port, by acquiring a file.
path := fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)
if err := flock.Acquire(path); errors.Is(err, flock.ErrAlreadyLocked) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}
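// cache is the process-wide port cache used by Suggest.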
var cache = &portCache{}
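// suggest asks the OS for a free TCP port on listenHost and returns the open
// listener, the port number, and the resolved IP address.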
func suggest(listenHost string) (*net.TCPListener, int, string, error) {
if listenHost == "" {
listenHost = "localhost"
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(listenHost, "0"))
if err != nil {
return nil, -1, "", err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, -1, "", err
}
return l, l.Addr().(*net.TCPAddr).Port,
addr.IP.String(),
nil
}
// Suggest suggests an address a process can listen on. It returns
// a tuple consisting of a free port and the hostname resolved to its IP.
// It makes sure that the newly allocated port does not conflict with ports
// allocated within the last portReserveTime.
func Suggest(listenHost string) (int, string, error) {
for i := 0; i < portConflictRetry; i++ {
listener, port, resolvedHost, err := suggest(listenHost)
if err != nil {
return -1, "", err
}
defer listener.Close()
if ok, err := cache.add(port); ok {
return port, resolvedHost, nil
} else if err != nil {
return -1, "", err
}
}
return -1, "", fmt.Errorf("no free ports found after %d retries", portConflictRetry)
}


@@ -0,0 +1,5 @@
// Package flock is copied from k8s.io/kubernetes/pkg/util/flock to avoid
// importing k8s.io/kubernetes as a dependency.
//
// Provides file locking functionalities on unix systems.
package flock


@@ -0,0 +1,8 @@
package flock
import "errors"
var (
// ErrAlreadyLocked is returned when the file is already locked.
ErrAlreadyLocked = errors.New("the file is already locked")
)


@@ -0,0 +1,9 @@
//go:build !linux && !darwin && !freebsd && !openbsd && !netbsd && !dragonfly
// +build !linux,!darwin,!freebsd,!openbsd,!netbsd,!dragonfly
package flock
// Acquire is not implemented on non-unix systems.
func Acquire(path string) error {
return nil
}


@@ -0,0 +1,32 @@
//go:build linux || darwin || freebsd || openbsd || netbsd || dragonfly
// +build linux darwin freebsd openbsd netbsd dragonfly
package flock
import (
"errors"
"fmt"
"os"
"golang.org/x/sys/unix"
)
// Acquire acquires a lock on a file for the duration of the process. This method
// is reentrant.
func Acquire(path string) error {
fd, err := unix.Open(path, unix.O_CREAT|unix.O_RDWR|unix.O_CLOEXEC, 0600)
if err != nil {
if errors.Is(err, os.ErrExist) {
return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked)
}
return err
}
// We don't need to close the fd since we should hold
// it until the process exits.
err = unix.Flock(fd, unix.LOCK_NB|unix.LOCK_EX)
if errors.Is(err, unix.EWOULDBLOCK) { // This condition requires LOCK_NB.
return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked)
}
return err
}


@@ -0,0 +1,256 @@
package process
import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os/exec"
"path"
"regexp"
"sync"
"syscall"
"time"
"github.com/sirupsen/logrus"
)
// ListenAddr represents some listening address and port.
type ListenAddr struct {
Address string
Port string
}
// URL returns a URL for this address with the given scheme and subpath.
func (l *ListenAddr) URL(scheme string, path string) *url.URL {
return &url.URL{
Scheme: scheme,
Host: l.HostPort(),
Path: path,
}
}
// HostPort returns the joined host-port pair for this address.
func (l *ListenAddr) HostPort() string {
return net.JoinHostPort(l.Address, l.Port)
}
// HealthCheck describes the information needed to health-check a process via
// some health-check URL.
type HealthCheck struct {
url.URL
// PollInterval is the interval that will be used for polling the
// endpoint described by Host, Port, and Path.
//
// If left empty it will default to 100 milliseconds.
PollInterval time.Duration
}
// State define the state of the process.
type State struct {
Cmd *exec.Cmd
// HealthCheck describes how to check if this process is up. If we get an http.StatusOK,
// we assume the process is ready to operate.
//
// For example, the /healthz endpoint of the k8s API server, or the /health endpoint of etcd.
HealthCheck *HealthCheck
Dir string
Args []string
Env []string
StopTimeout time.Duration
StartTimeout time.Duration
Path string
// ready holds whether the process is currently in ready state (hit the ready condition) or not.
// It will be set to true on a successful `Start()` and set to false on a successful `Stop()`
ready bool
// waitDone is closed when our call to wait finishes up, and indicates that
// our process has terminated.
waitDone chan struct{}
errMu sync.Mutex
exitErr error
exited bool
}
// Init sets up this process, configuring binary paths if missing, initializing
// temporary directories, etc.
//
// This defaults all defaultable fields.
func (ps *State) Init(name string) error {
if ps.Path == "" {
if name == "" {
return fmt.Errorf("must have at least one of name or path")
}
}
if ps.StartTimeout == 0 {
ps.StartTimeout = 20 * time.Second
}
if ps.StopTimeout == 0 {
ps.StopTimeout = 20 * time.Second
}
return nil
}
type stopChannel chan struct{}
// CheckFlag checks the help output of this command for the presence of the given flag, specified
// without the leading `--` (e.g. `CheckFlag("insecure-port")` checks for `--insecure-port`),
// returning true if the flag is present.
func (ps *State) CheckFlag(flag string) (bool, error) {
cmd := exec.Command(ps.Path, "--help") //nolint:gosec
outContents, err := cmd.CombinedOutput()
if err != nil {
return false, fmt.Errorf("unable to run command %q to check for flag %q: %w", ps.Path, flag, err)
}
pat := `(?m)^\s*--` + flag + `\b` // (m --> multi-line --> ^ matches start of line)
matched, err := regexp.Match(pat, outContents)
if err != nil {
return false, fmt.Errorf("unable to check command %q for flag %q in help output: %w", ps.Path, flag, err)
}
return matched, nil
}
// Start starts the process, waits for it to come up, and returns an error
// if one occurred.
func (ps *State) Start(ctx context.Context, stdout io.Writer, stderr io.Writer) (err error) {
if ps.ready {
return nil
}
ps.Cmd = exec.CommandContext(ctx, ps.Path, ps.Args...) //nolint:gosec
ps.Cmd.Env = append(ps.Cmd.Environ(), ps.Env...)
ps.Cmd.Stdout = stdout
ps.Cmd.Stderr = stderr
ps.Cmd.Dir = ps.Dir
ps.Cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
ready := make(chan bool)
timedOut := time.After(ps.StartTimeout)
pollerStopCh := make(stopChannel)
if ps.HealthCheck != nil {
go pollURLUntilOK(ps.HealthCheck.URL, ps.HealthCheck.PollInterval, ready, pollerStopCh)
} else {
// Assume that if we're not health-checking, we're ready to go.
close(ready)
}
ps.waitDone = make(chan struct{})
if err := ps.Cmd.Start(); err != nil {
ps.errMu.Lock()
defer ps.errMu.Unlock()
ps.exited = true
return err
}
go func() {
defer close(ps.waitDone)
err := ps.Cmd.Wait()
ps.errMu.Lock()
defer ps.errMu.Unlock()
ps.exitErr = err
ps.exited = true
}()
select {
case <-ready:
ps.ready = true
return nil
case <-ps.waitDone:
close(pollerStopCh)
return fmt.Errorf("timeout waiting for process %s to start successfully "+
"(it may have failed to start, or stopped unexpectedly before becoming ready)",
path.Base(ps.Path))
case <-timedOut:
close(pollerStopCh)
if ps.Cmd != nil {
// intentionally ignore this -- we might've crashed, failed to start, etc
ps.Cmd.Process.Signal(syscall.SIGTERM) //nolint:errcheck
}
return fmt.Errorf("timeout waiting for process %s to start", path.Base(ps.Path))
}
}
// Exited returns true if the process exited, and may also
// return an error (as per Cmd.Wait) if the process did not
// exit with error code 0.
func (ps *State) Exited() (bool, error) {
ps.errMu.Lock()
defer ps.errMu.Unlock()
return ps.exited, ps.exitErr
}
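// pollURLUntilOK polls the given URL (skipping TLS verification) until it
// returns http.StatusOK, then signals on ready; polling stops when stopCh is
// closed. A non-positive interval defaults to 100 milliseconds.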
func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh stopChannel) {
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
// there's probably certs *somewhere*,
// but it's fine to just skip validating
// them for health checks during testing
InsecureSkipVerify: true, //nolint:gosec
},
},
}
if interval <= 0 {
interval = 100 * time.Millisecond
}
for {
res, err := client.Get(url.String())
if err == nil {
res.Body.Close()
if res.StatusCode == http.StatusOK {
ready <- true
return
}
}
select {
case <-stopCh:
return
default:
time.Sleep(interval)
}
}
}
// Stop stops this process gracefully, waits for its termination.
func (ps *State) Stop() error {
if ps.Cmd == nil {
return nil
}
if done, err := ps.Exited(); done {
if err != nil {
logrus.Warnf("process %s exited with error: %v", path.Base(ps.Path), err)
}
return nil
}
if err := ps.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
return fmt.Errorf("unable to signal for process %s to stop: %w", ps.Path, err)
}
timedOut := time.After(ps.StopTimeout)
select {
case <-ps.waitDone:
break
case <-timedOut:
if err := ps.Cmd.Process.Signal(syscall.SIGKILL); err != nil {
return fmt.Errorf("unable to signal for process %s to stop: %w", ps.Path, err)
}
return fmt.Errorf("timeout waiting for process %s to stop, sent SIGKILL", path.Base(ps.Path))
}
ps.ready = false
return nil
}


@@ -0,0 +1,145 @@
package clusterapi
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
"github.com/sirupsen/logrus"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog/v2"
capav1beta1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta1"
capav1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2"
capzv1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1"
clusterv1alpha3 "sigs.k8s.io/cluster-api/api/v1alpha3" //nolint:staticcheck
clusterv1alpha4 "sigs.k8s.io/cluster-api/api/v1alpha4" //nolint:staticcheck
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/log"
"github.com/openshift/installer/cmd/openshift-install/command"
)
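// binDir is the directory the bundled cluster-api and envtest binaries are
// unpacked into; the controllers started by System are executed from here.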
var (
binDir string
)
func init() {
utilruntime.Must(clusterv1alpha3.AddToScheme(scheme.Scheme))
utilruntime.Must(clusterv1alpha4.AddToScheme(scheme.Scheme))
utilruntime.Must(clusterv1.AddToScheme(scheme.Scheme))
utilruntime.Must(capav1beta1.AddToScheme(scheme.Scheme))
utilruntime.Must(capav1.AddToScheme(scheme.Scheme))
utilruntime.Must(capzv1.AddToScheme(scheme.Scheme))
}
// localControlPlane creates a local capi control plane
// to use as a management cluster.
type localControlPlane struct {
Env *envtest.Environment
Client client.Client
Cfg *rest.Config
BinDir string
KubeconfigPath string
}
// Run launches the local control plane.
func (c *localControlPlane) Run(ctx context.Context) error {
// Create a temporary directory to unpack the cluster-api binaries.
c.BinDir = filepath.Join(command.RootOpts.Dir, "bin", "cluster-api")
if err := UnpackClusterAPIBinary(c.BinDir); err != nil {
return err
}
if err := UnpackEnvtestBinaries(c.BinDir); err != nil {
return err
}
log.SetLogger(klog.NewKlogr())
logrus.Info("Started local control plane with envtest")
c.Env = &envtest.Environment{
Scheme: scheme.Scheme,
AttachControlPlaneOutput: false,
BinaryAssetsDirectory: c.BinDir,
ControlPlaneStartTimeout: 10 * time.Second,
ControlPlaneStopTimeout: 10 * time.Second,
}
var err error
c.Cfg, err = c.Env.Start()
if err != nil {
return err
}
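// Write a kubeconfig for the local control plane to <asset dir>/auth/envtest.kubeconfig.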
kc := fromEnvTestConfig(c.Cfg)
{
dir := filepath.Join(command.RootOpts.Dir, "auth")
kf, err := os.Create(filepath.Join(dir, "envtest.kubeconfig"))
if err != nil {
return err
}
if _, err := kf.Write(kc); err != nil {
return err
}
if err := kf.Close(); err != nil {
return err
}
c.KubeconfigPath, err = filepath.Abs(kf.Name())
if err != nil {
return err
}
}
// Create a new client to interact with the cluster.
cl, err := client.New(c.Cfg, client.Options{
Scheme: c.Env.Scheme,
})
if err != nil {
return err
}
c.Client = cl
logrus.Infof("Stored kubeconfig for envtest in: %v", c.KubeconfigPath)
return nil
}
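// Stop shuts down the local control plane, stopping kube-apiserver and etcd.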
func (c *localControlPlane) Stop() error {
return c.Env.Stop()
}
// fromEnvTestConfig returns a new Kubeconfig in byte form when running in envtest.
func fromEnvTestConfig(cfg *rest.Config) []byte {
clusterName := "envtest"
contextName := fmt.Sprintf("%s@%s", cfg.Username, clusterName)
c := api.Config{
Clusters: map[string]*api.Cluster{
clusterName: {
Server: cfg.Host,
CertificateAuthorityData: cfg.CAData,
},
},
Contexts: map[string]*api.Context{
contextName: {
Cluster: clusterName,
AuthInfo: cfg.Username,
},
},
AuthInfos: map[string]*api.AuthInfo{
cfg.Username: {
ClientKeyData: cfg.KeyData,
ClientCertificateData: cfg.CertData,
},
},
CurrentContext: contextName,
}
data, err := clientcmd.Write(c)
if err != nil {
logrus.Fatalf("failed to write kubeconfig: %v", err)
}
return data
}


@@ -0,0 +1,4 @@
Mirror of the cluster-api binaries to embed in the installer.
The mirror is populated as part of the build process so that only the binaries for the target architecture are embedded.
(note that this file is needed to appease govet so that the directory is not empty)

pkg/clusterapi/providers.go Normal file

@@ -0,0 +1,162 @@
package clusterapi
import (
"archive/zip"
"bytes"
"embed"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"
)
const (
zipFile = "cluster-api.zip"
)
var (
// ClusterAPI is the core provider for cluster-api.
ClusterAPI = Provider{
Name: "cluster-api",
Sources: sets.New("cluster-api"),
}
// EnvTest is the provider for the local control plane.
EnvTest = Provider{
Name: "envtest",
Sources: sets.New("kube-apiserver", "etcd"),
}
// AWS is the provider for creating resources in AWS.
AWS = infrastructureProvider("aws")
// Azure is the provider for creating resources in Azure.
Azure = infrastructureProvider("azure")
// AzureASO is a companion component to Azure that is used to create resources declaratively.
AzureASO = infrastructureProvider("azureaso")
// IBMCloud is the provider for creating resources in IBM Cloud and Power VS.
IBMCloud = infrastructureProvider("ibmcloud")
// GCP is the provider for creating resources in GCP.
GCP = infrastructureProvider("gcp")
// Nutanix is the provider for creating resources in Nutanix.
Nutanix = infrastructureProvider("nutanix")
// VSphere is the provider for creating resources in vSphere.
VSphere = infrastructureProvider("vsphere")
)
// Provider is a Cluster API provider.
type Provider struct {
// Name of the provider.
Name string
// Sources of the provider.
Sources sets.Set[string]
}
// infrastructureProvider configures an infrastructure provider built locally.
func infrastructureProvider(name string) Provider {
return Provider{
Name: name,
Sources: sets.New(
fmt.Sprintf("cluster-api-provider-%s", name),
),
}
}
// Mirror is the embedded data for the providers.
//
//go:embed mirror/*
var Mirror embed.FS
// Extract extracts the provider from the embedded data into the specified directory.
func (p Provider) Extract(dir string) error {
f, err := Mirror.Open(filepath.Join("mirror", zipFile))
if err != nil {
return errors.Wrap(err, "failed to open cluster api zip from mirror")
}
defer f.Close()
stat, err := f.Stat()
if err != nil {
return errors.Wrap(err, "failed to stat cluster api zip")
}
seek, ok := f.(io.ReaderAt)
if !ok {
// If the file does not support ReaderAt interface (<Go1.20)
// we need to read the whole file into memory.
b, err := io.ReadAll(f)
if err != nil {
return errors.Wrap(err, "failed to read cluster api zip")
}
seek = bytes.NewReader(b)
}
// Open a zip archive for reading.
r, err := zip.NewReader(seek, stat.Size())
if err != nil {
return errors.Wrap(err, "failed to open cluster api zip")
}
// Ensure the directory exists.
logrus.Debugf("creating %s directory", dir)
if err := os.MkdirAll(dir, 0o777); err != nil {
return errors.Wrapf(err, "could not make directory for the %s provider", p.Name)
}
// Extract the files.
for _, f := range r.File {
name := f.Name
if !p.Sources.Has(name) {
continue
}
path, err := sanitizeArchivePath(dir, name)
if err != nil {
return errors.Wrapf(err, "failed to sanitize archive file %q", name)
}
logrus.Debugf("extracting %s file", path)
if err := unpackFile(f, path); err != nil {
return errors.Wrapf(err, "failed to extract %q", path)
}
}
return nil
}
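// unpackFile writes a single file from the zip archive to destPath with 0o777
// permissions so the extracted binaries are executable.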
func unpackFile(f *zip.File, destPath string) error {
src, err := f.Open()
if err != nil {
return errors.Wrapf(err, "failed to open file %s", f.Name)
}
defer src.Close()
destFile, err := os.OpenFile(destPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o777)
if err != nil {
return err
}
defer destFile.Close()
if _, err := io.CopyN(destFile, src, f.FileInfo().Size()); err != nil {
return err
}
return nil
}
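// sanitizeArchivePath joins the archive entry t onto the destination directory
// d and rejects paths that would escape d (zip-slip protection).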
func sanitizeArchivePath(d, t string) (v string, err error) {
v = filepath.Join(d, t)
if strings.HasPrefix(v, filepath.Clean(d)) {
return v, nil
}
return "", fmt.Errorf("%s: %s", "content filepath is tainted", t)
}
// UnpackClusterAPIBinary unpacks the cluster-api binary from the embedded data so that it can be run to create the
// infrastructure for the cluster.
func UnpackClusterAPIBinary(dir string) error {
return ClusterAPI.Extract(dir)
}
// UnpackEnvtestBinaries unpacks the envtest binaries (kube-apiserver and etcd) from the embedded data so that they
// can be run to back the local control plane.
func UnpackEnvtestBinaries(dir string) error {
return EnvTest.Extract(dir)
}

pkg/clusterapi/system.go Normal file

@@ -0,0 +1,367 @@
package clusterapi
import (
"bytes"
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"text/template"
"time"
"github.com/sirupsen/logrus"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"github.com/openshift/installer/data"
"github.com/openshift/installer/pkg/asset/installconfig"
"github.com/openshift/installer/pkg/clusterapi/internal/process"
"github.com/openshift/installer/pkg/clusterapi/internal/process/addr"
"github.com/openshift/installer/pkg/types/aws"
"github.com/openshift/installer/pkg/types/azure"
"github.com/openshift/installer/pkg/types/gcp"
"github.com/openshift/installer/pkg/types/ibmcloud"
"github.com/openshift/installer/pkg/types/nutanix"
"github.com/openshift/installer/pkg/types/vsphere"
)
// System creates a local capi control plane
// to use as a management cluster.
type System struct {
Client client.Client
componentDir string
lcp *localControlPlane
wg sync.WaitGroup
once sync.Once
cancel context.CancelFunc
}
// Run launches the cluster-api system.
func (c *System) Run(ctx context.Context, installConfig *installconfig.InstallConfig) (err error) {
// Setup the context with a cancel function.
ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel
// Create the local control plane.
c.lcp = &localControlPlane{}
if err := c.lcp.Run(ctx); err != nil {
return fmt.Errorf("failed to run local control plane: %w", err)
}
c.Client = c.lcp.Client
// Create a temporary directory to unpack the cluster-api assets
// and use it as the working directory for the envtest environment.
componentDir, err := os.MkdirTemp("", "openshift-cluster-api-system-components")
if err != nil {
return err
}
if err := data.Unpack(componentDir, "/cluster-api"); err != nil {
return err
}
c.componentDir = componentDir
// Create the controllers, we always need to run the cluster-api core controller.
controllers := []*controller{
{
Name: "Cluster API",
Path: fmt.Sprintf("%s/cluster-api", binDir),
Components: []string{c.componentDir + "/core-components.yaml"},
Args: []string{
"-v=2",
"--metrics-bind-addr=0",
"--health-addr={{suggestHealthHostPort}}",
"--webhook-port={{.WebhookPort}}",
"--webhook-cert-dir={{.WebhookCertDir}}",
},
},
}
// Create the infrastructure controllers.
// Only add the controllers for the platform we are deploying to.
switch platform := installConfig.Config.Platform.Name(); platform {
case aws.Name:
controllers = append(controllers,
c.getInfrastructureController(
&AWS,
[]string{
"-v=2",
"--metrics-bind-addr=0",
"--health-addr={{suggestHealthHostPort}}",
"--webhook-port={{.WebhookPort}}",
"--webhook-cert-dir={{.WebhookCertDir}}",
"--feature-gates=BootstrapFormatIgnition=true,ExternalResourceGC=true",
},
map[string]string{},
),
)
case azure.Name:
session, err := installConfig.Azure.Session()
if err != nil {
return fmt.Errorf("failed to create azure session: %w", err)
}
controllers = append(controllers,
c.getInfrastructureController(
&Azure,
[]string{
"-v=2",
"--metrics-bind-addr=0",
"--health-addr={{suggestHealthHostPort}}",
"--webhook-port={{.WebhookPort}}",
"--webhook-cert-dir={{.WebhookCertDir}}",
},
map[string]string{},
),
c.getInfrastructureController(
&AzureASO,
[]string{
"--v=0",
"--metrics-addr=0",
"--health-addr={{suggestHealthHostPort}}",
"--webhook-port={{.WebhookPort}}",
"--webhook-cert-dir={{.WebhookCertDir}}",
"--crd-pattern=",
"--enable-crd-management=false",
}, map[string]string{
"POD_NAMESPACE": "capz-system",
"AZURE_CLIENT_ID": session.Credentials.ClientID,
"AZURE_CLIENT_SECRET": session.Credentials.ClientSecret,
"AZURE_CLIENT_CERTIFICATE": session.Credentials.ClientCertificatePath,
"AZURE_CLIENT_CERTIFICATE_PASSWORD": session.Credentials.ClientCertificatePassword,
"AZURE_TENANT_ID": session.Credentials.TenantID,
"AZURE_SUBSCRIPTION_ID": session.Credentials.SubscriptionID,
},
),
)
case gcp.Name:
// TODO
case ibmcloud.Name:
// TODO
case nutanix.Name:
// TODO
case vsphere.Name:
// TODO
default:
return fmt.Errorf("unsupported platform %q", platform)
}
// Run the controllers.
for _, ct := range controllers {
if err := c.runController(ctx, ct); err != nil {
return fmt.Errorf("failed to run controller %q: %w", ct.Name, err)
}
}
// We add to the wait group so we can wait for the controllers to stop.
// The wait group is used by the Teardown function, which is expected
// to be called when the program exits.
c.wg.Add(1)
go func() {
defer c.wg.Done()
// Stop the controllers when the context is cancelled.
<-ctx.Done()
for _, ct := range controllers {
if ct.state != nil {
if err := ct.state.Stop(); err != nil {
logrus.Warnf("Failed to stop controller: %s: %v", ct.Name, err)
continue
}
logrus.Infof("Stopped controller: %s", ct.Name)
}
}
// Stop the local control plane.
if err := c.lcp.Stop(); err != nil {
logrus.Warnf("Failed to stop local Cluster API control plane: %v", err)
}
}()
return nil
}
// Teardown shuts down the local capi control plane and all its controllers.
func (c *System) Teardown() {
if c.lcp == nil {
return
}
// Clean up the binary directory.
defer os.RemoveAll(c.lcp.BinDir)
// Proceed to shutdown.
c.once.Do(func() {
c.cancel()
logrus.Info("Shutting down local Cluster API control plane...")
ch := make(chan struct{})
go func() {
c.wg.Wait()
close(ch)
}()
select {
case <-ch:
logrus.Info("Local Cluster API system has completed operations")
case <-time.After(60 * time.Second):
logrus.Warn("Timed out waiting for local Cluster API system to shut down")
}
})
}
// getInfrastructureController returns a controller for the given provider;
// most of the configuration is by convention.
//
// The provider is expected to be compiled as part of the release process, and packaged in the binaries directory
// and have the name `cluster-api-provider-<name>`.
//
// While the manifests are optional, we expect them to be in the component directory and named `<name>-infrastructure-components.yaml`.
func (c *System) getInfrastructureController(provider *Provider, args []string, env map[string]string) *controller {
manifests := []string{}
defaultManifestPath := filepath.Join(c.componentDir, fmt.Sprintf("/%s-infrastructure-components.yaml", provider.Name))
if _, err := os.Stat(defaultManifestPath); err == nil {
manifests = append(manifests, defaultManifestPath)
}
return &controller{
Provider: provider,
Name: fmt.Sprintf("%s infrastructure provider", provider.Name),
Path: fmt.Sprintf("%s/cluster-api-provider-%s", binDir, provider.Name),
Components: manifests,
Args: args,
Env: env,
}
}
// controller encapsulates the state of a controller, its process state, and its configuration.
type controller struct {
Provider *Provider
state *process.State
Name string
Dir string
Path string
Components []string
Args []string
Env map[string]string
}
// runController configures the controller, and waits for it to be ready.
func (c *System) runController(ctx context.Context, ct *controller) error {
// If the provider is not empty, we extract it to the binaries directory.
if ct.Provider != nil {
if err := ct.Provider.Extract(binDir); err != nil {
logrus.Fatal(err)
}
}
// Create the WebhookInstallOptions from envtest, and pass the manifests we've been given as input.
// Once built, we install them in the local control plane using the rest.Config available.
// Envtest takes care of a few things needed to run webhooks locally:
// - Creates a self-signed certificate for the webhook server.
// - Tries to allocate a host:port for the webhook server to listen on.
// - Modifies the webhook manifests to point to the local webhook server through a URL and a CABundle.
wh := envtest.WebhookInstallOptions{
Paths: ct.Components,
IgnoreSchemeConvertible: true,
}
if err := wh.Install(c.lcp.Cfg); err != nil {
return fmt.Errorf("failed to prepare controller %q webhook options: %w", ct.Name, err)
}
// Most providers allocate a host:port configuration for the health check,
// which responds to a simple http request on /healthz and /readyz.
// When an argument is configured to use the suggestHealthHostPort function,
// we record the value so we can pass it to the process health check below.
var healthCheckHostPort string
// Build the arguments, using go templating to render the values.
{
funcs := template.FuncMap{
"suggestHealthHostPort": func() (string, error) {
healthPort, healthHost, err := addr.Suggest("")
if err != nil {
return "", fmt.Errorf("unable to grab random port: %w", err)
}
healthCheckHostPort = fmt.Sprintf("%s:%d", healthHost, healthPort)
return healthCheckHostPort, nil
},
}
templateData := map[string]string{
"WebhookPort": fmt.Sprintf("%d", wh.LocalServingPort),
"WebhookCertDir": wh.LocalServingCertDir,
}
args := make([]string, 0, len(ct.Args))
for _, arg := range ct.Args {
final := new(bytes.Buffer)
tmpl := template.Must(template.New("arg").Funcs(funcs).Parse(arg))
if err := tmpl.Execute(final, templateData); err != nil {
return fmt.Errorf("failed to render controller %q arg %q: %w", ct.Name, arg, err)
}
args = append(args, strings.TrimSpace(final.String()))
}
ct.Args = args
}
// Build the environment variables.
env := []string{}
{
if ct.Env == nil {
ct.Env = map[string]string{}
}
// Override KUBECONFIG to point to the local control plane.
ct.Env["KUBECONFIG"] = c.lcp.KubeconfigPath
for key, value := range ct.Env {
env = append(env, fmt.Sprintf("%s=%s", key, value))
}
}
// Install the manifests for the controller, if any.
if len(ct.Components) > 0 {
opts := envtest.CRDInstallOptions{
Scheme: c.lcp.Env.Scheme,
Paths: ct.Components,
WebhookOptions: wh,
}
if _, err := envtest.InstallCRDs(c.lcp.Cfg, opts); err != nil {
return fmt.Errorf("failed to install controller %q manifests in local control plane: %w", ct.Name, err)
}
}
// Create the process state.
pr := &process.State{
Path: ct.Path,
Args: ct.Args,
Dir: ct.Dir,
Env: env,
StartTimeout: 60 * time.Second,
StopTimeout: 10 * time.Second,
}
// If the controller has a health check, we configure it, and wait for it to be ready.
if healthCheckHostPort != "" {
pr.HealthCheck = &process.HealthCheck{
URL: url.URL{
Scheme: "http",
Host: healthCheckHostPort,
Path: "/healthz",
},
}
}
// Initialize the process state.
if err := pr.Init(ct.Name); err != nil {
return fmt.Errorf("failed to initialize process state for controller %q: %w", ct.Name, err)
}
// Run the controller and store its state.
logrus.Infof("Running process: %s with args %v and env %v", ct.Name, ct.Args, env)
if err := pr.Start(ctx, os.Stdout, os.Stderr); err != nil {
return fmt.Errorf("failed to start controller %q: %w", ct.Name, err)
}
ct.state = pr
return nil
}


@@ -18,7 +18,7 @@ func getDescStruct(opts MetricOpts, labelKeyValues map[string]string) string {
}
sort.Strings(lpStrings)
return fmt.Sprintf("Desc{fqName: %q, help: %q, constLabels: {%s}, variableLabels: %v}",
opts.Name, opts.Desc, strings.Join(lpStrings, ","), []string{})
opts.Name, opts.Desc, strings.Join(lpStrings, ","), "{}")
}
func getCollectorDescription(collector prometheus.Collector) string {