substitute: Cache and reuse connections while substituting.

That way, when fetching a series of substitutes from the same server(s),
the connection is reused instead of being closed/opened for each
substitute, which saves on network round trips and TLS handshakes.

* guix/http-client.scm (http-fetch): Add #:keep-alive? and honor it.
* guix/progress.scm (progress-report-port): Add #:close? parameter and
honor it.
* guix/scripts/substitute.scm (at-most): Return the tail as a second
value.
(fetch): Add #:port and #:keep-alive? and honor them.
(%max-cached-connections): New variable.
(open-connection-for-uri/cached, call-with-cached-connection): New
procedures.
(with-cached-connection): New macro.
(process-substitution): Wrap 'fetch' call in 'with-cached-connection'.
Pass #:close? to 'progress-report-port'.
This commit is contained in:
Ludovic Courtès 2020-12-02 22:49:39 +01:00
parent 711df9ef3c
commit 5ff521452b
No known key found for this signature in database
GPG key ID: 090B11993D9AEBB5
4 changed files with 116 additions and 34 deletions

View file

@ -1,5 +1,5 @@
;;; GNU Guix --- Functional package management for GNU ;;; GNU Guix --- Functional package management for GNU
;;; Copyright © 2012, 2013, 2014, 2015, 2016, 2017, 2018 Ludovic Courtès <ludo@gnu.org> ;;; Copyright © 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2020 Ludovic Courtès <ludo@gnu.org>
;;; Copyright © 2015 Mark H Weaver <mhw@netris.org> ;;; Copyright © 2015 Mark H Weaver <mhw@netris.org>
;;; Copyright © 2012, 2015 Free Software Foundation, Inc. ;;; Copyright © 2012, 2015 Free Software Foundation, Inc.
;;; Copyright © 2017 Tobias Geerinckx-Rice <me@tobias.gr> ;;; Copyright © 2017 Tobias Geerinckx-Rice <me@tobias.gr>
@ -70,6 +70,7 @@ (define-condition-type &http-get-error &error
(define* (http-fetch uri #:key port (text? #f) (buffered? #t) (define* (http-fetch uri #:key port (text? #f) (buffered? #t)
(keep-alive? #f)
(verify-certificate? #t) (verify-certificate? #t)
(headers '((user-agent . "GNU Guile"))) (headers '((user-agent . "GNU Guile")))
timeout) timeout)
@ -79,6 +80,9 @@ (define* (http-fetch uri #:key port (text? #f) (buffered? #t)
unbuffered port, suitable for use in `filtered-port'. HEADERS is an alist of unbuffered port, suitable for use in `filtered-port'. HEADERS is an alist of
extra HTTP headers. extra HTTP headers.
When KEEP-ALIVE? is true, the connection is marked as 'keep-alive' and PORT is
not closed upon completion.
When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates. When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates.
TIMEOUT specifies the timeout in seconds for connection establishment; when TIMEOUT specifies the timeout in seconds for connection establishment; when
@ -104,11 +108,7 @@ (define* (http-fetch uri #:key port (text? #f) (buffered? #t)
(setvbuf port 'none)) (setvbuf port 'none))
(let*-values (((resp data) (let*-values (((resp data)
(http-get uri #:streaming? #t #:port port (http-get uri #:streaming? #t #:port port
;; XXX: When #:keep-alive? is true, if DATA is #:keep-alive? keep-alive?
;; a chunked-encoding port, closing DATA won't
;; close PORT, leading to a file descriptor
;; leak.
#:keep-alive? #f
#:headers headers)) #:headers headers))
((code) ((code)
(response-code resp))) (response-code resp)))

View file

@ -337,9 +337,10 @@ (define buffer
(report total) (report total)
(loop total (get-bytevector-n! in buffer 0 buffer-size)))))))) (loop total (get-bytevector-n! in buffer 0 buffer-size))))))))
(define (progress-report-port reporter port) (define* (progress-report-port reporter port #:key (close? #t))
"Return a port that continuously reports the bytes read from PORT using "Return a port that continuously reports the bytes read from PORT using
REPORTER, which should be a <progress-reporter> object." REPORTER, which should be a <progress-reporter> object. When CLOSE? is true,
PORT is closed when the returned port is closed."
(match reporter (match reporter
(($ <progress-reporter> start report stop) (($ <progress-reporter> start report stop)
(let* ((total 0) (let* ((total 0)
@ -364,5 +365,6 @@ (define (progress-report-port reporter port)
;; trace. ;; trace.
(unless (zero? total) (unless (zero? total)
(stop)) (stop))
(close-port port))))))) (when close?
(close-port port))))))))

View file

@ -188,9 +188,14 @@ (define-syntax-rule (with-timeout duration handler body ...)
(sigaction SIGALRM SIG_DFL) (sigaction SIGALRM SIG_DFL)
(apply values result))))) (apply values result)))))
(define* (fetch uri #:key (buffered? #t) (timeout? #t)) (define* (fetch uri #:key (buffered? #t) (timeout? #t)
(keep-alive? #f) (port #f))
"Return a binary input port to URI and the number of bytes it's expected to "Return a binary input port to URI and the number of bytes it's expected to
provide." provide.
When PORT is true, use it as the underlying I/O port for HTTP transfers; when
PORT is false, open a new connection for URI. When KEEP-ALIVE? is true, the
connection (typically PORT) is kept open once data has been fetched from URI."
(case (uri-scheme uri) (case (uri-scheme uri)
((file) ((file)
(let ((port (open-file (uri-path uri) (let ((port (open-file (uri-path uri)
@ -206,7 +211,7 @@ (define* (fetch uri #:key (buffered? #t) (timeout? #t))
;; sudo tc qdisc add dev eth0 root netem delay 1500ms ;; sudo tc qdisc add dev eth0 root netem delay 1500ms
;; and then cancel with: ;; and then cancel with:
;; sudo tc qdisc del dev eth0 root ;; sudo tc qdisc del dev eth0 root
(let ((port #f)) (let ((port port))
(with-timeout (if timeout? (with-timeout (if timeout?
%fetch-timeout %fetch-timeout
0) 0)
@ -217,10 +222,11 @@ (define* (fetch uri #:key (buffered? #t) (timeout? #t))
(begin (begin
(when (or (not port) (port-closed? port)) (when (or (not port) (port-closed? port))
(set! port (guix:open-connection-for-uri (set! port (guix:open-connection-for-uri
uri #:verify-certificate? #f)) uri #:verify-certificate? #f)))
(unless (or buffered? (not (file-port? port))) (unless (or buffered? (not (file-port? port)))
(setvbuf port 'none))) (setvbuf port 'none))
(http-fetch uri #:text? #f #:port port (http-fetch uri #:text? #f #:port port
#:keep-alive? keep-alive?
#:verify-certificate? #f)))))) #:verify-certificate? #f))))))
(else (else
(leave (G_ "unsupported substitute URI scheme: ~a~%") (leave (G_ "unsupported substitute URI scheme: ~a~%")
@ -478,17 +484,17 @@ (define (narinfo-request cache-url path)
(build-request (string->uri url) #:method 'GET #:headers headers))) (build-request (string->uri url) #:method 'GET #:headers headers)))
(define (at-most max-length lst) (define (at-most max-length lst)
"If LST is shorter than MAX-LENGTH, return it; otherwise return its "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise
MAX-LENGTH first elements." return its MAX-LENGTH first elements and its tail."
(let loop ((len 0) (let loop ((len 0)
(lst lst) (lst lst)
(result '())) (result '()))
(match lst (match lst
(() (()
(reverse result)) (values (reverse result) '()))
((head . tail) ((head . tail)
(if (>= len max-length) (if (>= len max-length)
(reverse result) (values (reverse result) lst)
(loop (+ 1 len) tail (cons head result))))))) (loop (+ 1 len) tail (cons head result)))))))
(define* (http-multiple-get base-uri proc seed requests (define* (http-multiple-get base-uri proc seed requests
@ -962,6 +968,68 @@ (define (file-size<? c1 c2)
(((uri compression file-size) _ ...) (((uri compression file-size) _ ...)
(values uri compression file-size)))) (values uri compression file-size))))
(define %max-cached-connections
;; Maximum number of connections kept in cache by
;; 'open-connection-for-uri/cached'.  When adding a new connection, that
;; procedure keeps at most this number minus one of the existing entries and
;; closes the connections of the entries that fall beyond that limit.
16)
(define open-connection-for-uri/cached
;; CACHE is an alist private to this closure.  It maps (HOST SCHEME PORT)
;; keys to sockets; new entries are consed onto the front.
(let ((cache '()))
(lambda* (uri #:key fresh?)
"Return a connection for URI, possibly reusing a cached connection.
When FRESH? is true, delete any cached connections for URI and open a new
one. Return #f if URI's scheme is 'file' or #f."
(define host (uri-host uri))
(define scheme (uri-scheme uri))
;; The key leaves out the URI path, so URIs that differ only in their path
;; map to the same cached connection.
(define key (list host scheme (uri-port uri)))
;; No connection is opened or looked up for 'file' or scheme-less URIs: the
;; 'and' short-circuits to #f.
(and (not (memq scheme '(file #f)))
(match (assoc-ref cache key)
(#f
;; Open a new connection to URI and evict old entries from
;; CACHE, if any.
;; NOTE(review): certificate verification is disabled here, as in
;; 'fetch' -- presumably because substitutes are authenticated by other
;; means; confirm.
(let-values (((socket)
(guix:open-connection-for-uri
uri #:verify-certificate? #f))
((new-cache evicted)
(at-most (- %max-cached-connections 1) cache)))
;; EVICTED is the tail of CACHE beyond the limit.  Errors raised while
;; closing those ports are ignored.
(for-each (match-lambda
((_ . port)
(false-if-exception (close-port port))))
evicted)
(set! cache (alist-cons key socket new-cache))
socket))
(socket
(if (or fresh? (port-closed? socket))
;; The cached socket is closed or the caller asked for a new one: drop
;; its entry and recurse without #:fresh?, which then takes the
;; cache-miss branch above.
(begin
(false-if-exception (close-port socket))
(set! cache (alist-delete key cache))
(open-connection-for-uri/cached uri))
(begin
;; Drain input left from the previous use.
(drain-input socket)
socket))))))))
(define (call-with-cached-connection uri proc)
;; Call PROC with a connection to URI obtained from
;; 'open-connection-for-uri/cached' and return what PROC returns.  The
;; connection is #f when URI's scheme is 'file' or #f.  If PROC throws an
;; exception listed in the handler below, PROC is called a second time with
;; a fresh connection; any other exception is re-thrown unchanged.
(let ((port (open-connection-for-uri/cached uri)))
(catch #t
(lambda ()
(proc port))
(lambda (key . args)
;; If PORT was cached and the server closed the connection in the
;; meantime, we get EPIPE. In that case, open a fresh connection and
;; retry. We might also get 'bad-response or a similar exception from
;; (web response) later on, once we've sent the request.
;;
;; The retry happens inside this handler, so it is attempted only once:
;; an exception raised by the second call to PROC is not caught here.
(if (or (and (eq? key 'system-error)
(= EPIPE (system-error-errno `(,key ,@args))))
(memq key '(bad-response bad-header bad-header-component)))
(proc (open-connection-for-uri/cached uri #:fresh? #t))
(apply throw key args))))))
;; Syntactic sugar for 'call-with-cached-connection': EXP... becomes the body
;; of a one-argument procedure whose parameter is PORT.  Because
;; 'call-with-cached-connection' may call that procedure a second time after
;; certain connection errors, EXP... may be evaluated twice.
(define-syntax-rule (with-cached-connection uri port exp ...)
"Bind PORT with EXP... to a socket connected to URI."
(call-with-cached-connection uri (lambda (port) exp ...)))
(define* (process-substitution store-item destination (define* (process-substitution store-item destination
#:key cache-urls acl print-build-trace?) #:key cache-urls acl print-build-trace?)
"Substitute STORE-ITEM (a store file name) from CACHE-URLS, and write it to "Substitute STORE-ITEM (a store file name) from CACHE-URLS, and write it to
@ -984,10 +1052,12 @@ (define narinfo
(G_ "Downloading ~a...~%") (uri->string uri))) (G_ "Downloading ~a...~%") (uri->string uri)))
(let*-values (((raw download-size) (let*-values (((raw download-size)
;; Note that Hydra currently generates Nars on the fly ;; 'guix publish' without '--cache' doesn't specify a
;; and doesn't specify a Content-Length, so ;; Content-Length, so DOWNLOAD-SIZE is #f in this case.
;; DOWNLOAD-SIZE is #f in practice. (with-cached-connection uri port
(fetch uri #:buffered? #f #:timeout? #f)) (fetch uri #:buffered? #f #:timeout? #f
#:port port
#:keep-alive? #t)))
((progress) ((progress)
(let* ((dl-size (or download-size (let* ((dl-size (or download-size
(and (equal? compression "none") (and (equal? compression "none")
@ -1001,7 +1071,9 @@ (define narinfo
(uri->string uri) dl-size (uri->string uri) dl-size
(current-error-port) (current-error-port)
#:abbreviation nar-uri-abbreviation)))) #:abbreviation nar-uri-abbreviation))))
(progress-report-port reporter raw))) ;; Keep RAW open upon completion so we can later reuse
;; the underlying connection.
(progress-report-port reporter raw #:close? #f)))
((input pids) ((input pids)
;; NOTE: This 'progress' port of current process will be ;; NOTE: This 'progress' port of current process will be
;; closed here, while the child process doing the ;; closed here, while the child process doing the
@ -1218,6 +1290,7 @@ (define print-build-trace?
;;; Local Variables: ;;; Local Variables:
;;; eval: (put 'with-timeout 'scheme-indent-function 1) ;;; eval: (put 'with-timeout 'scheme-indent-function 1)
;;; eval: (put 'with-cached-connection 'scheme-indent-function 2)
;;; End: ;;; End:
;;; substitute.scm ends here ;;; substitute.scm ends here

View file

@ -3114,17 +3114,24 @@ void SubstitutionGoal::handleChildOutput(int fd, const string & data)
} }
if (fd == substituter->fromAgent.readSide) { if (fd == substituter->fromAgent.readSide) {
/* Trim whitespace to the right. */ /* DATA may consist of several lines. Process them one by one. */
size_t end = data.find_last_not_of(" \t\n"); string input = data;
string trimmed = (end != string::npos) ? data.substr(0, end + 1) : data; while (!input.empty()) {
/* Process up to the first newline. */
size_t end = input.find_first_of("\n");
string trimmed = (end != string::npos) ? input.substr(0, end) : input;
if (expectedHashStr == "") { /* Update the goal's state accordingly. */
expectedHashStr = trimmed; if (expectedHashStr == "") {
} else if (status == "") { expectedHashStr = trimmed;
status = trimmed; } else if (status == "") {
worker.wakeUp(shared_from_this()); status = trimmed;
} else { worker.wakeUp(shared_from_this());
printMsg(lvlError, format("unexpected substituter message '%1%'") % data); } else {
printMsg(lvlError, format("unexpected substituter message '%1%'") % input);
}
input = (end != string::npos) ? input.substr(end + 1) : "";
} }
} }
} }