;;; Guix Data Service -- Information about Guix over time
;;; Copyright © 2019 Christopher Baines
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (guix-data-service jobs)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 atomic)
  #:use-module (ice-9 textual-ports)
  #:use-module (squee)
  #:use-module (guix build syscalls)
  #:use-module (guix-data-service utils)
  #:use-module (guix-data-service database)
  #:use-module (guix-data-service jobs load-new-guix-revision)
  #:export (log-for-job
            count-log-parts
            combine-log-parts!

            process-jobs

            default-max-processes))

(define (log-part-sequence-name job-id)
  "Return the name of the database sequence used to number the log parts
of JOB-ID."
  (simple-format #f "load_new_guix_revision_job_log_parts_id_seq_~A"
                 job-id))

(define (insert-empty-log-entry conn job-id)
  "Using CONN, replace any existing row for JOB-ID in
load_new_guix_revision_job_logs with a single row whose contents is NULL."
  (exec-query
   conn
   "DELETE FROM load_new_guix_revision_job_logs WHERE job_id = $1"
   (list job-id))
  (exec-query
   conn
   "INSERT INTO load_new_guix_revision_job_logs (job_id, contents) VALUES ($1, NULL)"
   (list job-id)))

(define (start-thread-for-process-output job-id)
  "Create a pipe and start a thread that reads lines from its read end,
inserting each line as a log part for JOB-ID and echoing it to the current
output port.  Return the write end of the pipe; the thread stops once it
reads end-of-file, so every copy of the write end must eventually be
closed."
  (define (insert conn job_id s)
    ;; The id comes from the per-job sequence, so parts can later be
    ;; ordered by id.
    (exec-query
     conn
     (string-append
      " INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) VALUES (nextval('"
      (log-part-sequence-name job_id)
      "'), $1, $2)")
     (list job_id s)))

  (match (pipe)
    ((port-to-read-from . port-to-write-to)

     (setvbuf port-to-read-from 'line)
     (setvbuf port-to-write-to 'line)

     ;; Set O_NONBLOCK on both ends of the pipe.
     (let ((flags (fcntl port-to-read-from F_GETFL)))
       (fcntl port-to-read-from F_SETFL (logior O_NONBLOCK flags)))
     (let ((flags (fcntl port-to-write-to F_GETFL)))
       (fcntl port-to-write-to F_SETFL (logior O_NONBLOCK flags)))

     (call-with-new-thread
      (lambda ()
        (with-postgresql-connection
         (simple-format #f "~A job logging" job-id)
         (lambda (logging-conn)
           ;; Start from a clean state: a sequence for this job, no
           ;; leftover parts, and an empty (NULL) combined log entry.
           (exec-query
            logging-conn
            (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A"
                           (log-part-sequence-name job-id)))
           (exec-query
            logging-conn
            "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
            (list job-id))

           (insert-empty-log-entry logging-conn job-id)

           (let loop ((line (get-line port-to-read-from)))
             (if (eof-object? line)
                 (begin
                   (simple-format #t "finished reading logs for ~A\n"
                                  job-id)
                   ;; Nothing more will be read, so release the read end
                   ;; of the pipe rather than leaving it open.
                   (close-port port-to-read-from))
                 (let ((line-with-newline
                        (string-append line "\n")))
                   (catch #t
                     (lambda ()
                       (insert logging-conn job-id line-with-newline)
                       (display line-with-newline))
                     (lambda (key . args)
                       (display
                        (simple-format
                         #f
                         " error: ~A: ~A error: could not insert log part: '~A'\n\n"
                         key args line))
                       ;; Best effort: leave a marker in the stored log
                       ;; where the line is missing, ignoring any
                       ;; further error.
                       (catch #t
                         (lambda ()
                           (insert
                            logging-conn
                            job-id
                            (simple-format
                             #f
                             " guix-data-service: error: missing log line: ~A \n"
                             key)))
                         (lambda _ #t))))
                   (loop (get-line port-to-read-from)))))))))

     port-to-write-to)))

(define (cleanup-logging conn job-id)
  "Drop the log parts sequence for JOB-ID and vacuum the log parts table,
using CONN."
  (drop-log-parts-sequence conn job-id)
  (with-time-logging "vacuuming log parts"
    (vacuum-log-parts-table conn)))

(define* (process-jobs conn #:key max-processes
                       latest-branch-revision-max-processes
                       skip-system-tests?
                       per-job-parallelism)
  "Run jobs in guix-data-service-process-job child processes, looping via
process-jobs-concurrently.

MAX-PROCESSES limits the number of concurrent processes for jobs not
flagged as priority, and LATEST-BRANCH-REVISION-MAX-PROCESSES the number
for jobs flagged as priority.  When not given, they default to
default-max-processes and twice the effective MAX-PROCESSES respectively.
When SKIP-SYSTEM-TESTS? is true, --skip-system-tests is passed to each
child, and when PER-JOB-PARALLELISM is given, it is passed through as
--parallelism=N."
  ;; Keyword arguments without a default are #f when not supplied;
  ;; passing #f on would override the defaults of
  ;; process-jobs-concurrently and break its numeric comparisons.
  (define effective-max-processes
    (or max-processes default-max-processes))

  (define effective-priority-max-processes
    (or latest-branch-revision-max-processes
        (* 2 effective-max-processes)))

  (define (fetch-new-jobs)
    (let ((free-space (free-disk-space "/gnu/store")))
      (if (< free-space (* 2 (expt 2 30))) ; 2G
          (begin
            (simple-format
             (current-error-port)
             "not starting new jobs, low free disk space on /gnu/store (~A)\n"
             free-space)
            '())
          (fetch-unlocked-jobs conn))))

  (define (process-job job-id)
    (let ((log-port (start-thread-for-process-output job-id)))
      ;; The child gets its own copy of the pipe's write end as stdout
      ;; and stderr.  Close this process's copy once spawn is done
      ;; (whether or not it succeeded), otherwise the logging thread
      ;; can't see end-of-file when the child exits.
      (dynamic-wind
        (const #t)
        (lambda ()
          (spawn
           "guix-data-service-process-job"
           `("guix-data-service-process-job"
             ,job-id
             ,@(if skip-system-tests?
                   '("--skip-system-tests")
                   '())
             ,@(if per-job-parallelism
                   (list (simple-format #f "--parallelism=~A"
                                        per-job-parallelism))
                   '()))
           #:output log-port
           #:error log-port))
        (lambda ()
          (close-port log-port)))))

  (define (post-job job-id)
    ;; Only combine and clean up when there are parts to combine.
    (when (> (count-log-parts conn job-id) 0)
      (combine-log-parts! conn job-id)
      (cleanup-logging conn job-id)))

  (define (handle-job-failure job-id)
    (record-job-event conn job-id "failure")
    (display (simple-format #f "recording failure for job ~A\n" job-id)
             (current-error-port)))

  (process-jobs-concurrently
   fetch-new-jobs
   process-job
   post-job
   handle-job-failure
   #:max-processes effective-max-processes
   #:priority-max-processes effective-priority-max-processes))

(define* (log-for-job conn job-id
                      #:key
                      character-limit
                      start-character)
  "Return the HTML escaped log for JOB-ID, fetched using CONN.  The
combined log is used when its contents is not NULL, otherwise the log is
assembled from the log parts, ordered by id.

When START-CHARACTER is given, the result starts at that character
position (as understood by the SQL substr function) and is at most
CHARACTER-LIMIT characters long.  With only CHARACTER-LIMIT, the last
CHARACTER-LIMIT characters are returned.  With neither, the whole log is
returned.

NOTE(review): START-CHARACTER and CHARACTER-LIMIT are interpolated into
the SQL text, so callers are assumed to pass numbers -- confirm."
  (define (sql-html-escape s)
    ;; & has to be replaced first, so that the entities introduced for
    ;; < and > aren't themselves escaped.
    (string-append
     "replace("
     (string-append
      "replace("
      (string-append
       "replace("
       s
       ",'&','&amp;')")
      ",'<','&lt;')")
     ",'>','&gt;')"))

  (define (get-characters s)
    (cond
     ((and start-character character-limit)
      (simple-format #f "substr(~A, ~A, ~A)"
                     s start-character character-limit))
     (start-character
      (simple-format #f "substr(~A, ~A)" s start-character))
     (character-limit
      (simple-format #f "right(~A, ~A)" s character-limit))
     ;; No limit requested, so use the expression as is.
     (else s)))

  (define log-query
    (string-append
     "SELECT "
     (sql-html-escape (get-characters "contents"))
     " FROM load_new_guix_revision_job_logs"
     " WHERE job_id = $1 AND contents IS NOT NULL"))

  (define parts-query
    (string-append
     "SELECT "
     (sql-html-escape
      (get-characters "STRING_AGG(contents, '' ORDER BY id ASC)"))
     " FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"))

  (match (exec-query conn log-query (list job-id))
    (((contents))
     contents)
    (()
     ;; No combined log, so use the parts.
     (match (exec-query conn parts-query (list job-id))
       (((contents))
        contents)))))

(define (count-log-parts conn job-id)
  "Return the number of log parts stored for JOB-ID, using CONN."
  (match (exec-query
          conn
          " SELECT COUNT(*) FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
          (list job-id))
    (((id))
     (string->number id))))

(define (combine-log-parts! conn job-id)
  "In a single transaction on CONN, set the contents of the log entry for
JOB-ID to the concatenation of its log parts ordered by id, then delete
those parts."
  (with-postgresql-transaction
   conn
   (lambda (conn)
     (exec-query
      conn
      " UPDATE load_new_guix_revision_job_logs SET contents = ( SELECT STRING_AGG(contents, '' ORDER BY id ASC) FROM load_new_guix_revision_job_log_parts WHERE job_id = $1 GROUP BY job_id ) WHERE job_id = $1"
      (list job-id))
     (exec-query
      conn
      "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
      (list job-id)))))

(define (drop-log-parts-sequence conn job-id)
  "Drop the log parts sequence for JOB-ID in a transaction on CONN, with
a lock timeout of 10 seconds.  Errors from the DROP are reported to the
current error port rather than raised."
  (with-postgresql-transaction
   conn
   (lambda (conn)
     (exec-query conn
                 "SET LOCAL lock_timeout = '10s'")
     (with-exception-handler
         (lambda (exn)
           (simple-format (current-error-port)
                          "error when dropping sequence: ~A\n"
                          exn))
       (lambda ()
         (exec-query conn
                     (string-append
                      "DROP SEQUENCE IF EXISTS "
                      (log-part-sequence-name job-id))))
       #:unwind? #t))))

(define (vacuum-log-parts-table conn)
  "Run VACUUM on the log parts table, using CONN."
  (exec-query
   conn
   "VACUUM load_new_guix_revision_job_log_parts"))

;; A quarter of the processor count, rounded, but at least 1.
(define default-max-processes
  (max (round (/ (current-processor-count)
                 4))
       1))

;; 72 hours, in seconds.
(define default-timeout
  (* (* 60 60) ;; 1 hour in seconds
     72))

(define* (process-jobs-concurrently
          fetch-new-jobs
          process-job
          post-job
          handle-job-failure
          #:key
          (max-processes default-max-processes)
          (priority-max-processes (* 2 max-processes))
          (timeout default-timeout))
  "Loop forever, starting and supervising job processes.

FETCH-NEW-JOBS is a thunk returning a list of (JOB-ID PRIORITY?) entries.
PROCESS-JOB is called with a job id and must return the pid of the process
it started.  POST-JOB is called with the job id once that process has
finished, and HANDLE-JOB-FAILURE is called with the job id when the
process exits with a non-zero status, or runs for more than TIMEOUT
seconds.

A job is only started when fewer than MAX-PROCESSES processes are running,
or fewer than PRIORITY-MAX-PROCESSES for jobs where PRIORITY? is true.  On
SIGTERM, the running processes are sent SIGTERM and this process exits."
  ;; pid -> (start-time (job-id)).  Always accessed with the hashv
  ;; procedures.
  (define processes
    (make-hash-table))

  ;; pids for which the failure has already been recorded because of a
  ;; timeout, so it's only recorded once per process.
  (define failure-recorded
    (make-hash-table))

  (define (display-status)
    (display
     (string-append
      "\n\n"
      (let ((running-jobs (hash-count (const #t) processes)))
        (cond
         ((= running-jobs 0)
          "status: 0 running jobs")
         ((= running-jobs 1)
          "status: 1 running job")
         (else
          (simple-format #f "status: ~A running jobs"
                         running-jobs))))
      "\n"
      (string-concatenate
       (hash-map->list
        (match-lambda*
          ((pid (start-time job-args))
           (format #f " pid: ~5d job args: ~a\n"
                   pid job-args)))
        processes))
      "\n")))

  (define (wait-on-processes)
    (catch
      #t
      (lambda ()
        (match (waitpid WAIT_ANY WNOHANG)
          ((0 . status)
           ;; No process to wait for
           #f)
          ((pid . status)
           (let ((details (hashv-ref processes pid))
                 (already-recorded?
                  (hashv-ref failure-recorded pid)))
             ;; The process has been reaped, so forget about it before
             ;; doing anything that might fail, otherwise a stale pid
             ;; would be counted as running, and signalled later on.
             (hashv-remove! processes pid)
             (hashv-remove! failure-recorded pid)

             (match details
               ((_ (id))
                (post-job id)
                (unless (zero? status)
                  (simple-format (current-error-port)
                                 "pid ~A (job: ~A) failed with status ~A\n"
                                 pid id status)
                  (unless already-recorded?
                    (handle-job-failure id))))
               (_
                (simple-format (current-error-port)
                               "pid ~A finished, but isn't a known job process\n"
                               pid))))

           ;; Recurse, to check for other finished processes.
           (wait-on-processes))))
      (lambda (key . args)
        ;; Having no child processes is expected, so don't report it.
        (unless (and (eq? key 'system-error)
                     (match args
                       (("waitpid" "~A" ("No child processes") (10))
                        #t)
                       (_ #f)))
          (simple-format #t "key ~A args ~A\n"
                         key args)))))

  (define (kill-long-running-processes)
    (hash-for-each
     (match-lambda*
       ((pid (start-time job-args))
        (let ((running-for
               (- (current-time) start-time)))
          (when (> running-for timeout)
            (display
             (simple-format
              #f
              "sending SIGTERM to pid ~A started at ~A, now running for ~A\n"
              pid start-time running-for)
             (current-error-port))
            (kill pid SIGTERM)

            ;; This runs on every iteration until the process has been
            ;; reaped, so make sure the failure is only recorded once.
            (unless (hashv-ref failure-recorded pid)
              (hashv-set! failure-recorded pid #t)
              (match job-args
                ((id)
                 (handle-job-failure id))))))))
     processes))

  (define (stop-running-processes)
    (hash-for-each
     (match-lambda*
       ((pid (start-time job-args))
        (display
         (simple-format
          #f
          "sending SIGTERM to pid ~A\n"
          pid)
         (current-error-port))
        (kill pid SIGTERM)))
     processes))

  (define exit?
    (make-atomic-box #f))

  ;; Only set a flag in the handler, the main loop acts on it.
  (sigaction SIGTERM
    (lambda args
      (simple-format (current-error-port)
                     "exiting due to SIGTERM\n")
      (atomic-box-set! exit? #t)))

  (while #t
    (kill-long-running-processes)
    (wait-on-processes)
    (display-status)

    (when (atomic-box-ref exit?)
      (stop-running-processes)
      (exit 0))

    (match (fetch-new-jobs)
      (()
       ;; Nothing to do
       #f)
      ((jobs ...)
       (for-each
        (match-lambda
          ((job-id priority?)
           ;; Count on each iteration, as jobs started earlier in this
           ;; batch count towards the limit.
           (let ((current-processes
                  (hash-count (const #t) processes)))
             (when (< current-processes
                      (if priority?
                          priority-max-processes
                          max-processes))
               (let ((pid (process-job job-id)))
                 (hashv-set! processes pid
                             (list (current-time) (list job-id))))))))
        jobs)))
    (sleep 15)))