diff --git a/docs/dev/cluster-api.md b/docs/dev/cluster-api.md new file mode 100644 index 0000000000..452b12a0ea --- /dev/null +++ b/docs/dev/cluster-api.md @@ -0,0 +1,35 @@ +# Cluster API + +The installer runs Cluster API controllers against a local control plane powered by `kube-apiserver` and `etcd` running on the installer host. + +### Local control plane + +The local control plane is set up using existing work from Controller Runtime, namely [envtest](https://github.com/kubernetes-sigs/controller-runtime/tree/main/tools/setup-envtest). Envtest was created out of the need to run integration tests for controllers against a real API server, to register webhooks (conversion, admission, validation), and to manage the lifecycle of Custom Resource Definitions. + +Over time, `envtest` has matured to the point that it can now be used to run controllers in a local environment, reducing or eliminating the need for a full Kubernetes cluster. + +At a high level, the local control plane is responsible for: +- Setting up certificates for the API server and etcd. +- Running (and cleaning up, on shutdown) the local control plane components. +- Installing any required components, such as Custom Resource Definitions (CRDs). + - For Cluster API core, the CRDs are stored in `data/data/cluster-api/core-components.yaml`. + - Infrastructure providers are expected to store their components in `data/data/cluster-api/<provider-name>-infrastructure-components.yaml`. +- Upon install, the local control plane takes care of modifying any webhook (conversion, admission, validation) to point to the assigned `host:port` combination. + - Each controller manager is assigned its own `host:port` combination. + - Serving certificates are generated and injected into the webhook server, and the client certificates into the API server's webhook configuration. +- For each process that the local control plane manages, a health check (a ping to `/healthz`) must pass, similar to how a health probe is configured when running in a Deployment. + +### Build and packaging + +The Cluster API system consists of a set of binaries: the core Cluster API manager and the infrastructure providers are built as Go modules in the `cluster-api` folder. + +The binaries are built and packaged during the standard installer build process, `hack/build.sh`. The Cluster API-specific build flow is contained in the `hack/build-cluster-api.sh` script, which: +- Runs only if the `OPENSHIFT_INSTALL_CLUSTER_API` environment variable is set. +- Builds (as needed) every binary defined as a Go module in the `cluster-api` folder. +- Downloads (as needed) the specified version of `envtest` to package `kube-apiserver` and `etcd`. +- Produces a single `cluster-api.zip` file, which is then copied into `pkg/clusterapi/mirror`. + +To build an `openshift-install` binary with Cluster API bundled: +- Set `export OPENSHIFT_INSTALL_CLUSTER_API=y`. + - Optionally `export SKIP_TERRAFORM=y` if you don't need to use Terraform. +- Run `./hack/build.sh`; the binary is produced at `bin/openshift-install`. 
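To make the local control plane description above concrete, the following is a minimal sketch of standing up an envtest-backed control plane and connecting a client to it, roughly mirroring what `pkg/clusterapi/localcontrolplane.go` does later in this diff. The binary assets directory is an assumption for illustration; in the installer it is populated by unpacking the embedded `cluster-api.zip`.

```go
package main

import (
	"log"
	"time"

	"k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/envtest"
)

func main() {
	// Point envtest at a directory containing the kube-apiserver and etcd
	// binaries unpacked from the embedded mirror (the path is illustrative).
	env := &envtest.Environment{
		Scheme:                   scheme.Scheme,
		BinaryAssetsDirectory:    "bin/cluster-api",
		ControlPlaneStartTimeout: 10 * time.Second,
		ControlPlaneStopTimeout:  10 * time.Second,
	}

	// Start launches etcd and kube-apiserver locally and returns a rest.Config.
	cfg, err := env.Start()
	if err != nil {
		log.Fatalf("failed to start local control plane: %v", err)
	}
	defer env.Stop() //nolint:errcheck

	// The config can be used like any other cluster connection, e.g. to install
	// CRDs and create Cluster API objects.
	cl, err := client.New(cfg, client.Options{Scheme: env.Scheme})
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}
	_ = cl
}
```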
diff --git a/hack/build-cluster-api.sh b/hack/build-cluster-api.sh index a8e3954654..dc6dfb36fc 100644 --- a/hack/build-cluster-api.sh +++ b/hack/build-cluster-api.sh @@ -9,7 +9,7 @@ fi TARGET_OS_ARCH=$(go env GOOS)_$(go env GOARCH) CLUSTER_API_BIN_DIR="${PWD}/cluster-api/bin/${TARGET_OS_ARCH}" -CLUSTER_API_MIRROR_DIR="${PWD}/pkg/cluster-api/mirror/" +CLUSTER_API_MIRROR_DIR="${PWD}/pkg/clusterapi/mirror/" ENVTEST_K8S_VERSION="1.28.0" ENVTEST_ARCH=$(go env GOOS)-$(go env GOARCH) diff --git a/pkg/cluster-api/.gitignore b/pkg/clusterapi/.gitignore similarity index 100% rename from pkg/cluster-api/.gitignore rename to pkg/clusterapi/.gitignore diff --git a/pkg/clusterapi/internal/process/addr/addr.go b/pkg/clusterapi/internal/process/addr/addr.go new file mode 100644 index 0000000000..471a638b7a --- /dev/null +++ b/pkg/clusterapi/internal/process/addr/addr.go @@ -0,0 +1,124 @@ +package addr + +import ( + "errors" + "fmt" + "io/fs" + "net" + "os" + "path/filepath" + "strings" + "time" + + "github.com/openshift/installer/pkg/clusterapi/internal/process/flock" +) + +const ( + portReserveTime = 2 * time.Minute + portConflictRetry = 100 + portFilePrefix = "port-" +) + +var ( + cacheDir string +) + +func init() { + baseDir, err := os.UserCacheDir() + if err == nil { + cacheDir = filepath.Join(baseDir, "kubebuilder-envtest") + err = os.MkdirAll(cacheDir, 0o750) + } + if err != nil { + // Either we didn't get a cache directory, or we can't use it + baseDir = os.TempDir() + cacheDir = filepath.Join(baseDir, "kubebuilder-envtest") + err = os.MkdirAll(cacheDir, 0o750) + } + if err != nil { + panic(err) + } +} + +type portCache struct{} + +func (c *portCache) add(port int) (bool, error) { + // Remove outdated ports. + if err := fs.WalkDir(os.DirFS(cacheDir), ".", func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() || !d.Type().IsRegular() || !strings.HasPrefix(path, portFilePrefix) { + return nil + } + info, err := d.Info() + if err != nil { + // No-op if file no longer exists; may have been deleted by another + // process/thread trying to allocate ports. + if errors.Is(err, fs.ErrNotExist) { + return nil + } + return err + } + if time.Since(info.ModTime()) > portReserveTime { + if err := os.Remove(filepath.Join(cacheDir, path)); err != nil { + // No-op if file no longer exists; may have been deleted by another + // process/thread trying to allocate ports. + if os.IsNotExist(err) { + return nil + } + return err + } + } + return nil + }); err != nil { + return false, err + } + // Try allocating new port, by acquiring a file. + path := fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port) + if err := flock.Acquire(path); errors.Is(err, flock.ErrAlreadyLocked) { + return false, nil + } else if err != nil { + return false, err + } + return true, nil +} + +var cache = &portCache{} + +func suggest(listenHost string) (*net.TCPListener, int, string, error) { + if listenHost == "" { + listenHost = "localhost" + } + addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(listenHost, "0")) + if err != nil { + return nil, -1, "", err + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return nil, -1, "", err + } + return l, l.Addr().(*net.TCPAddr).Port, + addr.IP.String(), + nil +} + +// Suggest suggests an address a process can listen on. It returns +// a tuple consisting of a free port and the hostname resolved to its IP. +// It makes sure that new port allocated does not conflict with old ports +// allocated within 1 minute. 
+func Suggest(listenHost string) (int, string, error) { + for i := 0; i < portConflictRetry; i++ { + listener, port, resolvedHost, err := suggest(listenHost) + if err != nil { + return -1, "", err + } + defer listener.Close() + if ok, err := cache.add(port); ok { + return port, resolvedHost, nil + } else if err != nil { + return -1, "", err + } + } + return -1, "", fmt.Errorf("no free ports found after %d retries", portConflictRetry) +} diff --git a/pkg/clusterapi/internal/process/flock/doc.go b/pkg/clusterapi/internal/process/flock/doc.go new file mode 100644 index 0000000000..44597e7781 --- /dev/null +++ b/pkg/clusterapi/internal/process/flock/doc.go @@ -0,0 +1,5 @@ +// Package flock is copied from k8s.io/kubernetes/pkg/util/flock to avoid +// importing k8s.io/kubernetes as a dependency. +// +// Provides file locking functionalities on unix systems. +package flock diff --git a/pkg/clusterapi/internal/process/flock/errors.go b/pkg/clusterapi/internal/process/flock/errors.go new file mode 100644 index 0000000000..f840166eca --- /dev/null +++ b/pkg/clusterapi/internal/process/flock/errors.go @@ -0,0 +1,8 @@ +package flock + +import "errors" + +var ( + // ErrAlreadyLocked is returned when the file is already locked. + ErrAlreadyLocked = errors.New("the file is already locked") +) diff --git a/pkg/clusterapi/internal/process/flock/flock_other.go b/pkg/clusterapi/internal/process/flock/flock_other.go new file mode 100644 index 0000000000..126d13380f --- /dev/null +++ b/pkg/clusterapi/internal/process/flock/flock_other.go @@ -0,0 +1,9 @@ +//go:build !linux && !darwin && !freebsd && !openbsd && !netbsd && !dragonfly +// +build !linux,!darwin,!freebsd,!openbsd,!netbsd,!dragonfly + +package flock + +// Acquire is not implemented on non-unix systems. +func Acquire(path string) error { + return nil +} diff --git a/pkg/clusterapi/internal/process/flock/flock_unix.go b/pkg/clusterapi/internal/process/flock/flock_unix.go new file mode 100644 index 0000000000..9d3a58f676 --- /dev/null +++ b/pkg/clusterapi/internal/process/flock/flock_unix.go @@ -0,0 +1,32 @@ +//go:build linux || darwin || freebsd || openbsd || netbsd || dragonfly +// +build linux darwin freebsd openbsd netbsd dragonfly + +package flock + +import ( + "errors" + "fmt" + "os" + + "golang.org/x/sys/unix" +) + +// Acquire acquires a lock on a file for the duration of the process. This method +// is reentrant. +func Acquire(path string) error { + fd, err := unix.Open(path, unix.O_CREAT|unix.O_RDWR|unix.O_CLOEXEC, 0600) + if err != nil { + if errors.Is(err, os.ErrExist) { + return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked) + } + return err + } + + // We don't need to close the fd since we should hold + // it until the process exits. + err = unix.Flock(fd, unix.LOCK_NB|unix.LOCK_EX) + if errors.Is(err, unix.EWOULDBLOCK) { // This condition requires LOCK_NB. + return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked) + } + return err +} diff --git a/pkg/clusterapi/internal/process/process.go b/pkg/clusterapi/internal/process/process.go new file mode 100644 index 0000000000..29d4c37619 --- /dev/null +++ b/pkg/clusterapi/internal/process/process.go @@ -0,0 +1,256 @@ +package process + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net" + "net/http" + "net/url" + "os/exec" + "path" + "regexp" + "sync" + "syscall" + "time" + + "github.com/sirupsen/logrus" +) + +// ListenAddr represents some listening address and port. 
+type ListenAddr struct { + Address string + Port string +} + +// URL returns a URL for this address with the given scheme and subpath. +func (l *ListenAddr) URL(scheme string, path string) *url.URL { + return &url.URL{ + Scheme: scheme, + Host: l.HostPort(), + Path: path, + } +} + +// HostPort returns the joined host-port pair for this address. +func (l *ListenAddr) HostPort() string { + return net.JoinHostPort(l.Address, l.Port) +} + +// HealthCheck describes the information needed to health-check a process via +// some health-check URL. +type HealthCheck struct { + url.URL + + // HealthCheckPollInterval is the interval which will be used for polling the + // endpoint described by Host, Port, and Path. + // + // If left empty it will default to 100 Milliseconds. + PollInterval time.Duration +} + +// State define the state of the process. +type State struct { + Cmd *exec.Cmd + + // HealthCheck describes how to check if this process is up. If we get an http.StatusOK, + // we assume the process is ready to operate. + // + // For example, the /healthz endpoint of the k8s API server, or the /health endpoint of etcd. + HealthCheck *HealthCheck + + Dir string + Args []string + Env []string + + StopTimeout time.Duration + StartTimeout time.Duration + + Path string + + // ready holds whether the process is currently in ready state (hit the ready condition) or not. + // It will be set to true on a successful `Start()` and set to false on a successful `Stop()` + ready bool + + // waitDone is closed when our call to wait finishes up, and indicates that + // our process has terminated. + waitDone chan struct{} + errMu sync.Mutex + exitErr error + exited bool +} + +// Init sets up this process, configuring binary paths if missing, initializing +// temporary directories, etc. +// +// This defaults all defaultable fields. +func (ps *State) Init(name string) error { + if ps.Path == "" { + if name == "" { + return fmt.Errorf("must have at least one of name or path") + } + } + + if ps.StartTimeout == 0 { + ps.StartTimeout = 20 * time.Second + } + + if ps.StopTimeout == 0 { + ps.StopTimeout = 20 * time.Second + } + return nil +} + +type stopChannel chan struct{} + +// CheckFlag checks the help output of this command for the presence of the given flag, specified +// without the leading `--` (e.g. `CheckFlag("insecure-port")` checks for `--insecure-port`), +// returning true if the flag is present. +func (ps *State) CheckFlag(flag string) (bool, error) { + cmd := exec.Command(ps.Path, "--help") //nolint:gosec + outContents, err := cmd.CombinedOutput() + if err != nil { + return false, fmt.Errorf("unable to run command %q to check for flag %q: %w", ps.Path, flag, err) + } + pat := `(?m)^\s*--` + flag + `\b` // (m --> multi-line --> ^ matches start of line) + matched, err := regexp.Match(pat, outContents) + if err != nil { + return false, fmt.Errorf("unable to check command %q for flag %q in help output: %w", ps.Path, flag, err) + } + return matched, nil +} + +// Start starts the apiserver, waits for it to come up, and returns an error, +// if occurred. +func (ps *State) Start(ctx context.Context, stdout io.Writer, stderr io.Writer) (err error) { + if ps.ready { + return nil + } + + ps.Cmd = exec.CommandContext(ctx, ps.Path, ps.Args...) //nolint:gosec + ps.Cmd.Env = append(ps.Cmd.Environ(), ps.Env...) 
+ ps.Cmd.Stdout = stdout + ps.Cmd.Stderr = stderr + ps.Cmd.Dir = ps.Dir + ps.Cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + + ready := make(chan bool) + timedOut := time.After(ps.StartTimeout) + + pollerStopCh := make(stopChannel) + if ps.HealthCheck != nil { + go pollURLUntilOK(ps.HealthCheck.URL, ps.HealthCheck.PollInterval, ready, pollerStopCh) + } else { + // Assume that if we're not health-checking, we're ready to go. + close(ready) + } + + ps.waitDone = make(chan struct{}) + if err := ps.Cmd.Start(); err != nil { + ps.errMu.Lock() + defer ps.errMu.Unlock() + ps.exited = true + return err + } + go func() { + defer close(ps.waitDone) + err := ps.Cmd.Wait() + + ps.errMu.Lock() + defer ps.errMu.Unlock() + ps.exitErr = err + ps.exited = true + }() + + select { + case <-ready: + ps.ready = true + return nil + case <-ps.waitDone: + close(pollerStopCh) + return fmt.Errorf("timeout waiting for process %s to start successfully "+ + "(it may have failed to start, or stopped unexpectedly before becoming ready)", + path.Base(ps.Path)) + case <-timedOut: + close(pollerStopCh) + if ps.Cmd != nil { + // intentionally ignore this -- we might've crashed, failed to start, etc + ps.Cmd.Process.Signal(syscall.SIGTERM) //nolint:errcheck + } + return fmt.Errorf("timeout waiting for process %s to start", path.Base(ps.Path)) + } +} + +// Exited returns true if the process exited, and may also +// return an error (as per Cmd.Wait) if the process did not +// exit with error code 0. +func (ps *State) Exited() (bool, error) { + ps.errMu.Lock() + defer ps.errMu.Unlock() + return ps.exited, ps.exitErr +} + +func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh stopChannel) { + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + // there's probably certs *somewhere*, + // but it's fine to just skip validating + // them for health checks during testing + InsecureSkipVerify: true, //nolint:gosec + }, + }, + } + if interval <= 0 { + interval = 100 * time.Millisecond + } + for { + res, err := client.Get(url.String()) + if err == nil { + res.Body.Close() + if res.StatusCode == http.StatusOK { + ready <- true + return + } + } + + select { + case <-stopCh: + return + default: + time.Sleep(interval) + } + } +} + +// Stop stops this process gracefully, waits for its termination. 
+func (ps *State) Stop() error { + if ps.Cmd == nil { + return nil + } + if done, err := ps.Exited(); done { + if err != nil { + logrus.Warnf("process %s exited with error: %v", path.Base(ps.Path), err) + } + return nil + } + if err := ps.Cmd.Process.Signal(syscall.SIGTERM); err != nil { + return fmt.Errorf("unable to signal for process %s to stop: %w", ps.Path, err) + } + + timedOut := time.After(ps.StopTimeout) + select { + case <-ps.waitDone: + break + case <-timedOut: + if err := ps.Cmd.Process.Signal(syscall.SIGKILL); err != nil { + return fmt.Errorf("unable to signal for process %s to stop: %w", ps.Path, err) + } + return fmt.Errorf("timeout waiting for process %s to stop, sent SIGKILL", path.Base(ps.Path)) + } + ps.ready = false + return nil +} diff --git a/pkg/clusterapi/localcontrolplane.go b/pkg/clusterapi/localcontrolplane.go new file mode 100644 index 0000000000..0a5cac35ac --- /dev/null +++ b/pkg/clusterapi/localcontrolplane.go @@ -0,0 +1,145 @@ +package clusterapi + +import ( + "context" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/sirupsen/logrus" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/klog/v2" + capav1beta1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta1" + capav1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2" + capzv1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1" + clusterv1alpha3 "sigs.k8s.io/cluster-api/api/v1alpha3" //nolint:staticcheck + clusterv1alpha4 "sigs.k8s.io/cluster-api/api/v1alpha4" //nolint:staticcheck + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/openshift/installer/cmd/openshift-install/command" +) + +var ( + binDir string +) + +func init() { + utilruntime.Must(clusterv1alpha3.AddToScheme(scheme.Scheme)) + utilruntime.Must(clusterv1alpha4.AddToScheme(scheme.Scheme)) + utilruntime.Must(clusterv1.AddToScheme(scheme.Scheme)) + utilruntime.Must(capav1beta1.AddToScheme(scheme.Scheme)) + utilruntime.Must(capav1.AddToScheme(scheme.Scheme)) + utilruntime.Must(capzv1.AddToScheme(scheme.Scheme)) +} + +// localControlPlane creates a local capi control plane +// to use as a management cluster. +type localControlPlane struct { + Env *envtest.Environment + Client client.Client + Cfg *rest.Config + BinDir string + KubeconfigPath string +} + +// Run launches the local control plane. +func (c *localControlPlane) Run(ctx context.Context) error { + // Create a temporary directory to unpack the cluster-api binaries. 
+ c.BinDir = filepath.Join(command.RootOpts.Dir, "bin", "cluster-api") + if err := UnpackClusterAPIBinary(c.BinDir); err != nil { + return err + } + if err := UnpackEnvtestBinaries(c.BinDir); err != nil { + return err + } + + log.SetLogger(klog.NewKlogr()) + logrus.Info("Started local control plane with envtest") + c.Env = &envtest.Environment{ + Scheme: scheme.Scheme, + AttachControlPlaneOutput: false, + BinaryAssetsDirectory: c.BinDir, + ControlPlaneStartTimeout: 10 * time.Second, + ControlPlaneStopTimeout: 10 * time.Second, + } + var err error + c.Cfg, err = c.Env.Start() + if err != nil { + return err + } + + kc := fromEnvTestConfig(c.Cfg) + { + dir := filepath.Join(command.RootOpts.Dir, "auth") + kf, err := os.Create(filepath.Join(dir, "envtest.kubeconfig")) + if err != nil { + return err + } + if _, err := kf.Write(kc); err != nil { + return err + } + if err := kf.Close(); err != nil { + return err + } + c.KubeconfigPath, err = filepath.Abs(kf.Name()) + if err != nil { + return err + } + } + + // Create a new client to interact with the cluster. + cl, err := client.New(c.Cfg, client.Options{ + Scheme: c.Env.Scheme, + }) + if err != nil { + return err + } + c.Client = cl + + logrus.Infof("Stored kubeconfig for envtest in: %v", c.KubeconfigPath) + return nil +} + +func (c *localControlPlane) Stop() error { + return c.Env.Stop() +} + +// fromEnvTestConfig returns a new Kubeconfig in byte form when running in envtest. +func fromEnvTestConfig(cfg *rest.Config) []byte { + clusterName := "envtest" + contextName := fmt.Sprintf("%s@%s", cfg.Username, clusterName) + c := api.Config{ + Clusters: map[string]*api.Cluster{ + clusterName: { + Server: cfg.Host, + CertificateAuthorityData: cfg.CAData, + }, + }, + Contexts: map[string]*api.Context{ + contextName: { + Cluster: clusterName, + AuthInfo: cfg.Username, + }, + }, + AuthInfos: map[string]*api.AuthInfo{ + cfg.Username: { + ClientKeyData: cfg.KeyData, + ClientCertificateData: cfg.CertData, + }, + }, + CurrentContext: contextName, + } + data, err := clientcmd.Write(c) + if err != nil { + logrus.Fatalf("failed to write kubeconfig: %v", err) + } + return data +} diff --git a/pkg/clusterapi/mirror/README b/pkg/clusterapi/mirror/README new file mode 100644 index 0000000000..453641ff26 --- /dev/null +++ b/pkg/clusterapi/mirror/README @@ -0,0 +1,4 @@ +Mirror of the cluster-api binaries to embed in the installer. +The mirror is populated as part of the build process so that only the binaries for the target architecture are embedded. + +(note that this file is needed to appease govet so that the directory is not empty) diff --git a/pkg/clusterapi/providers.go b/pkg/clusterapi/providers.go new file mode 100644 index 0000000000..58838b7ce9 --- /dev/null +++ b/pkg/clusterapi/providers.go @@ -0,0 +1,162 @@ +package clusterapi + +import ( + "archive/zip" + "bytes" + "embed" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/util/sets" +) + +const ( + zipFile = "cluster-api.zip" +) + +var ( + // ClusterAPI is the core provider for cluster-api. + ClusterAPI = Provider{ + Name: "cluster-api", + Sources: sets.New("cluster-api"), + } + + // EnvTest is the provider for the local control plane. + EnvTest = Provider{ + Name: "envtest", + Sources: sets.New("kube-apiserver", "etcd"), + } + + // AWS is the provider for creating resources in AWS. + AWS = infrastructureProvider("aws") + // Azure is the provider for creating resources in Azure. 
+ Azure = infrastructureProvider("azure") + // AzureASO is a companion component to Azure that is used to create resources declaratively. + AzureASO = infrastructureProvider("azureaso") + // IBMCloud is the provider for creating resources in IBM Cloud and powervs. + IBMCloud = infrastructureProvider("ibmcloud") + // GCP is the provider for creating resources in GCP. + GCP = infrastructureProvider("gcp") + // Nutanix is the provider for creating resources in Nutanix. + Nutanix = infrastructureProvider("nutanix") + // VSphere is the provider for creating resources in vSphere. + VSphere = infrastructureProvider("vsphere") +) + +// Provider is a Cluster API provider. +type Provider struct { + // Name of the provider. + Name string + // Sources of the provider. + Sources sets.Set[string] +} + +// infrastructureProvider configures a infrastructureProvider built locally. +func infrastructureProvider(name string) Provider { + return Provider{ + Name: name, + Sources: sets.New( + fmt.Sprintf("cluster-api-provider-%s", name), + ), + } +} + +// Mirror is the embedded data for the providers. +// +//go:embed mirror/* +var Mirror embed.FS + +// Extract extracts the provider from the embedded data into the specified directory. +func (p Provider) Extract(dir string) error { + f, err := Mirror.Open(filepath.Join("mirror", zipFile)) + if err != nil { + return errors.Wrap(err, "failed to open cluster api zip from mirror") + } + defer f.Close() + stat, err := f.Stat() + if err != nil { + return errors.Wrap(err, "failed to stat cluster api zip") + } + seek, ok := f.(io.ReaderAt) + if !ok { + // If the file does not support ReaderAt interface (`. +// +// While the manifests can be optional, we expect them to be in the manifests directory and named `-infrastructure-components.yaml`. +func (c *System) getInfrastructureController(provider *Provider, args []string, env map[string]string) *controller { + manifests := []string{} + defaultManifestPath := filepath.Join(c.componentDir, fmt.Sprintf("/%s-infrastructure-components.yaml", provider.Name)) + if _, err := os.Stat(defaultManifestPath); err == nil { + manifests = append(manifests, defaultManifestPath) + } + return &controller{ + Provider: provider, + Name: fmt.Sprintf("%s infrastructure provider", provider.Name), + Path: fmt.Sprintf("%s/cluster-api-provider-%s", binDir, provider.Name), + Components: manifests, + Args: args, + Env: env, + } +} + +// controller encapsulates the state of a controller, its process state, and its configuration. +type controller struct { + Provider *Provider + state *process.State + + Name string + Dir string + Path string + Components []string + Args []string + Env map[string]string +} + +// runController configures the controller, and waits for it to be ready. +func (c *System) runController(ctx context.Context, ct *controller) error { + // If the provider is not empty, we extract it to the binaries directory. + if ct.Provider != nil { + if err := ct.Provider.Extract(binDir); err != nil { + logrus.Fatal(err) + } + } + + // Create the WebhookInstallOptions from envtest, and pass the manifests we've been given as input. + // Once built, we install them in the local control plane using the rest.Config available. + // Envtest takes care of a few things needed to run webhooks locally: + // - Creates a self-signed certificate for the webhook server. + // - Tries to allocate a host:port for the webhook server to listen on. + // - Modifies the webhook manifests to point to the local webhook server through a URL and a CABundle. 
+ wh := envtest.WebhookInstallOptions{ + Paths: ct.Components, + IgnoreSchemeConvertible: true, + } + if err := wh.Install(c.lcp.Cfg); err != nil { + return fmt.Errorf("failed to prepare controller %q webhook options: %w", ct.Name, err) + } + + // Most providers allocate a host:port configuration for the health check, + // which responds to a simple http request on /healthz and /readyz. + // When an argument is configured to use the suggestHealthHostPort function, + // we record the value, so we can pass it to + var healthCheckHostPort string + + // Build the arguments, using go templating to render the values. + { + funcs := template.FuncMap{ + "suggestHealthHostPort": func() (string, error) { + healthPort, healthHost, err := addr.Suggest("") + if err != nil { + return "", fmt.Errorf("unable to grab random port: %w", err) + } + healthCheckHostPort = fmt.Sprintf("%s:%d", healthHost, healthPort) + return healthCheckHostPort, nil + }, + } + + templateData := map[string]string{ + "WebhookPort": fmt.Sprintf("%d", wh.LocalServingPort), + "WebhookCertDir": wh.LocalServingCertDir, + } + + args := make([]string, 0, len(ct.Args)) + for _, arg := range ct.Args { + final := new(bytes.Buffer) + tmpl := template.Must(template.New("arg").Funcs(funcs).Parse(arg)) + if err := tmpl.Execute(final, templateData); err != nil { + return fmt.Errorf("failed to render controller %q arg %q: %w", ct.Name, arg, err) + } + args = append(args, strings.TrimSpace(final.String())) + } + ct.Args = args + } + + // Build the environment variables. + env := []string{} + { + if ct.Env == nil { + ct.Env = map[string]string{} + } + // Override KUBECONFIG to point to the local control plane. + ct.Env["KUBECONFIG"] = c.lcp.KubeconfigPath + for key, value := range ct.Env { + env = append(env, fmt.Sprintf("%s=%s", key, value)) + } + } + + // Install the manifests for the controller, if any. + if len(ct.Components) > 0 { + opts := envtest.CRDInstallOptions{ + Scheme: c.lcp.Env.Scheme, + Paths: ct.Components, + WebhookOptions: wh, + } + if _, err := envtest.InstallCRDs(c.lcp.Cfg, opts); err != nil { + return fmt.Errorf("failed to install controller %q manifests in local control plane: %w", ct.Name, err) + } + } + + // Create the process state. + pr := &process.State{ + Path: ct.Path, + Args: ct.Args, + Dir: ct.Dir, + Env: env, + StartTimeout: 60 * time.Second, + StopTimeout: 10 * time.Second, + } + + // If the controller has a health check, we configure it, and wait for it to be ready. + if healthCheckHostPort != "" { + pr.HealthCheck = &process.HealthCheck{ + URL: url.URL{ + Scheme: "http", + Host: healthCheckHostPort, + Path: "/healthz", + }, + } + } + + // Initialize the process state. + if err := pr.Init(ct.Name); err != nil { + return fmt.Errorf("failed to initialize process state for controller %q: %w", ct.Name, err) + } + + // Run the controller and store its state. 
+ logrus.Infof("Running process: %s with args %v and env %v", ct.Name, ct.Args, env) + if err := pr.Start(ctx, os.Stdout, os.Stderr); err != nil { + return fmt.Errorf("failed to start controller %q: %w", ct.Name, err) + } + ct.state = pr + return nil +} diff --git a/pkg/metrics/builder/builder_test.go b/pkg/metrics/builder/builder_test.go index 858fdf4e29..2ea68db589 100644 --- a/pkg/metrics/builder/builder_test.go +++ b/pkg/metrics/builder/builder_test.go @@ -18,7 +18,7 @@ func getDescStruct(opts MetricOpts, labelKeyValues map[string]string) string { } sort.Strings(lpStrings) return fmt.Sprintf("Desc{fqName: %q, help: %q, constLabels: {%s}, variableLabels: %v}", - opts.Name, opts.Desc, strings.Join(lpStrings, ","), []string{}) + opts.Name, opts.Desc, strings.Join(lpStrings, ","), "{}") } func getCollectorDescription(collector prometheus.Collector) string {
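The `addr` package introduced above reserves ports by acquiring lock files under the user cache directory, so that controller processes started in quick succession do not pick the same port. A small usage sketch, assuming only the import path from this diff; the printed flag wiring is illustrative:

```go
package main

import (
	"fmt"
	"log"

	"github.com/openshift/installer/pkg/clusterapi/internal/process/addr"
)

func main() {
	// Ask for a free port on localhost. The reservation is recorded as a lock
	// file under the user cache dir so concurrent callers don't pick the same port.
	port, host, err := addr.Suggest("")
	if err != nil {
		log.Fatalf("unable to grab a free port: %v", err)
	}

	// The host:port pair can then be handed to a controller flag and later
	// polled on /healthz by the process health check.
	fmt.Printf("health endpoint: %s:%d\n", host, port)
}
```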
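The `process.State` type added in `pkg/clusterapi/internal/process` is the generic runner used for every controller binary: `Init` fills in defaults, `Start` launches the process and polls the optional health check until it returns 200 OK or the start timeout expires, and `Stop` sends SIGTERM, escalating to SIGKILL on timeout. A hedged sketch of that lifecycle, with an illustrative binary path and health address:

```go
package main

import (
	"context"
	"log"
	"net/url"
	"os"
	"time"

	"github.com/openshift/installer/pkg/clusterapi/internal/process"
)

func main() {
	// Describe the binary to run; the path and health address are illustrative.
	st := &process.State{
		Path: "bin/cluster-api/cluster-api",
		HealthCheck: &process.HealthCheck{
			// Start polls this URL until it returns 200 OK or StartTimeout expires.
			URL: url.URL{Scheme: "http", Host: "127.0.0.1:9440", Path: "/healthz"},
		},
		StartTimeout: 60 * time.Second,
		StopTimeout:  10 * time.Second,
	}
	if err := st.Init("cluster-api"); err != nil {
		log.Fatal(err)
	}
	if err := st.Start(context.Background(), os.Stdout, os.Stderr); err != nil {
		log.Fatal(err)
	}
	// ...use the controller...
	// Stop signals SIGTERM and falls back to SIGKILL if the process does not exit in time.
	if err := st.Stop(); err != nil {
		log.Fatal(err)
	}
}
```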
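Controller arguments in `runController` are rendered with Go's `text/template`: `{{.WebhookPort}}` and `{{.WebhookCertDir}}` come from the envtest webhook install options, and `{{suggestHealthHostPort}}` allocates and records a free health endpoint. The sketch below reproduces that rendering in isolation; the flag names and values are assumptions for illustration, only the template hooks come from the code above:

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"strings"
	"text/template"
)

func main() {
	// Hypothetical controller arguments using the template hooks understood by runController.
	args := []string{
		"--webhook-port={{.WebhookPort}}",
		"--webhook-cert-dir={{.WebhookCertDir}}",
		"--health-addr={{suggestHealthHostPort}}",
	}

	funcs := template.FuncMap{
		// In the installer this calls addr.Suggest and records the host:port for
		// the process health check; here it returns a fixed value.
		"suggestHealthHostPort": func() (string, error) { return "127.0.0.1:9440", nil },
	}
	data := map[string]string{
		"WebhookPort":    "9443",
		"WebhookCertDir": "/tmp/webhook-certs",
	}

	rendered := make([]string, 0, len(args))
	for _, arg := range args {
		out := new(bytes.Buffer)
		tmpl := template.Must(template.New("arg").Funcs(funcs).Parse(arg))
		if err := tmpl.Execute(out, data); err != nil {
			log.Fatal(err)
		}
		rendered = append(rendered, strings.TrimSpace(out.String()))
	}
	fmt.Println(rendered)
}
```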