;;; Guix Build Coordinator ;;; ;;; Copyright © 2020 Christopher Baines ;;; ;;; This file is part of the guix-build-coordinator. ;;; ;;; The Guix Build Coordinator is free software; you can redistribute ;;; it and/or modify it under the terms of the GNU General Public ;;; License as published by the Free Software Foundation; either ;;; version 3 of the License, or (at your option) any later version. ;;; ;;; The Guix Build Coordinator is distributed in the hope that it will ;;; be useful, but WITHOUT ANY WARRANTY; without even the implied ;;; warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ;;; See the GNU General Public License for more details. ;;; ;;; You should have received a copy of the GNU General Public License ;;; along with the guix-data-service. If not, see ;;; . (define-module (guix-build-coordinator agent-messaging http) #:use-module (srfi srfi-11) #:use-module (srfi srfi-19) #:use-module (ice-9 match) #:use-module (ice-9 format) #:use-module (ice-9 threads) #:use-module (ice-9 exceptions) #:use-module (ice-9 textual-ports) #:use-module (ice-9 binary-ports) #:use-module (system repl error-handling) #:use-module (rnrs bytevectors) #:use-module (oop goops) #:use-module (logging logger) #:use-module (json) #:use-module (web http) #:use-module (web client) #:use-module (web request) #:use-module (web response) #:use-module (web uri) #:use-module (gcrypt base16) #:use-module (gcrypt hash) #:use-module (lzlib) #:use-module ((gnutls) #:select (gnutls-version)) #:use-module ((guix config) #:select (%guix-version)) #:use-module ((guix utils) #:select (version>=?)) #:use-module (prometheus) #:use-module (guix store) #:use-module (guix base64) #:use-module (guix serialization) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator agent-messaging abstract) #:export (make-http-agent-interface fetch-session-credentials submit-status submit-log-file submit-build-result report-build-start report-setup-failure submit-output fetch-builds-for-agent)) (define-class () (coordinator-uri #:init-keyword #:coordinator-uri) (agent-uuid #:init-keyword #:agent-uuid) (password #:init-keyword #:password)) (define (make-http-agent-interface coordinator-uri agent-uuid password) (let* ((gnutls-ver (gnutls-version)) (guix-ver %guix-version) (gnutls-probably-retries-on-gc? (or (version>=? gnutls-ver "3.7.3") ;; guix patched gnutls to retry when interrupted by gc before ;; gnutls released the change (version>=? guix-ver "1.3.0-14")))) (simple-format (current-error-port) "gc protection ~A (gnutls version: ~A, guix version: ~A)\n" (if gnutls-probably-retries-on-gc? "disabled" "enabled") gnutls-ver guix-ver) (use-gc-protection? (not gnutls-probably-retries-on-gc?))) (make #:coordinator-uri coordinator-uri #:agent-uuid agent-uuid #:password password)) (define* (coordinator-uri-for-path base-uri-string agent-path #:key query) (let* ((base-uri (string->uri base-uri-string)) (scheme (uri-scheme base-uri)) (host (uri-host base-uri)) (port (uri-port base-uri)) (path (uri-path base-uri))) (build-uri scheme #:host host #:port port #:path (string-append path (if (string-suffix? path "/") agent-path (string-drop agent-path 1))) #:query query))) (define (default-log level . components) (apply log-msg level components)) (define* (coordinator-handle-failed-request log method path response body #:key first-request-failed?) (log 'ERROR "coordinator-http-request: " method " " path " " (response-code response)) (catch #t (lambda () (if (equal? '(application/json (charset . "utf-8")) (response-content-type response)) (json-string->scm (utf8->string body)) (utf8->string body))) (lambda (key . args) (log 'ERROR "error decoding body " key " " args) #f))) (define* (coordinator-http-request log interface path #:key method body (headers '()) succeed-on-access-denied-retry?) (define auth-value (string-append "Basic " (base64-encode (string->utf8 (string-append (slot-ref interface 'agent-uuid) ":" (slot-ref interface 'password)))))) (define uri (coordinator-uri-for-path (slot-ref interface 'coordinator-uri) path)) (define first-request-failed? #f) (define (make-request) (let-values (((response body) (http-request uri #:method method #:body (scm->json-string body) #:decode-body? #f #:headers `((Authorization . ,auth-value) ,@headers)))) (let ((code (response-code response))) (cond ((eq? code 400) (and=> (coordinator-handle-failed-request log method path response body) (lambda (error) (raise-exception (make-agent-error-from-coordinator (assoc-ref error "error")))))) ((eq? code 404) (values (and body (json-string->scm (utf8->string body))) response)) ((>= (response-code response) 400) (let ((body (coordinator-handle-failed-request log method path response body))) (if (and first-request-failed? succeed-on-access-denied-retry? (equal? body '(("error" . "access denied")))) (begin (log 'WARN "treating access denied response as success") (values body response)) (begin (set! first-request-failed? #t) (raise-exception (make-exception-with-message body)))))) (else (values (and body (json-string->scm (utf8->string body))) response)))))) (retry-on-error (lambda () (with-port-timeouts (lambda () (with-gc-protection make-request)))) #:times 9 #:delay 10 #:ignore agent-error-from-coordinator?)) (define* (fetch-session-credentials coordinator name token #:key (log default-log)) (define method 'POST) (define path "/agent/fetch-session-credentials") (define uri (coordinator-uri-for-path coordinator path #:query (simple-format #f "name=~A&token=~A" name token))) (let-values (((response body) (http-request uri #:method method #:decode-body? #f))) (let ((code (response-code response))) (cond ((eq? code 400) (and=> (coordinator-handle-failed-request log method path response body) (lambda (error) (raise-exception (make-agent-error-from-coordinator (assoc-ref error "error")))))) ((>= (response-code response) 400) (let ((body (coordinator-handle-failed-request log method path response body))) (raise-exception (make-exception-with-message body)))) (else (values (json-string->scm (utf8->string body)) response)))))) (define-method (submit-status (interface ) . args) (apply (lambda* (status #:key (log default-log)) (coordinator-http-request log interface (string-append "/agent/" (slot-ref interface 'agent-uuid)) #:method 'PUT ; TODO Should be PATCH #:body `((status . ,status)))) args)) (define-method (submit-output (interface ) . args) (apply (lambda* (build-id output-name file #:key (log default-log) report-bytes-sent) (define file-size (stat:size (stat file))) (define file-md5-hash-promise (delay (bytevector->base16-string (file-hash (hash-algorithm md5) file)))) (define (get-partial-upload-bytes) (let-values (((body response) (coordinator-http-request log interface (string-append "/build/" build-id "/output/" output-name "/partial") #:method 'HEAD))) (if (eq? (response-code response) 404) #f (response-content-length response)))) (define (get-completed-upload-bytes) (let-values (((body response) (coordinator-http-request log interface (string-append "/build/" build-id "/output/" output-name) #:method 'HEAD))) (if (eq? (response-code response) 404) #f (response-content-length response)))) (define auth-value (string-append "Basic " (base64-encode (string->utf8 (string-append (slot-ref interface 'agent-uuid) ":" (slot-ref interface 'password)))))) (define %force-full-upload #t) (define (perform-upload) (define reset-timeout/throttled (throttle 120 (lambda () (reset-timeout (* 5 60))))) (let* ((partial-upload-bytes (get-partial-upload-bytes)) (completed-upload-bytes (get-completed-upload-bytes)) (bytes (if %force-full-upload #f (or partial-upload-bytes completed-upload-bytes)))) ;; Check if the server has all the bytes (if (and bytes (eq? bytes file-size)) (begin (log 'DEBUG "perform upload: server has all the bytes" " (partial-upload-bytes: " partial-upload-bytes ",completed-upload-bytes: " completed-upload-bytes ")") (unless completed-upload-bytes (let loop ((retry 1)) (sleep 30) (if (get-completed-upload-bytes) (log 'DEBUG "upload completed") (if (< retry 30) (begin (log 'DEBUG "upload not yet completed, " "rechecking in 30 seconds") (loop (+ 1 retry))) (begin (log 'DEBUG "upload not yet completed, giving up waiting") (raise-exception (make-exception-with-message "upload not completed")))))))) (begin (if bytes (log 'DEBUG "still more to send (bytes: " file-size ", partial upload bytes: " bytes ")") (begin (log 'DEBUG "starting sending file from start") (set! %force-full-upload #f) (report-bytes-sent 'reset))) ;; Still more to send (call-with-input-file file (lambda (file-port) (when bytes (seek file-port bytes SEEK_SET) (log 'INFO "resuming upload from byte " bytes)) (let ((upload-uri (coordinator-uri-for-path (slot-ref interface 'coordinator-uri) (string-append "/build/" build-id "/output/" output-name (if (integer? bytes) "/partial" ""))))) (with-timeout (* 5 60) ; 5 minutes (raise-exception (make-exception-with-message "timeout submitting output")) (let-values (((response body) (call-with-streaming-http-request upload-uri (lambda (port) (with-time-logging (simple-format #f "sending ~A" file) (dump-port file-port port #:buffer-size 65536))) #:headers `((Authorization . ,auth-value)) #:method (if bytes 'POST 'PUT) #:report-bytes-sent (lambda args (reset-timeout/throttled) (apply report-bytes-sent args))))) (log 'DEBUG "perform upload " file ", response code: " (response-code response)) (when (>= (response-code response) 400) (raise-exception (make-exception-with-message (coordinator-handle-failed-request log 'PUT (uri-path upload-uri) response body))))))))))))) (unless (and=> (get-completed-upload-bytes) (lambda (uploaded-bytes) (eq? uploaded-bytes file-size))) (retry-on-error perform-upload #:times 100 #:delay 40 #:error-hook (lambda _ (log 'DEBUG "perform-upload " file " (bytes: " file-size ", " "md5: " (force file-md5-hash-promise) ")"))))) args)) (define-method (submit-log-file (interface ) . args) (apply (lambda* (build-id file #:key (log default-log)) (define auth-value (string-append "Basic " (base64-encode (string->utf8 (string-append (slot-ref interface 'agent-uuid) ":" (slot-ref interface 'password)))))) (define format (cond ((string-suffix? ".bz2" file) "bzip2") ((string-suffix? ".gz" file) "gzip") (else (error "unsupported log format for" file)))) (define uri (coordinator-uri-for-path (slot-ref interface 'coordinator-uri) (string-append "/build/" build-id "/log/" format))) (retry-on-error (lambda () (let-values (((response body) (call-with-streaming-http-request uri (lambda (request-port) (call-with-input-file file (lambda (file-port) (dump-port file-port request-port #:buffer-size (expt 2 20))) #:binary #t)) #:headers `((Authorization . ,auth-value))))) (if (>= (response-code response) 400) (raise-exception (make-exception-with-message (coordinator-handle-failed-request log 'PUT (uri-path uri) response body))) (begin (log 'INFO "successfully uploaded log file (" (response-code response) ")") #t)))) #:times 12 #:delay (random 15))) args)) (define-method (submit-build-result (interface ) . args) (apply (lambda* (build-id result #:key (log default-log)) (coordinator-http-request log interface (string-append "/build/" build-id) #:method 'PUT ; TODO Should be PATCH #:body result)) args)) (define-method (report-build-start (interface ) . args) (apply (lambda* (build-id #:key (log default-log)) (coordinator-http-request log interface (string-append "/build/" build-id "/report-build-start") #:method 'POST)) args)) (define-method (report-setup-failure (interface ) . args) (apply (lambda* (build-id report #:key (log default-log)) (coordinator-http-request log interface (string-append "/build/" build-id "/report-setup-failure") #:method 'POST #:body report #:succeed-on-access-denied-retry? #t)) args)) (define-method (fetch-builds-for-agent (interface ) . args) (apply (lambda* (systems target-count #:key (log default-log)) (vector->list (assoc-ref (coordinator-http-request log interface (string-append "/agent/" (slot-ref interface 'agent-uuid) "/fetch-builds") #:body `((target_count . ,target-count) (systems . ,(list->vector systems))) #:method 'POST) "builds"))) args))