1
0
mirror of https://github.com/helm/chartmuseum.git synced 2026-02-05 15:45:50 +01:00

Optimized index regeneration

Index regeneration before this PR was causing a concurrency pile-up when
many requests were coming in at a fast pace.  This was causing the time
to return /index to always increase. The behavior was observed on a high
latency connection to Google Cloud Storage, but would also occur with
other network stores.

With this PR, the regeneration time is constant and thus the serving of
index.yaml is also constant.

The implementation takes a general approach to first make initial
network requests (to satisfy the object list from storage) and to pile
up subsequent requests while the initial one is being completed. The
same algorithm is used to update the in-memory cached index.  Note that
these requests need not be protected by a lock because they are only
ever executing on their own. See server.getChartList() for the
implementaion of this idea.

A refactor was needed to separate the fetch from the diff calculation to
allow separate calling of the network heavy operations. While doing so,
we also removed redundant calls to storage file list update.

Also made small low-hanging fruit style optimisations to index
manipulations.

Added request ID to all requests for better debugging. This will be
visible with the --debug flag. This was indispensable to diagnose
complex concurrent processing.

To test the before and after state, we have added the use of the
locusti.io loadtesting engine. A simple README in the loadtesting/
directory shows how to install locust (with pipenv) and loadtest
chartmuseum. This will prove useful in the future.

Fixes #18
This commit is contained in:
David Genest
2017-11-09 22:38:14 -05:00
parent 8075c86dfe
commit 0e9d2e36ea
11 changed files with 472 additions and 112 deletions

View File

@@ -17,12 +17,23 @@ endif
@glide install --strip-vendor
.PHONY: build
build: export GOARCH=amd64
build: export CGO_ENABLED=0
build:
@GOOS=linux go build -v -i --ldflags="-w -X main.Version=$(VERSION) -X main.Revision=$(REVISION)" \
build: build_linux build_mac build_windows
build_windows: export GOARCH=amd64
build_windows:
@GOOS=windows go build -v --ldflags="-w -X main.Version=$(VERSION) -X main.Revision=$(REVISION)" \
-o bin/windows/amd64/chartmuseum cmd/chartmuseum/main.go # windows
build_linux: export GOARCH=amd64
build_linux: export CGO_ENABLED=0
build_linux:
@GOOS=linux go build -v --ldflags="-w -X main.Version=$(VERSION) -X main.Revision=$(REVISION)" \
-o bin/linux/amd64/chartmuseum cmd/chartmuseum/main.go # linux
@GOOS=darwin go build -v -i --ldflags="-w -X main.Version=$(VERSION) -X main.Revision=$(REVISION)" \
build_mac: export GOARCH=amd64
build_mac: export CGO_ENABLED=0
build_mac:
@GOOS=darwin go build -v --ldflags="-w -X main.Version=$(VERSION) -X main.Revision=$(REVISION)" \
-o bin/darwin/amd64/chartmuseum cmd/chartmuseum/main.go # mac osx
.PHONY: clean

14
loadtesting/Pipfile Normal file
View File

@@ -0,0 +1,14 @@
[[source]]
url = "https://pypi.python.org/simple"
verify_ssl = true
name = "pypi"
[dev-packages]
[packages]
locustio = "*"

224
loadtesting/Pipfile.lock generated Normal file
View File

@@ -0,0 +1,224 @@
{
"_meta": {
"hash": {
"sha256": "61aaaf047ac37f113fb2f160addc46622690eca395ca67d7ef029f596b45cfd6"
},
"host-environment-markers": {
"implementation_name": "cpython",
"implementation_version": "0",
"os_name": "posix",
"platform_machine": "x86_64",
"platform_python_implementation": "CPython",
"platform_release": "15.6.0",
"platform_system": "Darwin",
"platform_version": "Darwin Kernel Version 15.6.0: Sun Jun 4 21:43:07 PDT 2017; root:xnu-3248.70.3~1/RELEASE_X86_64",
"python_full_version": "2.7.13",
"python_version": "2.7",
"sys_platform": "darwin"
},
"pipfile-spec": 6,
"requires": {},
"sources": [
{
"name": "pypi",
"url": "https://pypi.python.org/simple",
"verify_ssl": true
}
]
},
"default": {
"certifi": {
"hashes": [
"sha256:244be0d93b71e93fc0a0a479862051414d0e00e16435707e5bf5000f92e04694",
"sha256:5ec74291ca1136b40f0379e1128ff80e866597e4e2c1e755739a913bbc3613c0"
],
"version": "==2017.11.5"
},
"chardet": {
"hashes": [
"sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691",
"sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae"
],
"version": "==3.0.4"
},
"click": {
"hashes": [
"sha256:29f99fc6125fbc931b758dc053b3114e55c77a6e4c6c3a2674a2dc986016381d",
"sha256:f15516df478d5a56180fbf80e68f206010e6d160fc39fa508b65e035fd75130b"
],
"version": "==6.7"
},
"flask": {
"hashes": [
"sha256:0749df235e3ff61ac108f69ac178c9770caeaccad2509cb762ce1f65570a8856",
"sha256:49f44461237b69ecd901cc7ce66feea0319b9158743dd27a2899962ab214dac1"
],
"version": "==0.12.2"
},
"gevent": {
"hashes": [
"sha256:9b492bb1a043540abb6e54fdb5537531e24962ca49c09f3b47dc4f9c37f6297c",
"sha256:de13a8e378103af84a8bf6015ad1d2761d46f29b8393e8dd6d9bb7cb51bbb713",
"sha256:deafd70d04ab62428d4e291e8e2c0fb22f38690e6a9f23a67ee6c304087634da",
"sha256:b67a10799923f9fed546ca5f8b93a2819c71a60132d7a97b4a13fbdab66b278a",
"sha256:35790f1a3c8e431ada3471b70bb2105050009ea4beb15cbe41b86bc716a7ffa9",
"sha256:c9dd6534c46ed782e2d7236767cd07115cb29ce8670c2fc0794f264de9024fe0",
"sha256:b7e0e6400c2f3ce78a9ae1cdd55b53166feedd003d60c033863881227129a4d3",
"sha256:0901975628790e8a57fc92bb7062e5b856edea48c8de9caf36cfda14eae07329",
"sha256:df52e06a2754c2d905aad75a7dc06a732c804d9edbc87f06f47c8f483ba98bca",
"sha256:70558dd45c7a1f8046ba45792e489dd0f409bd8a3b7a0635ca9d3055223b3dff",
"sha256:8a710eddb3e9e5f22bdbd458b5f211b94f59409ecd6896f15b9fee2cba266a59",
"sha256:60109741377367eef8ded9283a1bf629621b73acaf3e1e8aac9d1a0f50fa0f05",
"sha256:552719cec4721673b8c7d2f9de666e3f7591b9b182f801ecaef1c76e638052aa",
"sha256:a16db4f56699ef07f0249b953ff949aae641e50b2bdc4710f11c0d8d9089b296",
"sha256:59e9237af027f8db85e5d78a9da2e328ae96f01d67a0d62abcecad3db7876908",
"sha256:833bebdc36bfeeedefc200ca9aee9b8eddd80f56b63ca1e886e18b97b1240edd",
"sha256:81cb24e0f7bd9888596364e8d8ed0d65c2547c84884c67bb46d956faeed67396",
"sha256:1af93825db5753550fa8ff5ab2f2132e8733170b3f8d38347b34fa4a984cb624",
"sha256:2ff045a91509c35664c27a849c8cbf742a227f587b7cdbc88301e9c85dcaedff",
"sha256:a66cf99f08da65c501826a19e30f5a6e7ba942fdd79baba5ce2d51eebaa13444",
"sha256:4791c8ae9c57d6f153354736e1ccab1e2baf6c8d9ae5a77a9ac90f41e2966b2d",
"sha256:74bce0c30bb2240e3d5d515ba8cb3eadf840c2bde7109a1979c7a26c9d0f5a6a",
"sha256:a0ed8ba787b9c0c1c565c2675d71652e6c1e2d4e91f53530860d0303e867fe85",
"sha256:c35b29de49211014ec66d056fd4f9ba7a04795e2a654697f72879c0cf365d6d4",
"sha256:6892fabc9051e8c0a171d543b6536859aabeb6d169db79b2f45d64dc2a15808c",
"sha256:fce894a64db3911897cdad6c37fbb23dfb18b7bf8b9cb8c00a8ea0a7253651c9",
"sha256:4f098002126ebef7f2907188b6c8b09e5193161ce968847d9e6a8bc832b0db9a",
"sha256:33fa6759eabc9176ddbe0d29b66867a82e19a61f06eb7cfabbac35343c0ecf24",
"sha256:7f93b67b680f4a921f517294048d05f8f6f0ed5962b78d6685a6cf0fcd7d8202"
],
"version": "==1.2.2"
},
"greenlet": {
"hashes": [
"sha256:96888e47898a471073b394ea641b7d675c1d054c580dd4a04a382bd34e67d89e",
"sha256:d2d5103f6cba131e1be660230018e21f276911d2b68b629ead1c5cb5e5472ac7",
"sha256:bc339de0e0969de5118d0b62a080a7611e2ba729a90f4a3ad78559c51bc5576d",
"sha256:b8ab98f8ae25938326dc4c21e3689a933531500ae4f3bfcefe36e3e25fda4dbf",
"sha256:416a3328d7e0a19aa1df3ec09524a109061fd7b80e010ef0dff9f695b4ac5e20",
"sha256:21232907c8c26838b16915bd8fbbf82fc70c996073464cc70981dd4a96bc841c",
"sha256:6803d8c6b235c861c50afddf00c7467ffbcd5ab960d137ff0f9c36f2cb11ee4b",
"sha256:76dab055476dd4dabb00a967b4df1990b25542d17eaa40a18f66971d10193e0b",
"sha256:70b9ff28921f5a3c03df4896ec8c55f5f94c593d7a79abd98b4c5c4a692ba873",
"sha256:7114b757b4146f4c87a0f00f1e58abd4c4729836679af0fc37266910a4a72eb0",
"sha256:0d90c709355ed13f16676f84e5a9cd67826a9f5c5143381c21e8fc3100ade1f1",
"sha256:ebae83b6247f83b1e8d887733dfa8046ce6e29d8b3e2a7380256e9de5c6ae55d",
"sha256:e841e3ece633acae5e2bf6102140a605ffee7d5d4921dca1625c5fdc0f0b3248",
"sha256:3e5e9be157ece49e4f97f3225460caf758ccb00f934fcbc5db34367cc1ff0aee",
"sha256:e77b708c37b652c7501b9f8f6056b23633c567aaa0d29edfef1c11673c64b949",
"sha256:0da1fc809c3bdb93fbacd0f921f461aacd53e554a7b7d4e9953ba09131c4206e",
"sha256:66fa5b101fcf4521138c1a29668074268d938bbb7de739c8faa9f92ea1f05e1f",
"sha256:e5451e1ce06b74a4861576c2db74405a4398c4809a105774550a9e52cfc8c4da",
"sha256:9c407aa6adfd4eea1232e81aa9f3cb3d9b955a9891c4819bf9b498c77efba14b",
"sha256:b56ac981f07b77e72ad5154278b93396d706572ea52c2fce79fee2abfcc8bfa6",
"sha256:e4c99c6010a5d153d481fdaf63b8a0782825c0721506d880403a3b9b82ae347e"
],
"version": "==0.4.12"
},
"idna": {
"hashes": [
"sha256:8c7309c718f94b3a625cb648ace320157ad16ff131ae0af362c9f21b80ef6ec4",
"sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f"
],
"version": "==2.6"
},
"itsdangerous": {
"hashes": [
"sha256:cbb3fcf8d3e33df861709ecaf89d9e6629cff0a217bc2848f1b41cd30d360519"
],
"version": "==0.24"
},
"jinja2": {
"hashes": [
"sha256:74c935a1b8bb9a3947c50a54766a969d4846290e1e788ea44c1392163723c3bd",
"sha256:f84be1bb0040caca4cea721fcbbbbd61f9be9464ca236387158b0feea01914a4"
],
"version": "==2.10"
},
"locustio": {
"hashes": [
"sha256:64583987ba1c330bb071aee3e29d2eedbfb7c8b342fa064bfb74fafcff660d61"
],
"version": "==0.8.1"
},
"markupsafe": {
"hashes": [
"sha256:a6be69091dac236ea9c6bc7d012beab42010fa914c459791d627dad4910eb665"
],
"version": "==1.0"
},
"msgpack-python": {
"hashes": [
"sha256:637b012c9ea021de7a7a75d6ff5e82cfef6694babd7e14bb9a3adcb2a5bd52f0",
"sha256:658c1cd5dcf7786e0e7a6d523cd0c5b33f92e139e224bd73cb3a23ada618d2dc",
"sha256:920bbbaee07ad048a4d2b4160901b19775c61ef9439f856c74509e763a326249",
"sha256:e165006f7e3d2612f1bffe2f6f042ca317d8df724d8b72a39b14c2e46c67eaae",
"sha256:95d70edd50e3d2f6ea1189f77190e4a0172626e7405ddd1689f3f64814447cba",
"sha256:7e1b12ea0134460052fabcfaa0f488ec0fc21deb14832d66236fd2870757d8f1",
"sha256:8f36890251f20d96267618cf64735759d7ef7e91bc0b86b9480547d2d1397a68",
"sha256:1e68a277e4180baa7789be36f27f0891660205f6209f78a32282d3c422873d78",
"sha256:f52d9f96df952369fe4adcb0506e10c1c92d47f653f601a66da2a26a7e7141ea",
"sha256:58c9c1d7891a35bddc6ee5dbec10d347a7ae4983169c24fc5fc8a57ae792ca76",
"sha256:1a2b19df0f03519ec7f19f826afb935b202d8979b0856c6fb3dc28955799f886"
],
"version": "==0.4.8"
},
"pyzmq": {
"hashes": [
"sha256:f88d8ffa32bf729bed806ca3205846ed98b61f149562dc828db5c3a2194ca0c3",
"sha256:83c30a7fa401d5f26378414c6e67bdf8633780cf514b31bd2084b9ab4b226484",
"sha256:5f357871b560bfacfa62a737292c59b42fe06c6e3433567f49f42dec32b45b11",
"sha256:6602d9958656a255a067ab87b569695a51e275bb77111b057c12ac702544fdca",
"sha256:068977e0f5cfba05d910df9899bcbd379e6536666ddabb2ce6a1218673229c65",
"sha256:29fe1d6a79ea3b4cfcb3b23ef2159b34a42a347f74010fe5510484543a5367e2",
"sha256:09ecf151c912f1d0907bfda331fa0ed5cd0032bf1d8733512f1befc8e89b387f",
"sha256:edde3830b2ac4c97f11fdafeea02dffd2343b470482864a3cd3894c839a0a416",
"sha256:37d7c8bfa3c7e8121ca7a71fe807cd5b63793d05ad598df53449d49e5dea1cc3",
"sha256:9848772e952f2addbb77182ae14ef201fd4f0e394e6e73c4620b9fd8f154f4a9",
"sha256:4d2bda85a2fe827fe1cce007f4bab20dae4bf56fa468d19bc730bb225c349bf3",
"sha256:7ded1d19978e52557348fd89f2c3f67eb73c19700b248355bcc50ac582387a95",
"sha256:b3c6abbebd79866465d6bbc0ab239cc4d1ba3bcce203b5294bb494665dcf4105",
"sha256:c382cb87d65de9718c738ee3a2f4f7c8444bd5d1a002bd370b5365d906c13f27",
"sha256:f41626459031c36457a6af56dd91e432b315b0bce8458ba08edbf6e4de88b467",
"sha256:afe36a0ab8ada3b8d9a005c6aaca8a434c425bd019b7fa6d323b9fda46157389",
"sha256:686ffacf9c62bdd4253ab3e093b32aaf195961c7545583a416cc534caa8a5900",
"sha256:09802528d0092b70131a9df92c975bb625cf2038fc0f53915bf51ffa665a7957",
"sha256:22ea664f4d115cd9fa0f2dc8c907298437f4d2fcf0677c18eddf45da2436f923",
"sha256:505d2a4fc2a899d30663e12550dca3d17d48c5796dd1307a0197656bc40b67c8",
"sha256:657147db62a16794aca7a18b51139180bc98d6249931507b1c8ef696607ada43",
"sha256:3a942e278f79ddcf455ba160881a95de474123667c3c847683bc3ae1929093f5",
"sha256:8a883824147523c0fe76d247dd58994c1c28ef07f1cc5dde595a4fd1c28f2580"
],
"version": "==16.0.3"
},
"requests": {
"hashes": [
"sha256:6a1b267aa90cac58ac3a765d067950e7dbbf75b1da07e895d1f594193a40a38b",
"sha256:9c443e7324ba5b85070c4a818ade28bfabedf16ea10206da1132edaa6dda237e"
],
"version": "==2.18.4"
},
"six": {
"hashes": [
"sha256:832dc0e10feb1aa2c68dcc57dbb658f1c7e65b9b61af69048abc87a2db00a0eb",
"sha256:70e8a77beed4562e7f14fe23a786b54f6296e34344c23bc42f07b15018ff98e9"
],
"version": "==1.11.0"
},
"urllib3": {
"hashes": [
"sha256:06330f386d6e4b195fbfc736b297f58c5a892e4440e54d294d7004e3a9bbea1b",
"sha256:cc44da8e1145637334317feebd728bd869a35285b93cbb4cca2577da7e62db4f"
],
"version": "==1.22"
},
"werkzeug": {
"hashes": [
"sha256:e8549c143af3ce6559699a01e26fa4174f4c591dbee0a499f3cd4c3781cdec3d",
"sha256:903a7b87b74635244548b30d30db4c8947fe64c5198f58899ddcd3a13c23bb26"
],
"version": "==0.12.2"
}
},
"develop": {}
}

27
loadtesting/README.md Normal file
View File

@@ -0,0 +1,27 @@
Loadtesting is made with the excellent Python [locust](https://locust.io/) library.
To facilitate installation, this loadtesting subproject uses pipenv.
Install pipenv
```
pip install pipenv
```
Install chartmuseum locust loadtesting
```
cd loadtesting
pipenv install
```
Start chartmuseum.
Start locust:
```
# run locust on a running chartmuseum instance
pipenv run locust --host http://localhost:8080
```
Open your locust console in your browser at http://localhost:8089, and start a new loadtest.

View File

@@ -3,16 +3,11 @@ import tarfile
import io
patch_version = 1
chart_post_field_name = 'chart'
def index(l):
l.client.get("/index.yaml")
def metrics(l):
l.client.get("/metrics")
def not_found(l):
l.client.get("/toto")
def post_new_chart(l):
global patch_version
@@ -31,11 +26,11 @@ def post_new_chart(l):
t.close()
tgz_buf.seek(0)
l.client.post('/api/charts', files={'chartfile': (chart_fn, tgz_buf)})
l.client.post('/api/charts', files={chart_post_field_name: (chart_fn, tgz_buf)})
class UserBehavior(TaskSet):
tasks = {index: 15, metrics: 1, post_new_chart: 1}
tasks = {index: 10, post_new_chart: 1}
class WebsiteUser(HttpLocust):

View File

@@ -30,31 +30,31 @@ type (
)
func (server *Server) getIndexFileRequestHandler(c *gin.Context) {
err := server.syncRepositoryIndex()
index, err := server.syncRepositoryIndex(c.MustGet("RequestID").(string))
if err != nil {
c.JSON(500, errorResponse(err))
return
}
c.Data(200, repo.IndexFileContentType, server.RepositoryIndex.Raw)
c.Data(200, repo.IndexFileContentType, index.Raw)
}
func (server *Server) getAllChartsRequestHandler(c *gin.Context) {
err := server.syncRepositoryIndex()
index, err := server.syncRepositoryIndex(c.MustGet("RequestID").(string))
if err != nil {
c.JSON(500, errorResponse(err))
return
}
c.JSON(200, server.RepositoryIndex.Entries)
c.JSON(200, index.Entries)
}
func (server *Server) getChartRequestHandler(c *gin.Context) {
name := c.Param("name")
err := server.syncRepositoryIndex()
index, err := server.syncRepositoryIndex(c.MustGet("RequestID").(string))
if err != nil {
c.JSON(500, errorResponse(err))
return
}
chart := server.RepositoryIndex.Entries[name]
chart := index.Entries[name]
if chart == nil {
c.JSON(404, notFoundErrorResponse)
return
@@ -68,12 +68,12 @@ func (server *Server) getChartVersionRequestHandler(c *gin.Context) {
if version == "latest" {
version = ""
}
err := server.syncRepositoryIndex()
index, err := server.syncRepositoryIndex(c.MustGet("RequestID").(string))
if err != nil {
c.JSON(500, errorResponse(err))
return
}
chartVersion, err := server.RepositoryIndex.Get(name, version)
chartVersion, err := index.Get(name, version)
if err != nil {
c.JSON(404, notFoundErrorResponse)
return

View File

@@ -4,7 +4,9 @@ import (
"context"
"fmt"
"regexp"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/kubernetes-helm/chartmuseum/pkg/repo"
@@ -27,20 +29,30 @@ type (
Router struct {
*gin.Engine
}
fetchedObjects struct {
objects []storage.Object
err error
}
indexRegeneration struct {
index *repo.Index
err error
}
// Server contains a Logger, Router, storage backend and object cache
Server struct {
Logger *Logger
Router *Router
RepositoryIndex *repo.Index
StorageBackend storage.Backend
StorageCache []storage.Object
StorageCacheLock *sync.Mutex
AllowOverwrite bool
TlsCert string
TlsKey string
ChartPostFormFieldName string
ProvPostFormFieldName string
Logger *Logger
Router *Router
RepositoryIndex *repo.Index
StorageBackend storage.Backend
StorageCache []storage.Object
regenerationLock *sync.Mutex
fetchedObjectsLock *sync.Mutex
fetchedObjectsChans []chan fetchedObjects
regeneratedIndexesChans []chan indexRegeneration
AllowOverwrite bool
TlsCert string
TlsKey string
ChartPostFormFieldName string
ProvPostFormFieldName string
}
// ServerOptions are options for constructing a Server
@@ -127,7 +139,8 @@ func NewServer(options ServerOptions) (*Server, error) {
RepositoryIndex: repo.NewIndex(options.ChartURL),
StorageBackend: options.StorageBackend,
StorageCache: []storage.Object{},
StorageCacheLock: &sync.Mutex{},
regenerationLock: &sync.Mutex{},
fetchedObjectsLock: &sync.Mutex{},
AllowOverwrite: options.AllowOverwrite,
TlsCert: options.TlsCert,
TlsKey: options.TlsKey,
@@ -137,7 +150,8 @@ func NewServer(options ServerOptions) (*Server, error) {
server.setRoutes(options.EnableAPI)
err = server.regenerateRepositoryIndex()
// prime the cache
_, err = server.syncRepositoryIndex("bootstrap")
return server, err
}
@@ -154,11 +168,15 @@ func (server *Server) Listen(port int) {
}
func loggingMiddleware(logger *Logger) gin.HandlerFunc {
var requestID int64
return func(c *gin.Context) {
reqID := strconv.FormatInt(atomic.AddInt64(&requestID, 1), 10)
c.Set("RequestID", reqID)
logger.Debugf("(%s) Incoming request: %s", reqID, c.Request.URL.Path)
start := time.Now()
c.Next()
msg := "Request served"
msg := fmt.Sprintf("(%s) Request served", reqID)
status := c.Writer.Status()
meta := []interface{}{
@@ -181,22 +199,81 @@ func loggingMiddleware(logger *Logger) gin.HandlerFunc {
}
}
func (server *Server) syncRepositoryIndex() error {
_, diff, err := server.listObjectsGetDiff()
if err != nil {
return err
// getChartList fetches from the server and accumulates concurrent requests to be fulfilled
// all at once.
func (server *Server) getChartList(reqID string) <-chan fetchedObjects {
c := make(chan fetchedObjects, 1)
server.fetchedObjectsLock.Lock()
server.fetchedObjectsChans = append(server.fetchedObjectsChans, c)
if len(server.fetchedObjectsChans) == 1 {
// this unlock is wanted, while fetching the list, allow other channeled requests to be added
server.fetchedObjectsLock.Unlock()
objects, err := server.fetchChartsInStorage(reqID)
server.fetchedObjectsLock.Lock()
// flush every other consumer that also wanted the index
for _, foCh := range server.fetchedObjectsChans {
foCh <- fetchedObjects{objects, err}
}
server.fetchedObjectsChans = nil
}
if !diff.Change {
return nil
}
err = server.regenerateRepositoryIndex()
return err
server.fetchedObjectsLock.Unlock()
return c
}
func (server *Server) listObjectsGetDiff() ([]storage.Object, storage.ObjectSliceDiff, error) {
func (server *Server) regenerateRepositoryIndex(diff storage.ObjectSliceDiff, storageObjects []storage.Object, reqID string) <-chan indexRegeneration {
c := make(chan indexRegeneration, 1)
server.regenerationLock.Lock()
server.regeneratedIndexesChans = append(server.regeneratedIndexesChans, c)
if len(server.regeneratedIndexesChans) == 1 {
server.regenerationLock.Unlock()
index, err := server.regenerateRepositoryIndexWorker(diff, storageObjects, reqID)
server.regenerationLock.Lock()
for _, riCh := range server.regeneratedIndexesChans {
riCh <- indexRegeneration{index, err}
}
server.regeneratedIndexesChans = nil
}
server.regenerationLock.Unlock()
return c
}
// syncRepositoryIndex is the workhorse of maintaining a coherent index cache. It is optimized for multiple requests
// comming in a short period. When two requests for the backing store arrive, only the first is served, and other consumers receive the
// result of this request. This allows very fast updates in constant time. See getChartList() and regenerateRepositoryIndex().
func (server *Server) syncRepositoryIndex(reqID string) (*repo.Index, error) {
fo := <-server.getChartList(reqID)
if fo.err != nil {
return nil, fo.err
}
diff := storage.GetObjectSliceDiff(server.StorageCache, fo.objects)
// return fast if no changes
if !diff.Change {
return server.RepositoryIndex, nil
}
ir := <-server.regenerateRepositoryIndex(diff, fo.objects, reqID)
return ir.index, ir.err
}
func (server *Server) fetchChartsInStorage(reqID string) ([]storage.Object, error) {
server.Logger.Debugf("(%s) Fetching chart list from storage", reqID)
allObjects, err := server.StorageBackend.ListObjects()
if err != nil {
return []storage.Object{}, storage.ObjectSliceDiff{}, err
return []storage.Object{}, err
}
// filter out storage objects that dont have extension used for chart packages (.tgz)
@@ -207,67 +284,57 @@ func (server *Server) listObjectsGetDiff() ([]storage.Object, storage.ObjectSlic
}
}
diff := storage.GetObjectSliceDiff(server.StorageCache, filteredObjects)
return filteredObjects, diff, nil
return filteredObjects, nil
}
func (server *Server) regenerateRepositoryIndex() error {
server.Logger.Debugw("Acquiring storage cache lock")
server.StorageCacheLock.Lock()
server.Logger.Debugw("Storage cache lock acquired")
defer func() {
server.Logger.Debugw("Releasing storage cache lock")
server.StorageCacheLock.Unlock()
}()
objects, diff, err := server.listObjectsGetDiff()
if err != nil {
return err
}
index := &repo.Index{
IndexFile: server.RepositoryIndex.IndexFile,
Raw: server.RepositoryIndex.Raw,
ChartURL: server.RepositoryIndex.ChartURL,
}
func (server *Server) regenerateRepositoryIndexWorker(diff storage.ObjectSliceDiff, storageObjects []storage.Object, reqID string) (*repo.Index, error) {
server.Logger.Debugf("(%s) Regenerating index.yaml", reqID)
index := &repo.Index{
IndexFile: server.RepositoryIndex.IndexFile,
Raw: server.RepositoryIndex.Raw,
ChartURL: server.RepositoryIndex.ChartURL,
}
for _, object := range diff.Removed {
err := server.removeIndexObject(index, object)
err := server.removeIndexObject(index, object, reqID)
if err != nil {
return err
return nil, err
}
}
for _, object := range diff.Updated {
err := server.updateIndexObject(index, object)
err := server.updateIndexObject(index, object, reqID)
if err != nil {
return err
return nil, err
}
}
// Parallelize retrieval of added objects to improve startup speed
err = server.addIndexObjectsAsync(index, diff.Added)
// Parallelize retrieval of added objects to improve speed
err := server.addIndexObjectsAsync(index, diff.Added, reqID)
if err != nil {
return err
return nil, err
}
server.Logger.Debug("Regenerating index.yaml")
err = index.Regenerate()
if err != nil {
return err
return nil, err
}
// It is very important that these two stay in sync as they reflect the same reality. StorageCache serves
// as object modification time cache, and RepositoryIndex is the canonical cached index.
server.RepositoryIndex = index
server.StorageCache = objects
return nil
server.StorageCache = storageObjects
server.Logger.Debugf("(%s) index.yaml regenerated", reqID)
return index, nil
}
func (server *Server) removeIndexObject(index *repo.Index, object storage.Object) error {
func (server *Server) removeIndexObject(index *repo.Index, object storage.Object, reqID string) error {
chartVersion, err := server.getObjectChartVersion(object, false)
if err != nil {
return server.checkInvalidChartPackageError(object, err, "removed")
}
server.Logger.Debugw("Removing chart from index",
server.Logger.Debugw(fmt.Sprintf("(%s) Removing chart from index", reqID),
"name", chartVersion.Name,
"version", chartVersion.Version,
)
@@ -275,12 +342,12 @@ func (server *Server) removeIndexObject(index *repo.Index, object storage.Object
return nil
}
func (server *Server) updateIndexObject(index *repo.Index, object storage.Object) error {
func (server *Server) updateIndexObject(index *repo.Index, object storage.Object, reqID string) error {
chartVersion, err := server.getObjectChartVersion(object, true)
if err != nil {
return server.checkInvalidChartPackageError(object, err, "updated")
}
server.Logger.Debugw("Updating chart in index",
server.Logger.Debugw(fmt.Sprintf("(%s) Updating chart in index", reqID),
"name", chartVersion.Name,
"version", chartVersion.Version,
)
@@ -288,13 +355,13 @@ func (server *Server) updateIndexObject(index *repo.Index, object storage.Object
return nil
}
func (server *Server) addIndexObjectsAsync(index *repo.Index, objects []storage.Object) error {
func (server *Server) addIndexObjectsAsync(index *repo.Index, objects []storage.Object, reqID string) error {
numObjects := len(objects)
if numObjects == 0 {
return nil
}
server.Logger.Debugw("Loading charts packages from storage (this could take awhile)",
server.Logger.Debugw(fmt.Sprintf("(%s) Loading charts packages from storage (this could take awhile)", reqID),
"total", numObjects,
)
@@ -335,7 +402,7 @@ func (server *Server) addIndexObjectsAsync(index *repo.Index, objects []storage.
if cvRes.cv == nil {
continue
}
server.Logger.Debugw("Adding chart to index",
server.Logger.Debugw(fmt.Sprintf("(%s) Adding chart to index", reqID),
"name", cvRes.cv.Name,
"version", cvRes.cv.Version,
)

View File

@@ -132,30 +132,41 @@ func (suite *ServerTestSuite) TearDownSuite() {
}
func (suite *ServerTestSuite) TestRegenerateRepositoryIndex() {
err := suite.Server.regenerateRepositoryIndex()
objects, err := suite.Server.fetchChartsInStorage("test")
diff := storage.GetObjectSliceDiff(suite.Server.StorageCache, objects)
_, err = suite.Server.regenerateRepositoryIndexWorker(diff, objects, "test")
suite.Nil(err, "no error regenerating repo index")
newtime := time.Now().Add(1 * time.Hour)
err = os.Chtimes(suite.TestTarballFilename, newtime, newtime)
suite.Nil(err, "no error changing modtime on temp file")
err = suite.Server.regenerateRepositoryIndex()
objects, err = suite.Server.fetchChartsInStorage("test")
diff = storage.GetObjectSliceDiff(suite.Server.StorageCache, objects)
_, err = suite.Server.regenerateRepositoryIndexWorker(diff, objects, "test")
suite.Nil(err, "no error regenerating repo index with tarball updated")
brokenTarballFilename := pathutil.Join(suite.TempDirectory, "brokenchart.tgz")
destFile, err := os.Create(brokenTarballFilename)
suite.Nil(err, "no error creating new broken tarball in temp dir")
defer destFile.Close()
err = suite.Server.regenerateRepositoryIndex()
objects, err = suite.Server.fetchChartsInStorage("test")
diff = storage.GetObjectSliceDiff(suite.Server.StorageCache, objects)
_, err = suite.Server.regenerateRepositoryIndexWorker(diff, objects, "test")
suite.Nil(err, "error not returned with broken tarball added")
err = os.Chtimes(brokenTarballFilename, newtime, newtime)
suite.Nil(err, "no error changing modtime on broken tarball")
err = suite.Server.regenerateRepositoryIndex()
objects, err = suite.Server.fetchChartsInStorage("test")
diff = storage.GetObjectSliceDiff(suite.Server.StorageCache, objects)
_, err = suite.Server.regenerateRepositoryIndexWorker(diff, objects, "test")
suite.Nil(err, "error not returned with broken tarball updated")
err = os.Remove(brokenTarballFilename)
suite.Nil(err, "no error removing broken tarball")
err = suite.Server.regenerateRepositoryIndex()
objects, err = suite.Server.fetchChartsInStorage("test")
diff = storage.GetObjectSliceDiff(suite.Server.StorageCache, objects)
_, err = suite.Server.regenerateRepositoryIndexWorker(diff, objects, "test")
suite.Nil(err, "error not returned with broken tarball removed")
}

View File

@@ -45,19 +45,16 @@ func (index *Index) Regenerate() error {
// RemoveEntry removes a chart version from index
func (index *Index) RemoveEntry(chartVersion *helm_repo.ChartVersion) {
for k := range index.Entries {
if k == chartVersion.Name {
for i, cv := range index.Entries[chartVersion.Name] {
if cv.Version == chartVersion.Version {
index.Entries[chartVersion.Name] = append(index.Entries[chartVersion.Name][:i],
index.Entries[chartVersion.Name][i+1:]...)
if len(index.Entries[chartVersion.Name]) == 0 {
delete(index.Entries, chartVersion.Name)
}
break
if entries, ok := index.Entries[chartVersion.Name]; ok {
for i, cv := range entries {
if cv.Version == chartVersion.Version {
index.Entries[chartVersion.Name] = append(entries[:i],
entries[i+1:]...)
if len(index.Entries[chartVersion.Name]) == 0 {
delete(index.Entries, chartVersion.Name)
}
break
}
break
}
}
}
@@ -71,18 +68,27 @@ func (index *Index) AddEntry(chartVersion *helm_repo.ChartVersion) {
index.Entries[chartVersion.Name] = append(index.Entries[chartVersion.Name], chartVersion)
}
// HasEntry checks if index has already an entry
func (index *Index) HasEntry(chartVersion *helm_repo.ChartVersion) bool {
if entries, ok := index.Entries[chartVersion.Name]; ok {
for _, cv := range entries {
if cv.Version == chartVersion.Version {
return true
}
}
}
return false
}
// UpdateEntry updates a chart version in index
func (index *Index) UpdateEntry(chartVersion *helm_repo.ChartVersion) {
for k := range index.Entries {
if k == chartVersion.Name {
for i, cv := range index.Entries[chartVersion.Name] {
if cv.Version == chartVersion.Version {
index.setChartURL(chartVersion)
index.Entries[chartVersion.Name][i] = chartVersion
break
}
if entries, ok := index.Entries[chartVersion.Name]; ok {
for i, cv := range entries {
if cv.Version == chartVersion.Version {
index.setChartURL(chartVersion)
entries[i] = chartVersion
break
}
break
}
}
}

View File

@@ -65,10 +65,13 @@ func (suite *IndexTestSuite) TestRemove() {
for i := 5; i < 10; i++ {
chartVersion := getChartVersion(name, i, now)
suite.Index.RemoveEntry(chartVersion)
suite.Empty(suite.Index.HasEntry(chartVersion))
}
}
chartVersion := getChartVersion("d", 0, now)
suite.Index.RemoveEntry(chartVersion)
suite.Empty(suite.Index.HasEntry(chartVersion))
}
func (suite *IndexTestSuite) TestChartURLs() {

View File

@@ -14,7 +14,9 @@ export PATH="$PWD/testbin:$PATH"
export HELM_HOME="$PWD/.helm"
main() {
check_env_vars
if [[ $TEST_CLOUD_STORAGE == 1 ]]; then
check_env_vars
fi
install_helm
package_test_charts
}