From 0e9d2e36eaeee752ca26fc4f9ea4e5f151fe3635 Mon Sep 17 00:00:00 2001 From: David Genest Date: Thu, 9 Nov 2017 22:38:14 -0500 Subject: [PATCH] Optimized index regeneration Index regeneration before this PR caused a concurrency pile-up when many requests came in at a fast pace, so the time to serve /index.yaml kept increasing. The behavior was observed on a high-latency connection to Google Cloud Storage, but it would also occur with other network stores. With this PR, the regeneration time is constant, and thus the time to serve index.yaml is also constant. The implementation takes a general approach: the first request triggers the network calls (to fetch the object list from storage), and subsequent requests pile up and wait while the initial one completes. The same algorithm is used to update the in-memory cached index. Note that these operations need not be protected by a lock, because only one of them is ever executing at a time. See server.getChartList() for the implementation of this idea. A refactor was needed to separate the fetch from the diff calculation so that the network-heavy operations can be called independently. While doing so, we also removed redundant calls to the storage file list update. Also made small, low-hanging-fruit optimizations to the index manipulation code. Added a request ID to all requests for easier debugging; it is visible with the --debug flag. This was indispensable for diagnosing the complex concurrent processing. To compare the before and after behavior, we added load testing based on the locust.io engine. A short README in the loadtesting/ directory shows how to install locust (with pipenv) and load-test chartmuseum. This will prove useful in the future. Fixes #18 --- Makefile | 21 ++- loadtesting/Pipfile | 14 ++ loadtesting/Pipfile.lock | 224 ++++++++++++++++++++++++++++++ loadtesting/README.md | 27 ++++ loadtesting/locustfile.py | 11 +- pkg/chartmuseum/handlers.go | 16 +-- pkg/chartmuseum/server.go | 197 +++++++++++++++++--------- pkg/chartmuseum/server_test.go | 21 ++- pkg/repo/index.go | 46 +++--- pkg/repo/index_test.go | 3 + scripts/setup_test_environment.sh | 4 +- 11 files changed, 472 insertions(+), 112 deletions(-) create mode 100644 loadtesting/Pipfile create mode 100644 loadtesting/Pipfile.lock create mode 100644 loadtesting/README.md diff --git a/Makefile b/Makefile index 826038f..72545f6 100644 --- a/Makefile +++ b/Makefile @@ -17,12 +17,23 @@ endif @glide install --strip-vendor .PHONY: build -build: export GOARCH=amd64 -build: export CGO_ENABLED=0 -build: - @GOOS=linux go build -v -i --ldflags="-w -X main.Version=$(VERSION) -X main.Revision=$(REVISION)" \ +build: build_linux build_mac build_windows + +build_windows: export GOARCH=amd64 +build_windows: + @GOOS=windows go build -v --ldflags="-w -X main.Version=$(VERSION) -X main.Revision=$(REVISION)" \ + -o bin/windows/amd64/chartmuseum cmd/chartmuseum/main.go # windows + +build_linux: export GOARCH=amd64 +build_linux: export CGO_ENABLED=0 +build_linux: + @GOOS=linux go build -v --ldflags="-w -X main.Version=$(VERSION) -X main.Revision=$(REVISION)" \ -o bin/linux/amd64/chartmuseum cmd/chartmuseum/main.go # linux - @GOOS=darwin go build -v -i --ldflags="-w -X main.Version=$(VERSION) -X main.Revision=$(REVISION)" \ + +build_mac: export GOARCH=amd64 +build_mac: export CGO_ENABLED=0 +build_mac: + @GOOS=darwin go build -v --ldflags="-w -X main.Version=$(VERSION) -X main.Revision=$(REVISION)" \ -o bin/darwin/amd64/chartmuseum cmd/chartmuseum/main.go # mac osx .PHONY: clean diff --git
a/loadtesting/Pipfile b/loadtesting/Pipfile new file mode 100644 index 0000000..59a0d02 --- /dev/null +++ b/loadtesting/Pipfile @@ -0,0 +1,14 @@ +[[source]] + +url = "https://pypi.python.org/simple" +verify_ssl = true +name = "pypi" + + +[dev-packages] + + + +[packages] + +locustio = "*" \ No newline at end of file diff --git a/loadtesting/Pipfile.lock b/loadtesting/Pipfile.lock new file mode 100644 index 0000000..96d7aca --- /dev/null +++ b/loadtesting/Pipfile.lock @@ -0,0 +1,224 @@ +{ + "_meta": { + "hash": { + "sha256": "61aaaf047ac37f113fb2f160addc46622690eca395ca67d7ef029f596b45cfd6" + }, + "host-environment-markers": { + "implementation_name": "cpython", + "implementation_version": "0", + "os_name": "posix", + "platform_machine": "x86_64", + "platform_python_implementation": "CPython", + "platform_release": "15.6.0", + "platform_system": "Darwin", + "platform_version": "Darwin Kernel Version 15.6.0: Sun Jun 4 21:43:07 PDT 2017; root:xnu-3248.70.3~1/RELEASE_X86_64", + "python_full_version": "2.7.13", + "python_version": "2.7", + "sys_platform": "darwin" + }, + "pipfile-spec": 6, + "requires": {}, + "sources": [ + { + "name": "pypi", + "url": "https://pypi.python.org/simple", + "verify_ssl": true + } + ] + }, + "default": { + "certifi": { + "hashes": [ + "sha256:244be0d93b71e93fc0a0a479862051414d0e00e16435707e5bf5000f92e04694", + "sha256:5ec74291ca1136b40f0379e1128ff80e866597e4e2c1e755739a913bbc3613c0" + ], + "version": "==2017.11.5" + }, + "chardet": { + "hashes": [ + "sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691", + "sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae" + ], + "version": "==3.0.4" + }, + "click": { + "hashes": [ + "sha256:29f99fc6125fbc931b758dc053b3114e55c77a6e4c6c3a2674a2dc986016381d", + "sha256:f15516df478d5a56180fbf80e68f206010e6d160fc39fa508b65e035fd75130b" + ], + "version": "==6.7" + }, + "flask": { + "hashes": [ + "sha256:0749df235e3ff61ac108f69ac178c9770caeaccad2509cb762ce1f65570a8856", + "sha256:49f44461237b69ecd901cc7ce66feea0319b9158743dd27a2899962ab214dac1" + ], + "version": "==0.12.2" + }, + "gevent": { + "hashes": [ + "sha256:9b492bb1a043540abb6e54fdb5537531e24962ca49c09f3b47dc4f9c37f6297c", + "sha256:de13a8e378103af84a8bf6015ad1d2761d46f29b8393e8dd6d9bb7cb51bbb713", + "sha256:deafd70d04ab62428d4e291e8e2c0fb22f38690e6a9f23a67ee6c304087634da", + "sha256:b67a10799923f9fed546ca5f8b93a2819c71a60132d7a97b4a13fbdab66b278a", + "sha256:35790f1a3c8e431ada3471b70bb2105050009ea4beb15cbe41b86bc716a7ffa9", + "sha256:c9dd6534c46ed782e2d7236767cd07115cb29ce8670c2fc0794f264de9024fe0", + "sha256:b7e0e6400c2f3ce78a9ae1cdd55b53166feedd003d60c033863881227129a4d3", + "sha256:0901975628790e8a57fc92bb7062e5b856edea48c8de9caf36cfda14eae07329", + "sha256:df52e06a2754c2d905aad75a7dc06a732c804d9edbc87f06f47c8f483ba98bca", + "sha256:70558dd45c7a1f8046ba45792e489dd0f409bd8a3b7a0635ca9d3055223b3dff", + "sha256:8a710eddb3e9e5f22bdbd458b5f211b94f59409ecd6896f15b9fee2cba266a59", + "sha256:60109741377367eef8ded9283a1bf629621b73acaf3e1e8aac9d1a0f50fa0f05", + "sha256:552719cec4721673b8c7d2f9de666e3f7591b9b182f801ecaef1c76e638052aa", + "sha256:a16db4f56699ef07f0249b953ff949aae641e50b2bdc4710f11c0d8d9089b296", + "sha256:59e9237af027f8db85e5d78a9da2e328ae96f01d67a0d62abcecad3db7876908", + "sha256:833bebdc36bfeeedefc200ca9aee9b8eddd80f56b63ca1e886e18b97b1240edd", + "sha256:81cb24e0f7bd9888596364e8d8ed0d65c2547c84884c67bb46d956faeed67396", + "sha256:1af93825db5753550fa8ff5ab2f2132e8733170b3f8d38347b34fa4a984cb624", + 
"sha256:2ff045a91509c35664c27a849c8cbf742a227f587b7cdbc88301e9c85dcaedff", + "sha256:a66cf99f08da65c501826a19e30f5a6e7ba942fdd79baba5ce2d51eebaa13444", + "sha256:4791c8ae9c57d6f153354736e1ccab1e2baf6c8d9ae5a77a9ac90f41e2966b2d", + "sha256:74bce0c30bb2240e3d5d515ba8cb3eadf840c2bde7109a1979c7a26c9d0f5a6a", + "sha256:a0ed8ba787b9c0c1c565c2675d71652e6c1e2d4e91f53530860d0303e867fe85", + "sha256:c35b29de49211014ec66d056fd4f9ba7a04795e2a654697f72879c0cf365d6d4", + "sha256:6892fabc9051e8c0a171d543b6536859aabeb6d169db79b2f45d64dc2a15808c", + "sha256:fce894a64db3911897cdad6c37fbb23dfb18b7bf8b9cb8c00a8ea0a7253651c9", + "sha256:4f098002126ebef7f2907188b6c8b09e5193161ce968847d9e6a8bc832b0db9a", + "sha256:33fa6759eabc9176ddbe0d29b66867a82e19a61f06eb7cfabbac35343c0ecf24", + "sha256:7f93b67b680f4a921f517294048d05f8f6f0ed5962b78d6685a6cf0fcd7d8202" + ], + "version": "==1.2.2" + }, + "greenlet": { + "hashes": [ + "sha256:96888e47898a471073b394ea641b7d675c1d054c580dd4a04a382bd34e67d89e", + "sha256:d2d5103f6cba131e1be660230018e21f276911d2b68b629ead1c5cb5e5472ac7", + "sha256:bc339de0e0969de5118d0b62a080a7611e2ba729a90f4a3ad78559c51bc5576d", + "sha256:b8ab98f8ae25938326dc4c21e3689a933531500ae4f3bfcefe36e3e25fda4dbf", + "sha256:416a3328d7e0a19aa1df3ec09524a109061fd7b80e010ef0dff9f695b4ac5e20", + "sha256:21232907c8c26838b16915bd8fbbf82fc70c996073464cc70981dd4a96bc841c", + "sha256:6803d8c6b235c861c50afddf00c7467ffbcd5ab960d137ff0f9c36f2cb11ee4b", + "sha256:76dab055476dd4dabb00a967b4df1990b25542d17eaa40a18f66971d10193e0b", + "sha256:70b9ff28921f5a3c03df4896ec8c55f5f94c593d7a79abd98b4c5c4a692ba873", + "sha256:7114b757b4146f4c87a0f00f1e58abd4c4729836679af0fc37266910a4a72eb0", + "sha256:0d90c709355ed13f16676f84e5a9cd67826a9f5c5143381c21e8fc3100ade1f1", + "sha256:ebae83b6247f83b1e8d887733dfa8046ce6e29d8b3e2a7380256e9de5c6ae55d", + "sha256:e841e3ece633acae5e2bf6102140a605ffee7d5d4921dca1625c5fdc0f0b3248", + "sha256:3e5e9be157ece49e4f97f3225460caf758ccb00f934fcbc5db34367cc1ff0aee", + "sha256:e77b708c37b652c7501b9f8f6056b23633c567aaa0d29edfef1c11673c64b949", + "sha256:0da1fc809c3bdb93fbacd0f921f461aacd53e554a7b7d4e9953ba09131c4206e", + "sha256:66fa5b101fcf4521138c1a29668074268d938bbb7de739c8faa9f92ea1f05e1f", + "sha256:e5451e1ce06b74a4861576c2db74405a4398c4809a105774550a9e52cfc8c4da", + "sha256:9c407aa6adfd4eea1232e81aa9f3cb3d9b955a9891c4819bf9b498c77efba14b", + "sha256:b56ac981f07b77e72ad5154278b93396d706572ea52c2fce79fee2abfcc8bfa6", + "sha256:e4c99c6010a5d153d481fdaf63b8a0782825c0721506d880403a3b9b82ae347e" + ], + "version": "==0.4.12" + }, + "idna": { + "hashes": [ + "sha256:8c7309c718f94b3a625cb648ace320157ad16ff131ae0af362c9f21b80ef6ec4", + "sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f" + ], + "version": "==2.6" + }, + "itsdangerous": { + "hashes": [ + "sha256:cbb3fcf8d3e33df861709ecaf89d9e6629cff0a217bc2848f1b41cd30d360519" + ], + "version": "==0.24" + }, + "jinja2": { + "hashes": [ + "sha256:74c935a1b8bb9a3947c50a54766a969d4846290e1e788ea44c1392163723c3bd", + "sha256:f84be1bb0040caca4cea721fcbbbbd61f9be9464ca236387158b0feea01914a4" + ], + "version": "==2.10" + }, + "locustio": { + "hashes": [ + "sha256:64583987ba1c330bb071aee3e29d2eedbfb7c8b342fa064bfb74fafcff660d61" + ], + "version": "==0.8.1" + }, + "markupsafe": { + "hashes": [ + "sha256:a6be69091dac236ea9c6bc7d012beab42010fa914c459791d627dad4910eb665" + ], + "version": "==1.0" + }, + "msgpack-python": { + "hashes": [ + "sha256:637b012c9ea021de7a7a75d6ff5e82cfef6694babd7e14bb9a3adcb2a5bd52f0", + 
"sha256:658c1cd5dcf7786e0e7a6d523cd0c5b33f92e139e224bd73cb3a23ada618d2dc", + "sha256:920bbbaee07ad048a4d2b4160901b19775c61ef9439f856c74509e763a326249", + "sha256:e165006f7e3d2612f1bffe2f6f042ca317d8df724d8b72a39b14c2e46c67eaae", + "sha256:95d70edd50e3d2f6ea1189f77190e4a0172626e7405ddd1689f3f64814447cba", + "sha256:7e1b12ea0134460052fabcfaa0f488ec0fc21deb14832d66236fd2870757d8f1", + "sha256:8f36890251f20d96267618cf64735759d7ef7e91bc0b86b9480547d2d1397a68", + "sha256:1e68a277e4180baa7789be36f27f0891660205f6209f78a32282d3c422873d78", + "sha256:f52d9f96df952369fe4adcb0506e10c1c92d47f653f601a66da2a26a7e7141ea", + "sha256:58c9c1d7891a35bddc6ee5dbec10d347a7ae4983169c24fc5fc8a57ae792ca76", + "sha256:1a2b19df0f03519ec7f19f826afb935b202d8979b0856c6fb3dc28955799f886" + ], + "version": "==0.4.8" + }, + "pyzmq": { + "hashes": [ + "sha256:f88d8ffa32bf729bed806ca3205846ed98b61f149562dc828db5c3a2194ca0c3", + "sha256:83c30a7fa401d5f26378414c6e67bdf8633780cf514b31bd2084b9ab4b226484", + "sha256:5f357871b560bfacfa62a737292c59b42fe06c6e3433567f49f42dec32b45b11", + "sha256:6602d9958656a255a067ab87b569695a51e275bb77111b057c12ac702544fdca", + "sha256:068977e0f5cfba05d910df9899bcbd379e6536666ddabb2ce6a1218673229c65", + "sha256:29fe1d6a79ea3b4cfcb3b23ef2159b34a42a347f74010fe5510484543a5367e2", + "sha256:09ecf151c912f1d0907bfda331fa0ed5cd0032bf1d8733512f1befc8e89b387f", + "sha256:edde3830b2ac4c97f11fdafeea02dffd2343b470482864a3cd3894c839a0a416", + "sha256:37d7c8bfa3c7e8121ca7a71fe807cd5b63793d05ad598df53449d49e5dea1cc3", + "sha256:9848772e952f2addbb77182ae14ef201fd4f0e394e6e73c4620b9fd8f154f4a9", + "sha256:4d2bda85a2fe827fe1cce007f4bab20dae4bf56fa468d19bc730bb225c349bf3", + "sha256:7ded1d19978e52557348fd89f2c3f67eb73c19700b248355bcc50ac582387a95", + "sha256:b3c6abbebd79866465d6bbc0ab239cc4d1ba3bcce203b5294bb494665dcf4105", + "sha256:c382cb87d65de9718c738ee3a2f4f7c8444bd5d1a002bd370b5365d906c13f27", + "sha256:f41626459031c36457a6af56dd91e432b315b0bce8458ba08edbf6e4de88b467", + "sha256:afe36a0ab8ada3b8d9a005c6aaca8a434c425bd019b7fa6d323b9fda46157389", + "sha256:686ffacf9c62bdd4253ab3e093b32aaf195961c7545583a416cc534caa8a5900", + "sha256:09802528d0092b70131a9df92c975bb625cf2038fc0f53915bf51ffa665a7957", + "sha256:22ea664f4d115cd9fa0f2dc8c907298437f4d2fcf0677c18eddf45da2436f923", + "sha256:505d2a4fc2a899d30663e12550dca3d17d48c5796dd1307a0197656bc40b67c8", + "sha256:657147db62a16794aca7a18b51139180bc98d6249931507b1c8ef696607ada43", + "sha256:3a942e278f79ddcf455ba160881a95de474123667c3c847683bc3ae1929093f5", + "sha256:8a883824147523c0fe76d247dd58994c1c28ef07f1cc5dde595a4fd1c28f2580" + ], + "version": "==16.0.3" + }, + "requests": { + "hashes": [ + "sha256:6a1b267aa90cac58ac3a765d067950e7dbbf75b1da07e895d1f594193a40a38b", + "sha256:9c443e7324ba5b85070c4a818ade28bfabedf16ea10206da1132edaa6dda237e" + ], + "version": "==2.18.4" + }, + "six": { + "hashes": [ + "sha256:832dc0e10feb1aa2c68dcc57dbb658f1c7e65b9b61af69048abc87a2db00a0eb", + "sha256:70e8a77beed4562e7f14fe23a786b54f6296e34344c23bc42f07b15018ff98e9" + ], + "version": "==1.11.0" + }, + "urllib3": { + "hashes": [ + "sha256:06330f386d6e4b195fbfc736b297f58c5a892e4440e54d294d7004e3a9bbea1b", + "sha256:cc44da8e1145637334317feebd728bd869a35285b93cbb4cca2577da7e62db4f" + ], + "version": "==1.22" + }, + "werkzeug": { + "hashes": [ + "sha256:e8549c143af3ce6559699a01e26fa4174f4c591dbee0a499f3cd4c3781cdec3d", + "sha256:903a7b87b74635244548b30d30db4c8947fe64c5198f58899ddcd3a13c23bb26" + ], + "version": "==0.12.2" + } + }, + "develop": {} +} diff --git a/loadtesting/README.md 
b/loadtesting/README.md new file mode 100644 index 0000000..28c2561 --- /dev/null +++ b/loadtesting/README.md @@ -0,0 +1,27 @@ +Loadtesting is made with the excellent Python [locust](https://locust.io/) library. + +To facilitate installation, this loadtesting subproject uses pipenv. + +Install pipenv + +``` +pip install pipenv +``` + +Install chartmuseum locust loadtesting + +``` +cd loadtesting +pipenv install +``` + +Start chartmuseum. + +Start locust: + +``` +# run locust on a running chartmuseum instance +pipenv run locust --host http://localhost:8080 +``` + +Open your locust console in your browser at http://localhost:8089, and start a new loadtest. diff --git a/loadtesting/locustfile.py b/loadtesting/locustfile.py index f828636..9a5b759 100644 --- a/loadtesting/locustfile.py +++ b/loadtesting/locustfile.py @@ -3,16 +3,11 @@ import tarfile import io patch_version = 1 +chart_post_field_name = 'chart' def index(l): l.client.get("/index.yaml") -def metrics(l): - l.client.get("/metrics") - -def not_found(l): - l.client.get("/toto") - def post_new_chart(l): global patch_version @@ -31,11 +26,11 @@ def post_new_chart(l): t.close() tgz_buf.seek(0) - l.client.post('/api/charts', files={'chartfile': (chart_fn, tgz_buf)}) + l.client.post('/api/charts', files={chart_post_field_name: (chart_fn, tgz_buf)}) class UserBehavior(TaskSet): - tasks = {index: 15, metrics: 1, post_new_chart: 1} + tasks = {index: 10, post_new_chart: 1} class WebsiteUser(HttpLocust): diff --git a/pkg/chartmuseum/handlers.go b/pkg/chartmuseum/handlers.go index 26574a7..06e0e12 100644 --- a/pkg/chartmuseum/handlers.go +++ b/pkg/chartmuseum/handlers.go @@ -30,31 +30,31 @@ type ( ) func (server *Server) getIndexFileRequestHandler(c *gin.Context) { - err := server.syncRepositoryIndex() + index, err := server.syncRepositoryIndex(c.MustGet("RequestID").(string)) if err != nil { c.JSON(500, errorResponse(err)) return } - c.Data(200, repo.IndexFileContentType, server.RepositoryIndex.Raw) + c.Data(200, repo.IndexFileContentType, index.Raw) } func (server *Server) getAllChartsRequestHandler(c *gin.Context) { - err := server.syncRepositoryIndex() + index, err := server.syncRepositoryIndex(c.MustGet("RequestID").(string)) if err != nil { c.JSON(500, errorResponse(err)) return } - c.JSON(200, server.RepositoryIndex.Entries) + c.JSON(200, index.Entries) } func (server *Server) getChartRequestHandler(c *gin.Context) { name := c.Param("name") - err := server.syncRepositoryIndex() + index, err := server.syncRepositoryIndex(c.MustGet("RequestID").(string)) if err != nil { c.JSON(500, errorResponse(err)) return } - chart := server.RepositoryIndex.Entries[name] + chart := index.Entries[name] if chart == nil { c.JSON(404, notFoundErrorResponse) return @@ -68,12 +68,12 @@ func (server *Server) getChartVersionRequestHandler(c *gin.Context) { if version == "latest" { version = "" } - err := server.syncRepositoryIndex() + index, err := server.syncRepositoryIndex(c.MustGet("RequestID").(string)) if err != nil { c.JSON(500, errorResponse(err)) return } - chartVersion, err := server.RepositoryIndex.Get(name, version) + chartVersion, err := index.Get(name, version) if err != nil { c.JSON(404, notFoundErrorResponse) return diff --git a/pkg/chartmuseum/server.go b/pkg/chartmuseum/server.go index 0e01df8..ede9bb1 100644 --- a/pkg/chartmuseum/server.go +++ b/pkg/chartmuseum/server.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "regexp" + "strconv" "sync" + "sync/atomic" "time" "github.com/kubernetes-helm/chartmuseum/pkg/repo" @@ -27,20 +29,30 @@ type ( Router 
struct { *gin.Engine } - + fetchedObjects struct { + objects []storage.Object + err error + } + indexRegeneration struct { + index *repo.Index + err error + } // Server contains a Logger, Router, storage backend and object cache Server struct { - Logger *Logger - Router *Router - RepositoryIndex *repo.Index - StorageBackend storage.Backend - StorageCache []storage.Object - StorageCacheLock *sync.Mutex - AllowOverwrite bool - TlsCert string - TlsKey string - ChartPostFormFieldName string - ProvPostFormFieldName string + Logger *Logger + Router *Router + RepositoryIndex *repo.Index + StorageBackend storage.Backend + StorageCache []storage.Object + regenerationLock *sync.Mutex + fetchedObjectsLock *sync.Mutex + fetchedObjectsChans []chan fetchedObjects + regeneratedIndexesChans []chan indexRegeneration + AllowOverwrite bool + TlsCert string + TlsKey string + ChartPostFormFieldName string + ProvPostFormFieldName string } // ServerOptions are options for constructing a Server @@ -127,7 +139,8 @@ func NewServer(options ServerOptions) (*Server, error) { RepositoryIndex: repo.NewIndex(options.ChartURL), StorageBackend: options.StorageBackend, StorageCache: []storage.Object{}, - StorageCacheLock: &sync.Mutex{}, + regenerationLock: &sync.Mutex{}, + fetchedObjectsLock: &sync.Mutex{}, AllowOverwrite: options.AllowOverwrite, TlsCert: options.TlsCert, TlsKey: options.TlsKey, @@ -137,7 +150,8 @@ func NewServer(options ServerOptions) (*Server, error) { server.setRoutes(options.EnableAPI) - err = server.regenerateRepositoryIndex() + // prime the cache + _, err = server.syncRepositoryIndex("bootstrap") return server, err } @@ -154,11 +168,15 @@ func (server *Server) Listen(port int) { } func loggingMiddleware(logger *Logger) gin.HandlerFunc { + var requestID int64 return func(c *gin.Context) { + reqID := strconv.FormatInt(atomic.AddInt64(&requestID, 1), 10) + c.Set("RequestID", reqID) + logger.Debugf("(%s) Incoming request: %s", reqID, c.Request.URL.Path) start := time.Now() c.Next() - msg := "Request served" + msg := fmt.Sprintf("(%s) Request served", reqID) status := c.Writer.Status() meta := []interface{}{ @@ -181,22 +199,81 @@ func loggingMiddleware(logger *Logger) gin.HandlerFunc { } } -func (server *Server) syncRepositoryIndex() error { - _, diff, err := server.listObjectsGetDiff() - if err != nil { - return err +// getChartList fetches from the server and accumulates concurrent requests to be fulfilled +// all at once. 
+func (server *Server) getChartList(reqID string) <-chan fetchedObjects { + c := make(chan fetchedObjects, 1) + + server.fetchedObjectsLock.Lock() + server.fetchedObjectsChans = append(server.fetchedObjectsChans, c) + + if len(server.fetchedObjectsChans) == 1 { + // this unlock is deliberate: while the list is being fetched, other requests can register their channels + server.fetchedObjectsLock.Unlock() + + objects, err := server.fetchChartsInStorage(reqID) + + server.fetchedObjectsLock.Lock() + // flush every other consumer that also wanted the index + for _, foCh := range server.fetchedObjectsChans { + foCh <- fetchedObjects{objects, err} + } + server.fetchedObjectsChans = nil } - if !diff.Change { - return nil - } - err = server.regenerateRepositoryIndex() - return err + server.fetchedObjectsLock.Unlock() + + return c } -func (server *Server) listObjectsGetDiff() ([]storage.Object, storage.ObjectSliceDiff, error) { +func (server *Server) regenerateRepositoryIndex(diff storage.ObjectSliceDiff, storageObjects []storage.Object, reqID string) <-chan indexRegeneration { + c := make(chan indexRegeneration, 1) + + server.regenerationLock.Lock() + server.regeneratedIndexesChans = append(server.regeneratedIndexesChans, c) + + if len(server.regeneratedIndexesChans) == 1 { + server.regenerationLock.Unlock() + + index, err := server.regenerateRepositoryIndexWorker(diff, storageObjects, reqID) + + server.regenerationLock.Lock() + for _, riCh := range server.regeneratedIndexesChans { + riCh <- indexRegeneration{index, err} + } + server.regeneratedIndexesChans = nil + } + server.regenerationLock.Unlock() + + return c +} + +// syncRepositoryIndex is the workhorse of maintaining a coherent index cache. It is optimized for multiple requests +// coming in a short period. When several requests for the backing store arrive, only the first one hits storage, and the other consumers receive the +// result of that request. This allows very fast updates in constant time. See getChartList() and regenerateRepositoryIndex().
+func (server *Server) syncRepositoryIndex(reqID string) (*repo.Index, error) { + fo := <-server.getChartList(reqID) + + if fo.err != nil { + return nil, fo.err + } + + diff := storage.GetObjectSliceDiff(server.StorageCache, fo.objects) + + // return fast if no changes + if !diff.Change { + return server.RepositoryIndex, nil + } + + ir := <-server.regenerateRepositoryIndex(diff, fo.objects, reqID) + + return ir.index, ir.err +} + +func (server *Server) fetchChartsInStorage(reqID string) ([]storage.Object, error) { + server.Logger.Debugf("(%s) Fetching chart list from storage", reqID) allObjects, err := server.StorageBackend.ListObjects() if err != nil { - return []storage.Object{}, storage.ObjectSliceDiff{}, err + return []storage.Object{}, err } // filter out storage objects that dont have extension used for chart packages (.tgz) @@ -207,67 +284,57 @@ func (server *Server) listObjectsGetDiff() ([]storage.Object, storage.ObjectSlic } } - diff := storage.GetObjectSliceDiff(server.StorageCache, filteredObjects) - return filteredObjects, diff, nil + return filteredObjects, nil } -func (server *Server) regenerateRepositoryIndex() error { - server.Logger.Debugw("Acquiring storage cache lock") - server.StorageCacheLock.Lock() - server.Logger.Debugw("Storage cache lock acquired") - defer func() { - server.Logger.Debugw("Releasing storage cache lock") - server.StorageCacheLock.Unlock() - }() - - objects, diff, err := server.listObjectsGetDiff() - if err != nil { - return err - } - - index := &repo.Index{ - IndexFile: server.RepositoryIndex.IndexFile, - Raw: server.RepositoryIndex.Raw, - ChartURL: server.RepositoryIndex.ChartURL, - } +func (server *Server) regenerateRepositoryIndexWorker(diff storage.ObjectSliceDiff, storageObjects []storage.Object, reqID string) (*repo.Index, error) { + server.Logger.Debugf("(%s) Regenerating index.yaml", reqID) + index := &repo.Index{ + IndexFile: server.RepositoryIndex.IndexFile, + Raw: server.RepositoryIndex.Raw, + ChartURL: server.RepositoryIndex.ChartURL, + } for _, object := range diff.Removed { - err := server.removeIndexObject(index, object) + err := server.removeIndexObject(index, object, reqID) if err != nil { - return err + return nil, err } } for _, object := range diff.Updated { - err := server.updateIndexObject(index, object) + err := server.updateIndexObject(index, object, reqID) if err != nil { - return err + return nil, err } } - // Parallelize retrieval of added objects to improve startup speed - err = server.addIndexObjectsAsync(index, diff.Added) + // Parallelize retrieval of added objects to improve speed + err := server.addIndexObjectsAsync(index, diff.Added, reqID) if err != nil { - return err + return nil, err } - server.Logger.Debug("Regenerating index.yaml") err = index.Regenerate() if err != nil { - return err + return nil, err } + // It is very important that these two stay in sync as they reflect the same reality. StorageCache serves + // as object modification time cache, and RepositoryIndex is the canonical cached index. 
server.RepositoryIndex = index - server.StorageCache = objects - return nil + server.StorageCache = storageObjects + + server.Logger.Debugf("(%s) index.yaml regenerated", reqID) + return index, nil } -func (server *Server) removeIndexObject(index *repo.Index, object storage.Object) error { +func (server *Server) removeIndexObject(index *repo.Index, object storage.Object, reqID string) error { chartVersion, err := server.getObjectChartVersion(object, false) if err != nil { return server.checkInvalidChartPackageError(object, err, "removed") } - server.Logger.Debugw("Removing chart from index", + server.Logger.Debugw(fmt.Sprintf("(%s) Removing chart from index", reqID), "name", chartVersion.Name, "version", chartVersion.Version, ) @@ -275,12 +342,12 @@ func (server *Server) removeIndexObject(index *repo.Index, object storage.Object return nil } -func (server *Server) updateIndexObject(index *repo.Index, object storage.Object) error { +func (server *Server) updateIndexObject(index *repo.Index, object storage.Object, reqID string) error { chartVersion, err := server.getObjectChartVersion(object, true) if err != nil { return server.checkInvalidChartPackageError(object, err, "updated") } - server.Logger.Debugw("Updating chart in index", + server.Logger.Debugw(fmt.Sprintf("(%s) Updating chart in index", reqID), "name", chartVersion.Name, "version", chartVersion.Version, ) @@ -288,13 +355,13 @@ func (server *Server) updateIndexObject(index *repo.Index, object storage.Object return nil } -func (server *Server) addIndexObjectsAsync(index *repo.Index, objects []storage.Object) error { +func (server *Server) addIndexObjectsAsync(index *repo.Index, objects []storage.Object, reqID string) error { numObjects := len(objects) if numObjects == 0 { return nil } - server.Logger.Debugw("Loading charts packages from storage (this could take awhile)", + server.Logger.Debugw(fmt.Sprintf("(%s) Loading charts packages from storage (this could take awhile)", reqID), "total", numObjects, ) @@ -335,7 +402,7 @@ func (server *Server) addIndexObjectsAsync(index *repo.Index, objects []storage. 
if cvRes.cv == nil { continue } - server.Logger.Debugw("Adding chart to index", + server.Logger.Debugw(fmt.Sprintf("(%s) Adding chart to index", reqID), "name", cvRes.cv.Name, "version", cvRes.cv.Version, ) diff --git a/pkg/chartmuseum/server_test.go b/pkg/chartmuseum/server_test.go index b6afcf7..894019c 100644 --- a/pkg/chartmuseum/server_test.go +++ b/pkg/chartmuseum/server_test.go @@ -132,30 +132,41 @@ func (suite *ServerTestSuite) TearDownSuite() { } func (suite *ServerTestSuite) TestRegenerateRepositoryIndex() { - err := suite.Server.regenerateRepositoryIndex() + objects, err := suite.Server.fetchChartsInStorage("test") + diff := storage.GetObjectSliceDiff(suite.Server.StorageCache, objects) + _, err = suite.Server.regenerateRepositoryIndexWorker(diff, objects, "test") suite.Nil(err, "no error regenerating repo index") newtime := time.Now().Add(1 * time.Hour) err = os.Chtimes(suite.TestTarballFilename, newtime, newtime) suite.Nil(err, "no error changing modtime on temp file") - err = suite.Server.regenerateRepositoryIndex() + + objects, err = suite.Server.fetchChartsInStorage("test") + diff = storage.GetObjectSliceDiff(suite.Server.StorageCache, objects) + _, err = suite.Server.regenerateRepositoryIndexWorker(diff, objects, "test") suite.Nil(err, "no error regenerating repo index with tarball updated") brokenTarballFilename := pathutil.Join(suite.TempDirectory, "brokenchart.tgz") destFile, err := os.Create(brokenTarballFilename) suite.Nil(err, "no error creating new broken tarball in temp dir") defer destFile.Close() - err = suite.Server.regenerateRepositoryIndex() + objects, err = suite.Server.fetchChartsInStorage("test") + diff = storage.GetObjectSliceDiff(suite.Server.StorageCache, objects) + _, err = suite.Server.regenerateRepositoryIndexWorker(diff, objects, "test") suite.Nil(err, "error not returned with broken tarball added") err = os.Chtimes(brokenTarballFilename, newtime, newtime) suite.Nil(err, "no error changing modtime on broken tarball") - err = suite.Server.regenerateRepositoryIndex() + objects, err = suite.Server.fetchChartsInStorage("test") + diff = storage.GetObjectSliceDiff(suite.Server.StorageCache, objects) + _, err = suite.Server.regenerateRepositoryIndexWorker(diff, objects, "test") suite.Nil(err, "error not returned with broken tarball updated") err = os.Remove(brokenTarballFilename) suite.Nil(err, "no error removing broken tarball") - err = suite.Server.regenerateRepositoryIndex() + objects, err = suite.Server.fetchChartsInStorage("test") + diff = storage.GetObjectSliceDiff(suite.Server.StorageCache, objects) + _, err = suite.Server.regenerateRepositoryIndexWorker(diff, objects, "test") suite.Nil(err, "error not returned with broken tarball removed") } diff --git a/pkg/repo/index.go b/pkg/repo/index.go index e69360f..c43512b 100644 --- a/pkg/repo/index.go +++ b/pkg/repo/index.go @@ -45,19 +45,16 @@ func (index *Index) Regenerate() error { // RemoveEntry removes a chart version from index func (index *Index) RemoveEntry(chartVersion *helm_repo.ChartVersion) { - for k := range index.Entries { - if k == chartVersion.Name { - for i, cv := range index.Entries[chartVersion.Name] { - if cv.Version == chartVersion.Version { - index.Entries[chartVersion.Name] = append(index.Entries[chartVersion.Name][:i], - index.Entries[chartVersion.Name][i+1:]...) 
- if len(index.Entries[chartVersion.Name]) == 0 { - delete(index.Entries, chartVersion.Name) - } - break + if entries, ok := index.Entries[chartVersion.Name]; ok { + for i, cv := range entries { + if cv.Version == chartVersion.Version { + index.Entries[chartVersion.Name] = append(entries[:i], + entries[i+1:]...) + if len(index.Entries[chartVersion.Name]) == 0 { + delete(index.Entries, chartVersion.Name) } + break } - break } } } @@ -71,18 +68,27 @@ func (index *Index) AddEntry(chartVersion *helm_repo.ChartVersion) { index.Entries[chartVersion.Name] = append(index.Entries[chartVersion.Name], chartVersion) } +// HasEntry checks whether the index already has an entry for the given chart version +func (index *Index) HasEntry(chartVersion *helm_repo.ChartVersion) bool { + if entries, ok := index.Entries[chartVersion.Name]; ok { + for _, cv := range entries { + if cv.Version == chartVersion.Version { + return true + } + } + } + return false +} + // UpdateEntry updates a chart version in index func (index *Index) UpdateEntry(chartVersion *helm_repo.ChartVersion) { - for k := range index.Entries { - if k == chartVersion.Name { - for i, cv := range index.Entries[chartVersion.Name] { - if cv.Version == chartVersion.Version { - index.setChartURL(chartVersion) - index.Entries[chartVersion.Name][i] = chartVersion - break - } + if entries, ok := index.Entries[chartVersion.Name]; ok { + for i, cv := range entries { + if cv.Version == chartVersion.Version { + index.setChartURL(chartVersion) + entries[i] = chartVersion + break } - break } } } diff --git a/pkg/repo/index_test.go b/pkg/repo/index_test.go index ad8806d..dc800fd 100644 --- a/pkg/repo/index_test.go +++ b/pkg/repo/index_test.go @@ -65,10 +65,13 @@ func (suite *IndexTestSuite) TestRemove() { for i := 5; i < 10; i++ { chartVersion := getChartVersion(name, i, now) suite.Index.RemoveEntry(chartVersion) + suite.Empty(suite.Index.HasEntry(chartVersion)) } } chartVersion := getChartVersion("d", 0, now) suite.Index.RemoveEntry(chartVersion) + + suite.Empty(suite.Index.HasEntry(chartVersion)) } func (suite *IndexTestSuite) TestChartURLs() { diff --git a/scripts/setup_test_environment.sh b/scripts/setup_test_environment.sh index 863d460..d21f927 100755 --- a/scripts/setup_test_environment.sh +++ b/scripts/setup_test_environment.sh @@ -14,7 +14,9 @@ export PATH="$PWD/testbin:$PATH" export HELM_HOME="$PWD/.helm" main() { - check_env_vars + if [[ $TEST_CLOUD_STORAGE == 1 ]]; then + check_env_vars + fi install_helm package_test_charts }
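The coalescing idea behind server.getChartList() and regenerateRepositoryIndex() above can be reduced to the standalone sketch below. This is only an illustration of the pattern, not code from this PR: the coalescer, fetchResult, and slowFetch names are made up for the example. Only the first concurrent caller performs the slow storage listing; every caller that piles up while it is in flight receives the same result over its own buffered channel.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fetchResult carries the outcome of one shared fetch to every waiting caller
// (an illustrative stand-in for the fetchedObjects struct in this PR).
type fetchResult struct {
	objects []string
	err     error
}

// coalescer funnels concurrent callers into a single expensive fetch.
type coalescer struct {
	mu      sync.Mutex
	waiters []chan fetchResult
}

// get registers the caller and, if it is the first waiter, performs the fetch
// and fans the result out to every caller that piled up in the meantime.
func (c *coalescer) get(fetch func() ([]string, error)) <-chan fetchResult {
	ch := make(chan fetchResult, 1)

	c.mu.Lock()
	c.waiters = append(c.waiters, ch)
	if len(c.waiters) == 1 {
		// Deliberately unlock while the slow fetch is in flight so that
		// later callers can queue up instead of blocking on the mutex.
		c.mu.Unlock()

		objects, err := fetch()

		c.mu.Lock()
		for _, w := range c.waiters {
			w <- fetchResult{objects, err}
		}
		c.waiters = nil
	}
	c.mu.Unlock()

	return ch
}

func main() {
	var c coalescer
	slowFetch := func() ([]string, error) {
		time.Sleep(200 * time.Millisecond) // simulate a slow storage listing
		return []string{"mychart-0.1.0.tgz"}, nil
	}

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			res := <-c.get(slowFetch)
			fmt.Printf("request %d got %v (err: %v)\n", id, res.objects, res.err)
		}(i)
	}
	wg.Wait()
}
```

For comparison, the golang.org/x/sync/singleflight package provides a similar duplicate-call-suppression primitive and could serve the same purpose.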