;;; Guix Build Coordinator ;;; ;;; Copyright © 2020 Christopher Baines ;;; ;;; This file is part of the guix-build-coordinator. ;;; ;;; The Guix Build Coordinator is free software; you can redistribute ;;; it and/or modify it under the terms of the GNU General Public ;;; License as published by the Free Software Foundation; either ;;; version 3 of the License, or (at your option) any later version. ;;; ;;; The Guix Build Coordinator is distributed in the hope that it will ;;; be useful, but WITHOUT ANY WARRANTY; without even the implied ;;; warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ;;; See the GNU General Public License for more details. ;;; ;;; You should have received a copy of the GNU General Public License ;;; along with the guix-data-service. If not, see ;;; . (define-module (guix-build-coordinator agent-messaging http) #:use-module (srfi srfi-11) #:use-module (srfi srfi-19) #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ice-9 format) #:use-module (ice-9 threads) #:use-module (ice-9 exceptions) #:use-module (ice-9 textual-ports) #:use-module (ice-9 binary-ports) #:use-module (system repl error-handling) #:use-module (rnrs bytevectors) #:use-module (oop goops) #:use-module (logging logger) #:use-module (json) #:use-module (web http) #:use-module (web client) #:use-module (web request) #:use-module (web response) #:use-module (web uri) #:use-module (gcrypt base16) #:use-module (gcrypt hash) #:use-module (lzlib) #:use-module ((gnutls) #:select (gnutls-version)) #:use-module (guix progress) #:use-module ((guix config) #:select (%guix-version)) #:use-module ((guix utils) #:select (version>=?)) #:use-module (prometheus) #:use-module (guix store) #:use-module (guix base64) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator agent-messaging abstract) #:export (make-http-agent-interface fetch-session-credentials submit-status submit-log-file submit-build-result report-build-start report-setup-failure submit-output fetch-builds-for-agent)) (define-class () (coordinator-uri #:init-keyword #:coordinator-uri) (agent-uuid #:init-keyword #:agent-uuid) (password #:init-keyword #:password)) (define (make-http-agent-interface coordinator-uri agent-uuid password) (let* ((gnutls-ver (gnutls-version)) (guix-ver %guix-version)) (simple-format (current-error-port) "(gnutls version: ~A, guix version: ~A)\n" gnutls-ver guix-ver)) (make #:coordinator-uri coordinator-uri #:agent-uuid agent-uuid #:password password)) (define* (coordinator-uri-for-path base-uri-string agent-path #:key query) (let* ((base-uri (string->uri base-uri-string)) (scheme (uri-scheme base-uri)) (host (uri-host base-uri)) (port (uri-port base-uri)) (path (uri-path base-uri))) (build-uri scheme #:host host #:port port #:path (string-append path (if (string-suffix? path "/") agent-path (string-drop agent-path 1))) #:query query))) (define (default-log level . components) (apply log-msg level components)) (define* (coordinator-handle-failed-request log method path response body #:key first-request-failed?) (log 'ERROR "coordinator-http-request: " method " " path " " (response-code response)) (catch #t (lambda () (if (equal? '(application/json (charset . "utf-8")) (response-content-type response)) (json-string->scm (utf8->string body)) (utf8->string body))) (lambda (key . args) (log 'ERROR "error decoding body " key " " args) #f))) (define* (coordinator-http-request log interface path #:key method body (headers '()) succeed-on-access-denied-retry? (retry-times 9)) (define auth-value (string-append "Basic " (base64-encode (string->utf8 (string-append (slot-ref interface 'agent-uuid) ":" (slot-ref interface 'password)))))) (define uri (coordinator-uri-for-path (slot-ref interface 'coordinator-uri) path)) (define first-request-failed? #f) (define (make-request) (let* ((port socket (open-socket-for-uri* uri)) (response body (http-request uri #:port port #:method method #:body (scm->json-string body) #:decode-body? #f #:headers `((Authorization . ,auth-value) ,@headers))) (code (response-code response))) (cond ((= code 400) (and=> (coordinator-handle-failed-request log method path response body) (lambda (error) (raise-exception (make-agent-error-from-coordinator (assoc-ref error "error")))))) ((= code 404) (values (and body (json-string->scm (utf8->string body))) response)) ((>= (response-code response) 400) (let ((body (coordinator-handle-failed-request log method path response body))) (if (and first-request-failed? succeed-on-access-denied-retry? (equal? body '(("error" . "access denied")))) (begin (log 'WARN "treating access denied response as success") (values body response)) (begin (set! first-request-failed? #t) (raise-exception (make-exception-with-message body)))))) (else (values (and body (json-string->scm (utf8->string body))) response))))) (retry-on-error (lambda () (with-port-timeouts make-request)) #:times retry-times #:delay 10 #:no-retry agent-error-from-coordinator?)) (define* (fetch-session-credentials coordinator name token #:key (log default-log)) (define method 'POST) (define path "/agent/fetch-session-credentials") (define uri (coordinator-uri-for-path coordinator path #:query (simple-format #f "name=~A&token=~A" name token))) (let-values (((response body) (http-request uri #:method method #:decode-body? #f))) (let ((code (response-code response))) (cond ((= code 400) (and=> (coordinator-handle-failed-request log method path response body) (lambda (error) (raise-exception (make-agent-error-from-coordinator (assoc-ref error "error")))))) ((>= (response-code response) 400) (let ((body (coordinator-handle-failed-request log method path response body))) (raise-exception (make-exception-with-message body)))) (else (values (json-string->scm (utf8->string body)) response)))))) (define-method (submit-status (interface ) . args) (apply (lambda* (status #:key 1min-load-average system-uptime (log default-log) initial-status-update? (retry-times 1)) (coordinator-http-request log interface (string-append "/agent/" (slot-ref interface 'agent-uuid)) #:method 'PUT ; TODO Should be PATCH #:body `((status . ,status) (processor_count . ,(current-processor-count)) ,@(if 1min-load-average `((load_average . (("1" . ,1min-load-average)))) '()) ,@(if system-uptime `((system_uptime . ,system-uptime)) '()) ,@(if initial-status-update? `((initial_status_update . #t)) '())) #:retry-times retry-times)) args)) (define-method (submit-output (interface ) . args) (apply (lambda* (build-id output-name file #:key (log default-log) reporter-set-bytes-already-sent reporter report-bytes-hashed) (define file-size (stat:size (stat file))) (define file-md5-hash-promise (delay (bytevector->base16-string (file-hash (hash-algorithm md5) file)))) (define (get-partial-upload-bytes) (let-values (((body response) (coordinator-http-request log interface (string-append "/build/" build-id "/output/" output-name "/partial") #:method 'HEAD))) (if (= (response-code response) 404) #f (response-content-length response)))) (define (get-completed-upload-bytes) (let-values (((body response) (coordinator-http-request log interface (string-append "/build/" build-id "/output/" output-name) #:method 'HEAD))) (if (= (response-code response) 404) #f (response-content-length response)))) (define auth-value (string-append "Basic " (base64-encode (string->utf8 (string-append (slot-ref interface 'agent-uuid) ":" (slot-ref interface 'password)))))) (define %force-full-upload #t) (define (perform-upload) (let* ((partial-upload-bytes (get-partial-upload-bytes)) (completed-upload-bytes (get-completed-upload-bytes)) (bytes (if %force-full-upload #f (or partial-upload-bytes completed-upload-bytes))) (upload-uri (coordinator-uri-for-path (slot-ref interface 'coordinator-uri) (string-append "/build/" build-id "/output/" output-name (if (integer? bytes) "/partial" ""))))) ;; Check if the server has all the bytes (if (and bytes (= bytes file-size)) (log 'DEBUG "perform upload: server has all the bytes" " (partial-upload-bytes: " partial-upload-bytes ",completed-upload-bytes: " completed-upload-bytes ")") (if bytes (log 'DEBUG "still more to send (bytes: " file-size ", partial upload bytes: " bytes ")") (begin (log 'DEBUG "starting sending file from start") ;; Set this to enable partial uploads when retrying (set! %force-full-upload #f)))) (reporter-set-bytes-already-sent (or bytes 0)) (let* ((bytes-to-send (if bytes (- file-size bytes) file-size)) (response body (call-with-streaming-http-request upload-uri bytes-to-send (lambda (port) (when (> bytes-to-send 0) (call-with-input-file file (lambda (file-port) (when bytes (seek file-port bytes SEEK_SET) (log 'INFO "resuming upload from byte " bytes)) (dump-port* file-port port #:reporter reporter)) #:binary #t))) #:headers `((Authorization . ,auth-value)) #:method (if bytes 'POST 'PUT) #:streaming? #t))) (log 'DEBUG "finished sending " file) (let loop ((line (get-line body))) (unless (eof-object? line) (let ((bytes (string->number line))) (if (number? bytes) (report-bytes-hashed bytes) (log 'DEBUG "error parsing submit-output response line: " line))) (loop (get-line body)))) (log 'DEBUG "perform upload " file ", response code: " (response-code response)) (when (>= (response-code response) 400) (raise-exception (make-exception-with-message (coordinator-handle-failed-request log 'PUT (uri-path upload-uri) response body))))))) (unless (and=> (get-completed-upload-bytes) (lambda (uploaded-bytes) (= uploaded-bytes file-size))) (retry-on-error (lambda () (with-throw-handler #t perform-upload (lambda _ (backtrace)))) #:times 100 #:delay 60 #:error-hook (lambda _ (log 'DEBUG "perform-upload " file " (bytes: " file-size ", " "md5: " (force file-md5-hash-promise) ")"))))) args)) (define-method (submit-log-file (interface ) . args) (apply (lambda* (build-id file #:key (log default-log)) (define auth-value (string-append "Basic " (base64-encode (string->utf8 (string-append (slot-ref interface 'agent-uuid) ":" (slot-ref interface 'password)))))) (define format (cond ((string-suffix? ".bz2" file) "bzip2") ((string-suffix? ".gz" file) "gzip") (else (error "unsupported log format for" file)))) (define uri (coordinator-uri-for-path (slot-ref interface 'coordinator-uri) (string-append "/build/" build-id "/log/" format))) (retry-on-error (lambda () (let ((response body (call-with-streaming-http-request uri (stat:size (stat file)) (lambda (request-port) (call-with-input-file file (lambda (file-port) (dump-port file-port request-port)) #:binary #t)) #:headers `((Authorization . ,auth-value))))) (if (>= (response-code response) 400) (raise-exception (make-exception-with-message (coordinator-handle-failed-request log 'PUT (uri-path uri) response body))) (begin (log 'INFO "successfully uploaded log file (" (response-code response) ")") #t)))) #:times 12 #:delay (random 15))) args)) (define-method (submit-build-result (interface ) . args) (apply (lambda* (build-id result #:key (log default-log)) (coordinator-http-request log interface (string-append "/build/" build-id) #:method 'PUT ; TODO Should be PATCH #:body result)) args)) (define-method (report-build-start (interface ) . args) (apply (lambda* (build-id #:key (log default-log)) (coordinator-http-request log interface (string-append "/build/" build-id "/report-build-start") #:method 'POST)) args)) (define-method (report-setup-failure (interface ) . args) (apply (lambda* (build-id report #:key (log default-log)) (coordinator-http-request log interface (string-append "/build/" build-id "/report-setup-failure") #:method 'POST #:body report #:succeed-on-access-denied-retry? #t)) args)) (define-method (fetch-builds-for-agent (interface ) . args) (apply (lambda* (systems target-count #:key (log default-log)) (vector->list (assoc-ref (coordinator-http-request log interface (string-append "/agent/" (slot-ref interface 'agent-uuid) "/fetch-builds") #:body `((target_count . ,target-count) (systems . ,(list->vector systems))) #:method 'POST) "builds"))) args))