From 5ff521452b9ec2aae9ed8e4bb7bdc250a581f203 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20Court=C3=A8s?= Date: Wed, 2 Dec 2020 22:49:39 +0100 Subject: [PATCH] substitute: Cache and reuse connections while substituting. That way, when fetching a series of substitutes from the same server(s), the connection is reused instead of being closed/opened for each substitutes, which saves on network round trips and TLS handshakes. * guix/http-client.scm (http-fetch): Add #:keep-alive? and honor it. * guix/progress.scm (progress-report-port): Add #:close? parameter and honor it. * guix/scripts/substitute.scm (at-most): Return the tail as a second value. (fetch): Add #:port and #:keep-alive? and honor them. (%max-cached-connections): New variable. (open-connection-for-uri/cached, call-with-cached-connection): New procedures. (with-cached-connection): New macro. (process-substitution): Wrap 'fetch' call in 'with-cached-connection'. Pass #:close? to 'progress-report-port'. --- guix/http-client.scm | 12 ++--- guix/progress.scm | 8 +-- guix/scripts/substitute.scm | 103 ++++++++++++++++++++++++++++++------ nix/libstore/build.cc | 27 ++++++---- 4 files changed, 116 insertions(+), 34 deletions(-) diff --git a/guix/http-client.scm b/guix/http-client.scm index a767175d67..553640fe9e 100644 --- a/guix/http-client.scm +++ b/guix/http-client.scm @@ -1,5 +1,5 @@ ;;; GNU Guix --- Functional package management for GNU -;;; Copyright © 2012, 2013, 2014, 2015, 2016, 2017, 2018 Ludovic Courtès +;;; Copyright © 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2020 Ludovic Courtès ;;; Copyright © 2015 Mark H Weaver ;;; Copyright © 2012, 2015 Free Software Foundation, Inc. ;;; Copyright © 2017 Tobias Geerinckx-Rice @@ -70,6 +70,7 @@ (define-condition-type &http-get-error &error (define* (http-fetch uri #:key port (text? #f) (buffered? #t) + (keep-alive? #f) (verify-certificate? #t) (headers '((user-agent . "GNU Guile"))) timeout) @@ -79,6 +80,9 @@ (define* (http-fetch uri #:key port (text? #f) (buffered? #t) unbuffered port, suitable for use in `filtered-port'. HEADERS is an alist of extra HTTP headers. +When KEEP-ALIVE? is true, the connection is marked as 'keep-alive' and PORT is +not closed upon completion. + When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates. TIMEOUT specifies the timeout in seconds for connection establishment; when @@ -104,11 +108,7 @@ (define* (http-fetch uri #:key port (text? #f) (buffered? #t) (setvbuf port 'none)) (let*-values (((resp data) (http-get uri #:streaming? #t #:port port - ;; XXX: When #:keep-alive? is true, if DATA is - ;; a chunked-encoding port, closing DATA won't - ;; close PORT, leading to a file descriptor - ;; leak. - #:keep-alive? #f + #:keep-alive? keep-alive? #:headers headers)) ((code) (response-code resp))) diff --git a/guix/progress.scm b/guix/progress.scm index fec65b424c..cd80ae620a 100644 --- a/guix/progress.scm +++ b/guix/progress.scm @@ -337,9 +337,10 @@ (define buffer (report total) (loop total (get-bytevector-n! in buffer 0 buffer-size)))))))) -(define (progress-report-port reporter port) +(define* (progress-report-port reporter port #:key (close? #t)) "Return a port that continuously reports the bytes read from PORT using -REPORTER, which should be a object." +REPORTER, which should be a object. When CLOSE? is true, +PORT is closed when the returned port is closed." (match reporter (($ start report stop) (let* ((total 0) @@ -364,5 +365,6 @@ (define (progress-report-port reporter port) ;; trace. (unless (zero? total) (stop)) - (close-port port))))))) + (when close? + (close-port port)))))))) diff --git a/guix/scripts/substitute.scm b/guix/scripts/substitute.scm index 73abd3f029..25075eedff 100755 --- a/guix/scripts/substitute.scm +++ b/guix/scripts/substitute.scm @@ -188,9 +188,14 @@ (define-syntax-rule (with-timeout duration handler body ...) (sigaction SIGALRM SIG_DFL) (apply values result))))) -(define* (fetch uri #:key (buffered? #t) (timeout? #t)) +(define* (fetch uri #:key (buffered? #t) (timeout? #t) + (keep-alive? #f) (port #f)) "Return a binary input port to URI and the number of bytes it's expected to -provide." +provide. + +When PORT is true, use it as the underlying I/O port for HTTP transfers; when +PORT is false, open a new connection for URI. When KEEP-ALIVE? is true, the +connection (typically PORT) is kept open once data has been fetched from URI." (case (uri-scheme uri) ((file) (let ((port (open-file (uri-path uri) @@ -206,7 +211,7 @@ (define* (fetch uri #:key (buffered? #t) (timeout? #t)) ;; sudo tc qdisc add dev eth0 root netem delay 1500ms ;; and then cancel with: ;; sudo tc qdisc del dev eth0 root - (let ((port #f)) + (let ((port port)) (with-timeout (if timeout? %fetch-timeout 0) @@ -217,10 +222,11 @@ (define* (fetch uri #:key (buffered? #t) (timeout? #t)) (begin (when (or (not port) (port-closed? port)) (set! port (guix:open-connection-for-uri - uri #:verify-certificate? #f)) - (unless (or buffered? (not (file-port? port))) - (setvbuf port 'none))) + uri #:verify-certificate? #f))) + (unless (or buffered? (not (file-port? port))) + (setvbuf port 'none)) (http-fetch uri #:text? #f #:port port + #:keep-alive? keep-alive? #:verify-certificate? #f)))))) (else (leave (G_ "unsupported substitute URI scheme: ~a~%") @@ -478,17 +484,17 @@ (define (narinfo-request cache-url path) (build-request (string->uri url) #:method 'GET #:headers headers))) (define (at-most max-length lst) - "If LST is shorter than MAX-LENGTH, return it; otherwise return its -MAX-LENGTH first elements." + "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise +return its MAX-LENGTH first elements and its tail." (let loop ((len 0) (lst lst) (result '())) (match lst (() - (reverse result)) + (values (reverse result) '())) ((head . tail) (if (>= len max-length) - (reverse result) + (values (reverse result) lst) (loop (+ 1 len) tail (cons head result))))))) (define* (http-multiple-get base-uri proc seed requests @@ -962,6 +968,68 @@ (define (file-sizestring uri))) (let*-values (((raw download-size) - ;; Note that Hydra currently generates Nars on the fly - ;; and doesn't specify a Content-Length, so - ;; DOWNLOAD-SIZE is #f in practice. - (fetch uri #:buffered? #f #:timeout? #f)) + ;; 'guix publish' without '--cache' doesn't specify a + ;; Content-Length, so DOWNLOAD-SIZE is #f in this case. + (with-cached-connection uri port + (fetch uri #:buffered? #f #:timeout? #f + #:port port + #:keep-alive? #t))) ((progress) (let* ((dl-size (or download-size (and (equal? compression "none") @@ -1001,7 +1071,9 @@ (define narinfo (uri->string uri) dl-size (current-error-port) #:abbreviation nar-uri-abbreviation)))) - (progress-report-port reporter raw))) + ;; Keep RAW open upon completion so we can later reuse + ;; the underlying connection. + (progress-report-port reporter raw #:close? #f))) ((input pids) ;; NOTE: This 'progress' port of current process will be ;; closed here, while the child process doing the @@ -1218,6 +1290,7 @@ (define print-build-trace? ;;; Local Variables: ;;; eval: (put 'with-timeout 'scheme-indent-function 1) +;;; eval: (put 'with-cached-connection 'scheme-indent-function 2) ;;; End: ;;; substitute.scm ends here diff --git a/nix/libstore/build.cc b/nix/libstore/build.cc index 50d300253d..6cfe7aba7e 100644 --- a/nix/libstore/build.cc +++ b/nix/libstore/build.cc @@ -3114,17 +3114,24 @@ void SubstitutionGoal::handleChildOutput(int fd, const string & data) } if (fd == substituter->fromAgent.readSide) { - /* Trim whitespace to the right. */ - size_t end = data.find_last_not_of(" \t\n"); - string trimmed = (end != string::npos) ? data.substr(0, end + 1) : data; + /* DATA may consist of several lines. Process them one by one. */ + string input = data; + while (!input.empty()) { + /* Process up to the first newline. */ + size_t end = input.find_first_of("\n"); + string trimmed = (end != string::npos) ? input.substr(0, end) : input; - if (expectedHashStr == "") { - expectedHashStr = trimmed; - } else if (status == "") { - status = trimmed; - worker.wakeUp(shared_from_this()); - } else { - printMsg(lvlError, format("unexpected substituter message '%1%'") % data); + /* Update the goal's state accordingly. */ + if (expectedHashStr == "") { + expectedHashStr = trimmed; + } else if (status == "") { + status = trimmed; + worker.wakeUp(shared_from_this()); + } else { + printMsg(lvlError, format("unexpected substituter message '%1%'") % input); + } + + input = (end != string::npos) ? input.substr(end + 1) : ""; } } }