mirror of
https://github.com/openshift/image-registry.git
synced 2026-02-05 09:45:55 +01:00
Add imagestream.ImageStream interface
This commit is contained in:
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
|
||||
"github.com/openshift/image-registry/pkg/dockerregistry/server/client"
|
||||
registrymanifest "github.com/openshift/image-registry/pkg/dockerregistry/server/manifest"
|
||||
"github.com/openshift/image-registry/pkg/imagestream"
|
||||
imageapi "github.com/openshift/image-registry/pkg/origin-common/image/apis/image"
|
||||
quotautil "github.com/openshift/image-registry/pkg/origin-common/quota/util"
|
||||
util "github.com/openshift/image-registry/pkg/origin-common/util"
|
||||
@@ -74,10 +75,10 @@ func (is *imageStream) createImageStream(ctx context.Context) (*imageapiv1.Image
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
// getImage retrieves the Image with digest `dgst`. No authorization check is done.
|
||||
func (is *imageStream) getImage(ctx context.Context, dgst digest.Digest) (*imageapiv1.Image, error) {
|
||||
// GetImage retrieves the Image with digest `dgst`. No authorization check is done.
|
||||
func (is *imageStream) GetImage(ctx context.Context, dgst digest.Digest) (*imageapiv1.Image, error) {
|
||||
if image, exists := is.cachedImages[dgst]; exists {
|
||||
context.GetLogger(ctx).Infof("(*imageStream).getImage: returning cached copy of %s", image.Name)
|
||||
context.GetLogger(ctx).Infof("(*imageStream).GetImage: returning cached copy of %s", image.Name)
|
||||
return image, nil
|
||||
}
|
||||
|
||||
@@ -87,7 +88,7 @@ func (is *imageStream) getImage(ctx context.Context, dgst digest.Digest) (*image
|
||||
return nil, wrapKStatusErrorOnGetImage(is.name, dgst, err)
|
||||
}
|
||||
|
||||
context.GetLogger(ctx).Infof("(*imageStream).getImage: got image %s", image.Name)
|
||||
context.GetLogger(ctx).Infof("(*imageStream).GetImage: got image %s", image.Name)
|
||||
if err := util.ImageWithMetadata(image); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -95,7 +96,7 @@ func (is *imageStream) getImage(ctx context.Context, dgst digest.Digest) (*image
|
||||
return image, nil
|
||||
}
|
||||
|
||||
// getStoredImageOfImageStream retrieves the Image with digest `dgst` and
|
||||
// GetStoredImageOfImageStream retrieves the Image with digest `dgst` and
|
||||
// ensures that the image belongs to the image stream `is`. It uses two
|
||||
// queries to master API:
|
||||
//
|
||||
@@ -105,8 +106,8 @@ func (is *imageStream) getImage(ctx context.Context, dgst digest.Digest) (*image
|
||||
// This allows us to cache the image stream for later use.
|
||||
//
|
||||
// If you need the image object to be modified according to image stream tag,
|
||||
// please use getImageOfImageStream.
|
||||
func (is *imageStream) getStoredImageOfImageStream(ctx context.Context, dgst digest.Digest) (*imageapiv1.Image, *imageapiv1.TagEvent, *imageapiv1.ImageStream, error) {
|
||||
// please use GetImageOfImageStream.
|
||||
func (is *imageStream) GetStoredImageOfImageStream(ctx context.Context, dgst digest.Digest) (*imageapiv1.Image, *imageapiv1.TagEvent, *imageapiv1.ImageStream, error) {
|
||||
stream, err := is.imageStreamGetter.get()
|
||||
if err != nil {
|
||||
context.GetLogger(ctx).Errorf("failed to get ImageStream: %v", err)
|
||||
@@ -119,7 +120,7 @@ func (is *imageStream) getStoredImageOfImageStream(ctx context.Context, dgst dig
|
||||
return nil, nil, nil, wrapKStatusErrorOnGetImage(is.name, dgst, err)
|
||||
}
|
||||
|
||||
image, err := is.getImage(ctx, dgst)
|
||||
image, err := is.GetImage(ctx, dgst)
|
||||
if err != nil {
|
||||
return nil, nil, nil, wrapKStatusErrorOnGetImage(is.name, dgst, err)
|
||||
}
|
||||
@@ -127,7 +128,7 @@ func (is *imageStream) getStoredImageOfImageStream(ctx context.Context, dgst dig
|
||||
return image, tagEvent, stream, nil
|
||||
}
|
||||
|
||||
// getImageOfImageStream retrieves the Image with digest `dgst` for the image
|
||||
// GetImageOfImageStream retrieves the Image with digest `dgst` for the image
|
||||
// stream. The image's field DockerImageReference is modified on the fly to
|
||||
// pretend that we've got the image from the source from which the image was
|
||||
// tagged to match tag's DockerImageReference.
|
||||
@@ -135,8 +136,8 @@ func (is *imageStream) getStoredImageOfImageStream(ctx context.Context, dgst dig
|
||||
// NOTE: due to on the fly modification, the returned image object should
|
||||
// not be sent to the master API. If you need unmodified version of the
|
||||
// image object, please use getStoredImageOfImageStream.
|
||||
func (is *imageStream) getImageOfImageStream(ctx context.Context, dgst digest.Digest) (*imageapiv1.Image, *imageapiv1.ImageStream, error) {
|
||||
image, tagEvent, stream, err := is.getStoredImageOfImageStream(ctx, dgst)
|
||||
func (is *imageStream) GetImageOfImageStream(ctx context.Context, dgst digest.Digest) (*imageapiv1.Image, *imageapiv1.ImageStream, error) {
|
||||
image, tagEvent, stream, err := is.GetStoredImageOfImageStream(ctx, dgst)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@@ -146,13 +147,13 @@ func (is *imageStream) getImageOfImageStream(ctx context.Context, dgst digest.Di
|
||||
return image, stream, nil
|
||||
}
|
||||
|
||||
// updateImage modifies the Image.
|
||||
func (is *imageStream) updateImage(image *imageapiv1.Image) (*imageapiv1.Image, error) {
|
||||
// UpdateImage modifies the Image.
|
||||
func (is *imageStream) UpdateImage(image *imageapiv1.Image) (*imageapiv1.Image, error) {
|
||||
return is.registryOSClient.Images().Update(image)
|
||||
}
|
||||
|
||||
// rememberLayersOfImage caches the layer digests of given image
|
||||
func (is *imageStream) rememberLayersOfImage(ctx context.Context, image *imageapiv1.Image, cacheName string) {
|
||||
// RememberLayersOfImage caches the layer digests of given image
|
||||
func (is *imageStream) RememberLayersOfImage(ctx context.Context, image *imageapiv1.Image, cacheName string) {
|
||||
if len(image.DockerImageLayers) > 0 {
|
||||
for _, layer := range image.DockerImageLayers {
|
||||
_ = is.cache.AddDigest(digest.Digest(layer.Name), cacheName)
|
||||
@@ -179,7 +180,7 @@ func (is *imageStream) rememberLayersOfImage(ctx context.Context, image *imageap
|
||||
}
|
||||
}
|
||||
|
||||
func (is *imageStream) getSecrets() ([]corev1.Secret, error) {
|
||||
func (is *imageStream) GetSecrets() ([]corev1.Secret, error) {
|
||||
secrets, err := is.registryOSClient.ImageStreamSecrets(is.namespace).Secrets(is.name, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting secrets for repository %s: %v", is.Reference(), err)
|
||||
@@ -187,9 +188,9 @@ func (is *imageStream) getSecrets() ([]corev1.Secret, error) {
|
||||
return secrets.Items, nil
|
||||
}
|
||||
|
||||
// tagIsInsecure returns true if the given image stream or its tag allow for
|
||||
// TagIsInsecure returns true if the given image stream or its tag allow for
|
||||
// insecure transport.
|
||||
func (is *imageStream) tagIsInsecure(tag string, dgst digest.Digest) (bool, error) {
|
||||
func (is *imageStream) TagIsInsecure(tag string, dgst digest.Digest) (bool, error) {
|
||||
stream, err := is.imageStreamGetter.get()
|
||||
if err != nil {
|
||||
return false, err
|
||||
@@ -239,7 +240,7 @@ func (is *imageStream) localRegistry() (string, error) {
|
||||
return local.Registry, nil
|
||||
}
|
||||
|
||||
func (is *imageStream) IdentifyCandidateRepositories(primary bool) ([]string, map[string]ImagePullthroughSpec, error) {
|
||||
func (is *imageStream) IdentifyCandidateRepositories(primary bool) ([]string, map[string]imagestream.ImagePullthroughSpec, error) {
|
||||
stream, err := is.imageStreamGetter.get()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@@ -324,7 +325,7 @@ func (is *imageStream) Untag(ctx context.Context, tag string, pullthroughEnabled
|
||||
return err
|
||||
}
|
||||
|
||||
image, err := is.getImage(ctx, dgst)
|
||||
image, err := is.GetImage(ctx, dgst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -81,7 +81,7 @@ func (is *imageStream) HasBlob(ctx context.Context, dgst digest.Digest, requireM
|
||||
processedImages[tagEvent.Image] = struct{}{}
|
||||
|
||||
context.GetLogger(ctx).Debugf("getting image %s", tagEvent.Image)
|
||||
image, err := is.getImage(ctx, digest.Digest(tagEvent.Image))
|
||||
image, err := is.GetImage(ctx, digest.Digest(tagEvent.Image))
|
||||
if err != nil {
|
||||
if kerrors.IsNotFound(err) {
|
||||
context.GetLogger(ctx).Debugf("image %q not found", tagEvent.Image)
|
||||
@@ -102,7 +102,7 @@ func (is *imageStream) HasBlob(ctx context.Context, dgst digest.Digest, requireM
|
||||
tagName := event2Name[tagEvent]
|
||||
context.GetLogger(ctx).Debugf("blob found under istag %s:%s in image %s", is.Reference(), tagName, tagEvent.Image)
|
||||
// remember all the layers of matching image
|
||||
is.rememberLayersOfImage(ctx, image, repoCacheName)
|
||||
is.RememberLayersOfImage(ctx, image, repoCacheName)
|
||||
return logFound(true)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
imageapiv1 "github.com/openshift/api/image/v1"
|
||||
|
||||
registrymanifest "github.com/openshift/image-registry/pkg/dockerregistry/server/manifest"
|
||||
"github.com/openshift/image-registry/pkg/imagestream"
|
||||
imageapi "github.com/openshift/image-registry/pkg/origin-common/image/apis/image"
|
||||
)
|
||||
|
||||
@@ -39,7 +40,7 @@ type manifestService struct {
|
||||
blobStore distribution.BlobStore
|
||||
|
||||
serverAddr string
|
||||
imageStream *imageStream
|
||||
imageStream imagestream.ImageStream
|
||||
|
||||
// acceptSchema2 allows to refuse the manifest schema version 2
|
||||
acceptSchema2 bool
|
||||
@@ -49,7 +50,7 @@ type manifestService struct {
|
||||
func (m *manifestService) Exists(ctx context.Context, dgst digest.Digest) (bool, error) {
|
||||
context.GetLogger(ctx).Debugf("(*manifestService).Exists")
|
||||
|
||||
image, _, err := m.imageStream.getImageOfImageStream(ctx, dgst)
|
||||
image, _, err := m.imageStream.GetImageOfImageStream(ctx, dgst)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@@ -60,7 +61,7 @@ func (m *manifestService) Exists(ctx context.Context, dgst digest.Digest) (bool,
|
||||
func (m *manifestService) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
|
||||
context.GetLogger(ctx).Debugf("(*manifestService).Get")
|
||||
|
||||
image, _, _, err := m.imageStream.getStoredImageOfImageStream(ctx, dgst)
|
||||
image, _, _, err := m.imageStream.GetStoredImageOfImageStream(ctx, dgst)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -76,7 +77,7 @@ func (m *manifestService) Get(ctx context.Context, dgst digest.Digest, options .
|
||||
|
||||
manifest, err := m.manifests.Get(ctx, dgst, options...)
|
||||
if err == nil {
|
||||
m.imageStream.rememberLayersOfImage(ctx, image, ref)
|
||||
m.imageStream.RememberLayersOfImage(ctx, image, ref)
|
||||
m.migrateManifest(ctx, image, dgst, manifest, true)
|
||||
return manifest, nil
|
||||
} else if _, ok := err.(distribution.ErrManifestUnknownRevision); !ok {
|
||||
@@ -86,7 +87,7 @@ func (m *manifestService) Get(ctx context.Context, dgst digest.Digest, options .
|
||||
|
||||
manifest, err = registrymanifest.NewFromImage(image)
|
||||
if err == nil {
|
||||
m.imageStream.rememberLayersOfImage(ctx, image, ref)
|
||||
m.imageStream.RememberLayersOfImage(ctx, image, ref)
|
||||
m.migrateManifest(ctx, image, dgst, manifest, false)
|
||||
return manifest, nil
|
||||
} else {
|
||||
@@ -228,7 +229,7 @@ func (m *manifestService) storeManifestLocally(ctx context.Context, image *image
|
||||
}
|
||||
image.Annotations[imageapi.ImageManifestBlobStoredAnnotation] = "true"
|
||||
|
||||
if _, err := m.imageStream.updateImage(image); err != nil {
|
||||
if _, err := m.imageStream.UpdateImage(image); err != nil {
|
||||
context.GetLogger(ctx).Errorf("error updating Image: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,24 +6,19 @@ import (
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// projectObjectListStore represents a cache of objects indexed by a project name.
|
||||
// Used to store a list of items per namespace.
|
||||
type projectObjectListStore interface {
|
||||
add(namespace string, obj runtime.Object) error
|
||||
get(namespace string) (obj runtime.Object, exists bool, err error)
|
||||
}
|
||||
"github.com/openshift/image-registry/pkg/imagestream"
|
||||
)
|
||||
|
||||
// projectObjectListCache implements projectObjectListStore.
|
||||
type projectObjectListCache struct {
|
||||
store cache.Store
|
||||
}
|
||||
|
||||
var _ projectObjectListStore = &projectObjectListCache{}
|
||||
var _ imagestream.ProjectObjectListStore = &projectObjectListCache{}
|
||||
|
||||
// newProjectObjectListCache creates a cache to hold object list objects that will expire with the given ttl.
|
||||
func newProjectObjectListCache(ttl time.Duration) projectObjectListStore {
|
||||
func newProjectObjectListCache(ttl time.Duration) imagestream.ProjectObjectListStore {
|
||||
return &projectObjectListCache{
|
||||
store: cache.NewTTLStore(metaProjectObjectListKeyFunc, ttl),
|
||||
}
|
||||
@@ -31,7 +26,7 @@ func newProjectObjectListCache(ttl time.Duration) projectObjectListStore {
|
||||
|
||||
// add stores given list object under the given namespace. Any prior object under this
|
||||
// key will be replaced.
|
||||
func (c *projectObjectListCache) add(namespace string, obj runtime.Object) error {
|
||||
func (c *projectObjectListCache) Add(namespace string, obj runtime.Object) error {
|
||||
if namespace == "" {
|
||||
return fmt.Errorf("namespace cannot be empty")
|
||||
}
|
||||
@@ -43,7 +38,7 @@ func (c *projectObjectListCache) add(namespace string, obj runtime.Object) error
|
||||
}
|
||||
|
||||
// get retrieves a cached list object if present and not expired.
|
||||
func (c *projectObjectListCache) get(namespace string) (runtime.Object, bool, error) {
|
||||
func (c *projectObjectListCache) Get(namespace string) (runtime.Object, bool, error) {
|
||||
entry, exists, err := c.store.GetByKey(namespace)
|
||||
if err != nil {
|
||||
return nil, exists, err
|
||||
|
||||
@@ -141,7 +141,7 @@ func TestPullthroughServeBlob(t *testing.T) {
|
||||
|
||||
remoteBlobGetter := NewBlobGetterService(
|
||||
imageStream,
|
||||
imageStream.getSecrets,
|
||||
imageStream.GetSecrets,
|
||||
imageStream.cache)
|
||||
|
||||
ptbs := &pullthroughBlobStore{
|
||||
@@ -562,7 +562,7 @@ func TestPullthroughServeBlobInsecure(t *testing.T) {
|
||||
|
||||
remoteBlobGetter := NewBlobGetterService(
|
||||
imageStream,
|
||||
imageStream.getSecrets,
|
||||
imageStream.GetSecrets,
|
||||
imageStream.cache)
|
||||
|
||||
ptbs := &pullthroughBlobStore{
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"github.com/docker/distribution/context"
|
||||
"github.com/docker/distribution/digest"
|
||||
|
||||
"github.com/openshift/image-registry/pkg/imagestream"
|
||||
imageapi "github.com/openshift/image-registry/pkg/origin-common/image/apis/image"
|
||||
)
|
||||
|
||||
@@ -15,7 +16,7 @@ import (
|
||||
type pullthroughManifestService struct {
|
||||
distribution.ManifestService
|
||||
localManifestService distribution.ManifestService
|
||||
imageStream *imageStream
|
||||
imageStream imagestream.ImageStream
|
||||
mirror bool
|
||||
}
|
||||
|
||||
@@ -38,7 +39,7 @@ func (m *pullthroughManifestService) Get(ctx context.Context, dgst digest.Digest
|
||||
|
||||
func (m *pullthroughManifestService) remoteGet(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
|
||||
context.GetLogger(ctx).Debugf("(*pullthroughManifestService).remoteGet: starting with dgst=%s", dgst.String())
|
||||
image, _, err := m.imageStream.getImageOfImageStream(ctx, dgst)
|
||||
image, _, err := m.imageStream.GetImageOfImageStream(ctx, dgst)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -70,7 +71,7 @@ func (m *pullthroughManifestService) remoteGet(ctx context.Context, dgst digest.
|
||||
context.GetLogger(ctx).Errorf("failed to mirror manifest %s: %v", ref.Exact(), putErr)
|
||||
}
|
||||
}
|
||||
m.imageStream.rememberLayersOfImage(ctx, image, ref.Exact())
|
||||
m.imageStream.RememberLayersOfImage(ctx, image, ref.Exact())
|
||||
case distribution.ErrManifestUnknownRevision:
|
||||
break
|
||||
default:
|
||||
@@ -81,7 +82,7 @@ func (m *pullthroughManifestService) remoteGet(ctx context.Context, dgst digest.
|
||||
}
|
||||
|
||||
func (m *pullthroughManifestService) getRemoteRepositoryClient(ctx context.Context, ref *imageapi.DockerImageReference, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Repository, error) {
|
||||
retriever := getImportContext(ctx, m.imageStream.getSecrets)
|
||||
retriever := getImportContext(ctx, m.imageStream.GetSecrets)
|
||||
|
||||
// determine, whether to fall-back to insecure transport based on a specification of image's tag
|
||||
// if the client pulls by tag, use that
|
||||
@@ -93,7 +94,7 @@ func (m *pullthroughManifestService) getRemoteRepositoryClient(ctx context.Conte
|
||||
}
|
||||
}
|
||||
|
||||
insecure, err := m.imageStream.tagIsInsecure(tag, dgst)
|
||||
insecure, err := m.imageStream.TagIsInsecure(tag, dgst)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"github.com/openshift/image-registry/pkg/dockerregistry/server/configuration"
|
||||
"github.com/openshift/image-registry/pkg/imagestream"
|
||||
imageapi "github.com/openshift/image-registry/pkg/origin-common/image/apis/image"
|
||||
)
|
||||
|
||||
@@ -55,7 +56,7 @@ type quotaEnforcingConfig struct {
|
||||
// if set, enables quota enforcement
|
||||
enforcementEnabled bool
|
||||
// if set, enables caching of quota objects per project
|
||||
limitRanges projectObjectListStore
|
||||
limitRanges imagestream.ProjectObjectListStore
|
||||
}
|
||||
|
||||
// quotaRestrictedBlobStore wraps upstream blob store with a guard preventing big layers exceeding image quotas
|
||||
@@ -120,10 +121,10 @@ func (bw *quotaRestrictedBlobWriter) Commit(ctx context.Context, provisional dis
|
||||
return bw.BlobWriter.Commit(ctx, provisional)
|
||||
}
|
||||
|
||||
// getLimitRangeList returns list of limit ranges for repo.
|
||||
func (is *imageStream) getLimitRangeList(ctx context.Context, cache projectObjectListStore) (*corev1.LimitRangeList, error) {
|
||||
// GetLimitRangeList returns list of limit ranges for repo.
|
||||
func (is *imageStream) GetLimitRangeList(ctx context.Context, cache imagestream.ProjectObjectListStore) (*corev1.LimitRangeList, error) {
|
||||
if cache != nil {
|
||||
obj, exists, _ := cache.get(is.namespace)
|
||||
obj, exists, _ := cache.Get(is.namespace)
|
||||
if exists {
|
||||
return obj.(*corev1.LimitRangeList), nil
|
||||
}
|
||||
@@ -138,7 +139,7 @@ func (is *imageStream) getLimitRangeList(ctx context.Context, cache projectObjec
|
||||
}
|
||||
|
||||
if cache != nil {
|
||||
err = cache.add(is.namespace, lrs)
|
||||
err = cache.Add(is.namespace, lrs)
|
||||
if err != nil {
|
||||
context.GetLogger(ctx).Errorf("failed to cache limit range list: %v", err)
|
||||
}
|
||||
@@ -154,7 +155,7 @@ func admitBlobWrite(ctx context.Context, repo *repository, size int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
lrs, err := repo.imageStream.getLimitRangeList(ctx, repo.app.quotaEnforcing.limitRanges)
|
||||
lrs, err := repo.imageStream.GetLimitRangeList(ctx, repo.app.quotaEnforcing.limitRanges)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
imageapiv1 "github.com/openshift/api/image/v1"
|
||||
|
||||
"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
|
||||
"github.com/openshift/image-registry/pkg/imagestream"
|
||||
imageapi "github.com/openshift/image-registry/pkg/origin-common/image/apis/image"
|
||||
"github.com/openshift/image-registry/pkg/origin-common/image/registryclient"
|
||||
)
|
||||
@@ -57,7 +58,7 @@ func (c *digestBlobStoreCache) Put(dgst digest.Digest, bs distribution.BlobStore
|
||||
// remoteBlobGetterService implements BlobGetterService and allows to serve blobs from remote
|
||||
// repositories.
|
||||
type remoteBlobGetterService struct {
|
||||
imageStream *imageStream
|
||||
imageStream imagestream.ImageStream
|
||||
getSecrets secretsGetter
|
||||
cache cache.RepositoryDigest
|
||||
digestToStore *digestBlobStoreCache
|
||||
@@ -68,7 +69,7 @@ var _ BlobGetterService = &remoteBlobGetterService{}
|
||||
// NewBlobGetterService returns a getter for remote blobs. Its cache will be shared among different middleware
|
||||
// wrappers, which is a must at least for stat calls made on manifest's dependencies during its verification.
|
||||
func NewBlobGetterService(
|
||||
imageStream *imageStream,
|
||||
imageStream imagestream.ImageStream,
|
||||
secretsGetter secretsGetter,
|
||||
cache cache.RepositoryDigest,
|
||||
) BlobGetterService {
|
||||
@@ -80,13 +81,6 @@ func NewBlobGetterService(
|
||||
}
|
||||
}
|
||||
|
||||
// ImagePullthroughSpec contains a reference of remote image to pull associated with an insecure flag for the
|
||||
// corresponding registry.
|
||||
type ImagePullthroughSpec struct {
|
||||
dockerImageReference *imageapi.DockerImageReference
|
||||
insecure bool
|
||||
}
|
||||
|
||||
// Stat provides metadata about a blob identified by the digest. If the
|
||||
// blob is unknown to the describer, ErrBlobUnknown will be returned.
|
||||
func (rbgs *remoteBlobGetterService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
@@ -176,16 +170,16 @@ func (rbgs *remoteBlobGetterService) ServeBlob(ctx context.Context, w http.Respo
|
||||
func (rbgs *remoteBlobGetterService) proxyStat(
|
||||
ctx context.Context,
|
||||
retriever registryclient.RepositoryRetriever,
|
||||
spec *ImagePullthroughSpec,
|
||||
spec *imagestream.ImagePullthroughSpec,
|
||||
dgst digest.Digest,
|
||||
) (distribution.Descriptor, error) {
|
||||
ref := spec.dockerImageReference
|
||||
ref := spec.DockerImageReference
|
||||
insecureNote := ""
|
||||
if spec.insecure {
|
||||
if spec.Insecure {
|
||||
insecureNote = " with a fall-back to insecure transport"
|
||||
}
|
||||
context.GetLogger(ctx).Infof("Trying to stat %q from %q%s", dgst, ref.AsRepository().Exact(), insecureNote)
|
||||
repo, err := retriever.Repository(ctx, ref.RegistryURL(), ref.RepositoryName(), spec.insecure)
|
||||
repo, err := retriever.Repository(ctx, ref.RegistryURL(), ref.RepositoryName(), spec.Insecure)
|
||||
if err != nil {
|
||||
context.GetLogger(ctx).Errorf("Error getting remote repository for image %q: %v", ref.AsRepository().Exact(), err)
|
||||
return distribution.Descriptor{}, err
|
||||
@@ -230,7 +224,7 @@ func (rbgs *remoteBlobGetterService) Get(ctx context.Context, dgst digest.Digest
|
||||
func (rbgs *remoteBlobGetterService) findCandidateRepository(
|
||||
ctx context.Context,
|
||||
repositoryCandidates []string,
|
||||
search map[string]ImagePullthroughSpec,
|
||||
search map[string]imagestream.ImagePullthroughSpec,
|
||||
cachedRepos []string,
|
||||
dgst digest.Digest,
|
||||
retriever registryclient.RepositoryRetriever,
|
||||
@@ -276,7 +270,7 @@ func (rbgs *remoteBlobGetterService) findCandidateRepository(
|
||||
|
||||
type byInsecureFlag struct {
|
||||
repositories []string
|
||||
specs []*ImagePullthroughSpec
|
||||
specs []*imagestream.ImagePullthroughSpec
|
||||
}
|
||||
|
||||
func (by *byInsecureFlag) Len() int {
|
||||
@@ -290,17 +284,17 @@ func (by *byInsecureFlag) Swap(i, j int) {
|
||||
by.specs[i], by.specs[j] = by.specs[j], by.specs[i]
|
||||
}
|
||||
func (by *byInsecureFlag) Less(i, j int) bool {
|
||||
if by.specs[i].insecure == by.specs[j].insecure {
|
||||
if by.specs[i].Insecure == by.specs[j].Insecure {
|
||||
switch {
|
||||
case by.repositories[i] < by.repositories[j]:
|
||||
return true
|
||||
case by.repositories[i] > by.repositories[j]:
|
||||
return false
|
||||
default:
|
||||
return by.specs[i].dockerImageReference.Exact() < by.specs[j].dockerImageReference.Exact()
|
||||
return by.specs[i].DockerImageReference.Exact() < by.specs[j].DockerImageReference.Exact()
|
||||
}
|
||||
}
|
||||
return !by.specs[i].insecure
|
||||
return !by.specs[i].Insecure
|
||||
}
|
||||
|
||||
// identifyCandidateRepositories returns a list of remote repository names sorted from the best candidate to
|
||||
@@ -310,7 +304,7 @@ func identifyCandidateRepositories(
|
||||
is *imageapiv1.ImageStream,
|
||||
localRegistry string,
|
||||
primary bool,
|
||||
) ([]string, map[string]ImagePullthroughSpec) {
|
||||
) ([]string, map[string]imagestream.ImagePullthroughSpec) {
|
||||
insecureByDefault := false
|
||||
if insecure, ok := is.Annotations[imageapi.InsecureRepositoryAnnotation]; ok {
|
||||
insecureByDefault = insecure == "true"
|
||||
@@ -362,14 +356,14 @@ func identifyCandidateRepositories(
|
||||
}
|
||||
|
||||
repositories := make([]string, 0, len(search))
|
||||
results := make(map[string]ImagePullthroughSpec)
|
||||
specs := []*ImagePullthroughSpec{}
|
||||
results := make(map[string]imagestream.ImagePullthroughSpec)
|
||||
specs := []*imagestream.ImagePullthroughSpec{}
|
||||
for repo, ref := range search {
|
||||
repositories = append(repositories, repo)
|
||||
// accompany the reference with corresponding registry's insecure flag
|
||||
spec := ImagePullthroughSpec{
|
||||
dockerImageReference: ref,
|
||||
insecure: insecureRegistries[ref.Registry],
|
||||
spec := imagestream.ImagePullthroughSpec{
|
||||
DockerImageReference: ref,
|
||||
Insecure: insecureRegistries[ref.Registry],
|
||||
}
|
||||
results[repo] = spec
|
||||
specs = append(specs, &spec)
|
||||
|
||||
@@ -10,6 +10,8 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
|
||||
imageapiv1 "github.com/openshift/api/image/v1"
|
||||
|
||||
"github.com/openshift/image-registry/pkg/imagestream"
|
||||
imageapi "github.com/openshift/image-registry/pkg/origin-common/image/apis/image"
|
||||
)
|
||||
|
||||
@@ -20,7 +22,7 @@ func TestIdentifyCandidateRepositories(t *testing.T) {
|
||||
localRegistry string
|
||||
primary bool
|
||||
expectedRepositories []string
|
||||
expectedSearch map[string]ImagePullthroughSpec
|
||||
expectedSearch map[string]imagestream.ImagePullthroughSpec
|
||||
}{
|
||||
{
|
||||
name: "empty image stream",
|
||||
@@ -43,7 +45,7 @@ func TestIdentifyCandidateRepositories(t *testing.T) {
|
||||
localRegistry: "localhost:5000",
|
||||
primary: true,
|
||||
expectedRepositories: []string{"docker.io/library/busybox"},
|
||||
expectedSearch: map[string]ImagePullthroughSpec{
|
||||
expectedSearch: map[string]imagestream.ImagePullthroughSpec{
|
||||
"docker.io/library/busybox": makeTestImagePullthroughSpec(t, "docker.io/library/busybox:latest", false),
|
||||
},
|
||||
},
|
||||
@@ -81,7 +83,7 @@ func TestIdentifyCandidateRepositories(t *testing.T) {
|
||||
localRegistry: "localhost:5000",
|
||||
primary: true,
|
||||
expectedRepositories: []string{"example.org/user/app", "registry.example.org/user/app"},
|
||||
expectedSearch: map[string]ImagePullthroughSpec{
|
||||
expectedSearch: map[string]imagestream.ImagePullthroughSpec{
|
||||
"example.org/user/app": makeTestImagePullthroughSpec(t, "example.org/user/app:tag", false),
|
||||
"registry.example.org/user/app": makeTestImagePullthroughSpec(t, "registry.example.org/user/app:latest", true),
|
||||
},
|
||||
@@ -120,7 +122,7 @@ func TestIdentifyCandidateRepositories(t *testing.T) {
|
||||
localRegistry: "localhost:5000",
|
||||
primary: false,
|
||||
expectedRepositories: []string{"example.org/app"},
|
||||
expectedSearch: map[string]ImagePullthroughSpec{
|
||||
expectedSearch: map[string]imagestream.ImagePullthroughSpec{
|
||||
"example.org/app": makeTestImagePullthroughSpec(t, "example.org/app:tag2", true),
|
||||
},
|
||||
},
|
||||
@@ -180,7 +182,7 @@ func TestIdentifyCandidateRepositories(t *testing.T) {
|
||||
localRegistry: "localhost:5000",
|
||||
primary: true,
|
||||
expectedRepositories: []string{"a.a/app", "other.b/bar", "a.b/app", "a.b/c", "a.b/c/foo"},
|
||||
expectedSearch: map[string]ImagePullthroughSpec{
|
||||
expectedSearch: map[string]imagestream.ImagePullthroughSpec{
|
||||
"a.a/app": makeTestImagePullthroughSpec(t, "a.a/app:latest", false),
|
||||
"other.b/bar": makeTestImagePullthroughSpec(t, "other.b/bar:latest", false),
|
||||
"a.b/app": makeTestImagePullthroughSpec(t, "a.b/app:latest", true),
|
||||
@@ -210,7 +212,7 @@ func TestIdentifyCandidateRepositories(t *testing.T) {
|
||||
localRegistry: "localhost:5000",
|
||||
primary: true,
|
||||
expectedRepositories: []string{"a.b.c/app", "a.b/app", "a.b/foo"},
|
||||
expectedSearch: map[string]ImagePullthroughSpec{
|
||||
expectedSearch: map[string]imagestream.ImagePullthroughSpec{
|
||||
"a.b.c/app": makeTestImagePullthroughSpec(t, "a.b.c/app:latest", false),
|
||||
"a.b/app": makeTestImagePullthroughSpec(t, "a.b/app:latest", true),
|
||||
"a.b/foo": makeTestImagePullthroughSpec(t, "a.b/foo:latest", true),
|
||||
@@ -240,10 +242,10 @@ func TestIdentifyCandidateRepositories(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func makeTestImagePullthroughSpec(t *testing.T, ref string, insecure bool) ImagePullthroughSpec {
|
||||
func makeTestImagePullthroughSpec(t *testing.T, ref string, insecure bool) imagestream.ImagePullthroughSpec {
|
||||
r, err := imageapi.ParseDockerImageReference(ref)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return ImagePullthroughSpec{dockerImageReference: &r, insecure: insecure}
|
||||
return imagestream.ImagePullthroughSpec{DockerImageReference: &r, Insecure: insecure}
|
||||
}
|
||||
|
||||
@@ -12,9 +12,11 @@ import (
|
||||
restclient "k8s.io/client-go/rest"
|
||||
|
||||
imageapiv1 "github.com/openshift/api/image/v1"
|
||||
|
||||
"github.com/openshift/image-registry/pkg/dockerregistry/server/audit"
|
||||
"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
|
||||
"github.com/openshift/image-registry/pkg/dockerregistry/server/metrics"
|
||||
"github.com/openshift/image-registry/pkg/imagestream"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -44,7 +46,7 @@ type repository struct {
|
||||
app *App
|
||||
crossmount bool
|
||||
|
||||
imageStream *imageStream
|
||||
imageStream imagestream.ImageStream
|
||||
|
||||
// remoteBlobGetter is used to fetch blobs from remote registries if pullthrough is enabled.
|
||||
remoteBlobGetter BlobGetterService
|
||||
@@ -64,11 +66,8 @@ func (app *App) Repository(ctx context.Context, repo distribution.Repository, cr
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
imageStreamGetter := &cachedImageStreamGetter{
|
||||
ctx: ctx,
|
||||
namespace: namespace,
|
||||
name: name,
|
||||
isNamespacer: registryOSClient,
|
||||
imageStreamCache := &cache.RepoDigest{
|
||||
Cache: app.cache,
|
||||
}
|
||||
|
||||
r := &repository{
|
||||
@@ -79,22 +78,25 @@ func (app *App) Repository(ctx context.Context, repo distribution.Repository, cr
|
||||
crossmount: crossmount,
|
||||
|
||||
imageStream: &imageStream{
|
||||
namespace: namespace,
|
||||
name: name,
|
||||
registryOSClient: registryOSClient,
|
||||
cachedImages: make(map[digest.Digest]*imageapiv1.Image),
|
||||
imageStreamGetter: imageStreamGetter,
|
||||
cache: &cache.RepoDigest{
|
||||
Cache: app.cache,
|
||||
namespace: namespace,
|
||||
name: name,
|
||||
registryOSClient: registryOSClient,
|
||||
cachedImages: make(map[digest.Digest]*imageapiv1.Image),
|
||||
imageStreamGetter: &cachedImageStreamGetter{
|
||||
ctx: ctx,
|
||||
namespace: namespace,
|
||||
name: name,
|
||||
isNamespacer: registryOSClient,
|
||||
},
|
||||
cache: imageStreamCache,
|
||||
},
|
||||
}
|
||||
|
||||
if app.config.Pullthrough.Enabled {
|
||||
r.remoteBlobGetter = NewBlobGetterService(
|
||||
r.imageStream,
|
||||
r.imageStream.getSecrets,
|
||||
r.imageStream.cache)
|
||||
r.imageStream.GetSecrets,
|
||||
imageStreamCache)
|
||||
}
|
||||
|
||||
bdsf := blobDescriptorServiceFactoryFunc(r.BlobDescriptorService)
|
||||
|
||||
@@ -3,12 +3,14 @@ package server
|
||||
import (
|
||||
"github.com/docker/distribution"
|
||||
"github.com/docker/distribution/context"
|
||||
|
||||
"github.com/openshift/image-registry/pkg/imagestream"
|
||||
)
|
||||
|
||||
type tagService struct {
|
||||
distribution.TagService
|
||||
|
||||
imageStream *imageStream
|
||||
imageStream imagestream.ImageStream
|
||||
pullthroughEnabled bool
|
||||
}
|
||||
|
||||
@@ -32,7 +34,7 @@ func (t tagService) Get(ctx context.Context, tag string) (distribution.Descripto
|
||||
}
|
||||
|
||||
if !t.pullthroughEnabled {
|
||||
image, err := t.imageStream.getImage(ctx, dgst)
|
||||
image, err := t.imageStream.GetImage(ctx, dgst)
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
@@ -69,7 +71,7 @@ func (t tagService) All(ctx context.Context) ([]string, error) {
|
||||
|
||||
managed, found := managedImages[dgst.String()]
|
||||
if !found {
|
||||
image, err := t.imageStream.getImage(ctx, dgst)
|
||||
image, err := t.imageStream.GetImage(ctx, dgst)
|
||||
if err != nil {
|
||||
context.GetLogger(ctx).Errorf("unable to get image %s %s: %v", t.imageStream.Reference(), dgst.String(), err)
|
||||
continue
|
||||
@@ -115,7 +117,7 @@ func (t tagService) Lookup(ctx context.Context, desc distribution.Descriptor) ([
|
||||
|
||||
managed, found := managedImages[dgst.String()]
|
||||
if !found {
|
||||
image, err := t.imageStream.getImage(ctx, dgst)
|
||||
image, err := t.imageStream.GetImage(ctx, dgst)
|
||||
if err != nil {
|
||||
context.GetLogger(ctx).Errorf("unable to get image %s %s: %v", t.imageStream.Reference(), dgst.String(), err)
|
||||
continue
|
||||
|
||||
49
pkg/imagestream/imagestream.go
Normal file
49
pkg/imagestream/imagestream.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package imagestream
|
||||
|
||||
import (
|
||||
"github.com/docker/distribution/context"
|
||||
"github.com/docker/distribution/digest"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
|
||||
imageapiv1 "github.com/openshift/api/image/v1"
|
||||
|
||||
imageapi "github.com/openshift/image-registry/pkg/origin-common/image/apis/image"
|
||||
)
|
||||
|
||||
// ProjectObjectListStore represents a cache of objects indexed by a project name.
|
||||
// Used to store a list of items per namespace.
|
||||
type ProjectObjectListStore interface {
|
||||
Add(namespace string, obj runtime.Object) error
|
||||
Get(namespace string) (obj runtime.Object, exists bool, err error)
|
||||
}
|
||||
|
||||
// ImagePullthroughSpec contains a reference of remote image to pull associated with an insecure flag for the
|
||||
// corresponding registry.
|
||||
type ImagePullthroughSpec struct {
|
||||
DockerImageReference *imageapi.DockerImageReference
|
||||
Insecure bool
|
||||
}
|
||||
|
||||
type ImageStream interface {
|
||||
Reference() string
|
||||
Exists() (bool, error)
|
||||
|
||||
GetStoredImageOfImageStream(ctx context.Context, dgst digest.Digest) (*imageapiv1.Image, *imageapiv1.TagEvent, *imageapiv1.ImageStream, error)
|
||||
GetImageOfImageStream(ctx context.Context, dgst digest.Digest) (*imageapiv1.Image, *imageapiv1.ImageStream, error)
|
||||
CreateImageStreamMapping(ctx context.Context, tag string, image *imageapiv1.Image) error
|
||||
UpdateImage(image *imageapiv1.Image) (*imageapiv1.Image, error)
|
||||
GetImage(ctx context.Context, dgst digest.Digest) (*imageapiv1.Image, error)
|
||||
RememberLayersOfImage(ctx context.Context, image *imageapiv1.Image, cacheName string)
|
||||
|
||||
HasBlob(ctx context.Context, dgst digest.Digest, requireManaged bool) bool
|
||||
IdentifyCandidateRepositories(primary bool) ([]string, map[string]ImagePullthroughSpec, error)
|
||||
GetLimitRangeList(ctx context.Context, cache ProjectObjectListStore) (*corev1.LimitRangeList, error)
|
||||
GetSecrets() ([]corev1.Secret, error)
|
||||
|
||||
TagIsInsecure(tag string, dgst digest.Digest) (bool, error)
|
||||
Tags(ctx context.Context) (map[string]digest.Digest, error)
|
||||
Tag(ctx context.Context, tag string, dgst digest.Digest, pullthroughEnabled bool) error
|
||||
Untag(ctx context.Context, tag string, pullthroughEnabled bool) error
|
||||
}
|
||||
Reference in New Issue
Block a user