aboutsummaryrefslogtreecommitdiff
path: root/guix-data-service/jobs.scm
blob: 8217d5248fa8bda8c7a6ce43551d79a3eb5696da (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
;;; Guix Data Service -- Information about Guix over time
;;; Copyright © 2019 Christopher Baines <mail@cbaines.net>
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (guix-data-service jobs)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 atomic)
  #:use-module (ice-9 textual-ports)
  #:use-module (squee)
  #:use-module (guix-data-service utils)
  #:use-module (guix-data-service database)
  #:use-module (guix-data-service jobs load-new-guix-revision)
  #:export (log-for-job
            count-log-parts
            combine-log-parts!

            process-jobs

            default-max-processes))

;; Return the name of the database sequence used to number the log
;; parts of the job identified by JOB-ID.  The name is a fixed prefix
;; followed by JOB-ID rendered the way `display' would print it.
(define (log-part-sequence-name job-id)
  (string-append "load_new_guix_revision_job_log_parts_id_seq_"
                 (simple-format #f "~A" job-id)))

;; Reset the stored log for JOB-ID using the connection CONN: delete any
;; existing row for the job from load_new_guix_revision_job_logs, then
;; insert a fresh row whose contents is NULL.  The value returned is
;; whatever the INSERT query returns.
(define (insert-empty-log-entry conn job-id)
  (let ((params (list job-id)))
    ;; Remove a previous entry first, so only one row per job remains.
    (exec-query
     conn
     "DELETE FROM load_new_guix_revision_job_logs WHERE job_id = $1"
     params)
    (exec-query
     conn
     "INSERT INTO load_new_guix_revision_job_logs (job_id, contents) VALUES
($1, NULL)"
     params)))

;; Create a pipe for capturing the output of the process working on
;; JOB-ID, and start a thread which reads that pipe line by line.  Each
;; line is stored as a row in load_new_guix_revision_job_log_parts (ids
;; come from the per-job sequence) and echoed to the current output port.
;;
;; Before reading, the thread creates the per-job sequence if needed,
;; deletes any existing log parts for the job and resets the job's
;; combined log entry.
;;
;; Returns the write end of the pipe.  The thread keeps reading until it
;; sees end-of-file, which requires every copy of the write end to have
;; been closed; it then closes the read end so the descriptor is not
;; leaked.
(define (start-thread-for-process-output job-id)
  (define (insert conn job_id s)
    (exec-query
     conn
     (string-append "
INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents)
VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)")
     (list job_id s)))

  ;; Add O_NONBLOCK to the file status flags of PORT, keeping the others.
  (define (set-non-blocking! port)
    (let ((flags (fcntl port F_GETFL)))
      (fcntl port F_SETFL (logior O_NONBLOCK flags))))

  (match (pipe)
    ((port-to-read-from . port-to-write-to)

     (setvbuf port-to-read-from 'line)
     (setvbuf port-to-write-to 'line)

     (set-non-blocking! port-to-read-from)
     (set-non-blocking! port-to-write-to)

     (call-with-new-thread
      (lambda ()
        (with-postgresql-connection
         (simple-format #f "~A job logging" job-id)
         (lambda (logging-conn)
           (exec-query
            logging-conn
            (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A"
                           (log-part-sequence-name job-id)))
           (exec-query
            logging-conn
            "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
            (list job-id))

           (insert-empty-log-entry logging-conn job-id)

           (let loop ((line (get-line port-to-read-from)))
             (if (eof-object? line)
                 (begin
                   ;; Nothing more can arrive; release the read end
                   ;; rather than leaving it for the garbage collector.
                   (close-port port-to-read-from)
                   (simple-format #t "finished reading logs for ~A\n"
                                  job-id))
                 (let ((line-with-newline
                        (string-append line "\n")))
                   (catch #t
                     (lambda ()
                       (insert logging-conn job-id line-with-newline)
                       (display line-with-newline))
                     (lambda (key . args)
                       ;; Report the failure locally, then try to leave
                       ;; a marker in the stored log showing that a line
                       ;; is missing.
                       (display
                        (simple-format
                         #f
                         "
error: ~A: ~A
error: could not insert log part: '~A'\n\n"
                         key args line))
                       (catch #t
                         (lambda ()
                           (insert
                            logging-conn
                            job-id
                            (simple-format
                             #f
                             "
guix-data-service: error: missing log line: ~A
\n" key)))
                         ;; Best effort only: if the marker cannot be
                         ;; stored either, carry on reading.
                         (lambda _
                           #t))))
                   (loop (get-line port-to-read-from)))))))))

     port-to-write-to)))

;; Tidy up the logging state for JOB-ID using CONN: first drop the job's
;; log part sequence, then vacuum the log parts table.  The vacuum is
;; wrapped in `with-time-logging' under the label "vacuuming log parts".
(define (cleanup-logging conn job-id)
  (drop-log-parts-sequence conn job-id)
  (with-time-logging
      "vacuuming log parts"
    (vacuum-log-parts-table conn)))

;; Run the job processing loop, using CONN for all database access.
;;
;; Keyword arguments:
;;   #:max-processes
;;       limit on concurrently running job processes; falls back to
;;       `default-max-processes' when omitted or #f.
;;   #:latest-branch-revision-max-processes
;;       limit applied to jobs flagged as priority; falls back to twice
;;       the effective #:max-processes when omitted or #f.
;;   #:skip-system-tests?
;;       when true, "--skip-system-tests" is passed to each job process.
;;   #:per-job-parallelism
;;       when set, "--parallelism=N" is passed to each job process.
;;
;; Control is handed to `process-jobs-concurrently'.
(define* (process-jobs conn #:key max-processes
                       latest-branch-revision-max-processes
                       skip-system-tests?
                       per-job-parallelism)
  ;; Forwarding an omitted keyword as an explicit #f would override the
  ;; defaults declared by `process-jobs-concurrently' and later fail when
  ;; the limit is compared with the process count, so resolve them here.
  (define effective-max-processes
    (or max-processes default-max-processes))

  (define effective-priority-max-processes
    (or latest-branch-revision-max-processes
        (* 2 effective-max-processes)))

  (define (fetch-new-jobs)
    (fetch-unlocked-jobs conn))

  ;; Start the process for JOB-ID with its standard output and standard
  ;; error connected to the logging pipe, returning what `spawn' returns
  ;; (used by the caller as the process id).
  (define (process-job job-id)
    (let ((log-port (start-thread-for-process-output job-id)))
      (dynamic-wind
        (const #t)
        (lambda ()
          (spawn
           "guix-data-service-process-job"
           `("guix-data-service-process-job"
             ,job-id
             ,@(if skip-system-tests?
                   '("--skip-system-tests")
                   '())
             ,@(if per-job-parallelism
                   (list (simple-format #f "--parallelism=~A"
                                        per-job-parallelism))
                   '()))
           #:output log-port
           #:error log-port))
        (lambda ()
          ;; The child has its own copy of the descriptor.  Close ours,
          ;; otherwise the logging thread cannot see end-of-file when the
          ;; child exits.  This also runs if `spawn' raises.
          (close-port log-port)))))

  ;; Once a job's process has finished, fold any log parts into the
  ;; single log entry and clean up the per-job logging state.
  (define (post-job job-id)
    (when (> (count-log-parts conn job-id)
             0)
      (combine-log-parts! conn job-id)
      (cleanup-logging conn job-id)))

  (define (handle-job-failure job-id)
    (record-job-event conn job-id "failure")
    (display (simple-format #f "recording failure for job ~A\n" job-id)
             (current-error-port)))

  (process-jobs-concurrently fetch-new-jobs
                             process-job
                             post-job
                             handle-job-failure
                             #:max-processes effective-max-processes
                             #:priority-max-processes
                             effective-priority-max-processes))


;; Return the log text for JOB-ID, fetched via CONN, with the characters
;; &, < and > replaced by their HTML entities (the escaping is done in
;; SQL).
;;
;; The combined entry in load_new_guix_revision_job_logs is used when it
;; has non-NULL contents; otherwise the rows for the job in
;; load_new_guix_revision_job_log_parts are concatenated in id order.
;;
;; Keyword arguments:
;;   #:character-limit  maximum number of characters to return.
;;   #:start-character  1-based position to start from (SQL substr).
;;
;; With only #:character-limit the last that many characters are
;; returned; with #:start-character the text from that position onwards,
;; capped by #:character-limit when also given; with neither, the whole
;; log.
;;
;; Both values are interpolated into the SQL text rather than passed as
;; query parameters, so callers must only supply trusted numbers.
(define* (log-for-job conn job-id
                      #:key
                      character-limit
                      start-character)
  ;; Wrap the SQL expression S in nested replace() calls; & is handled
  ;; innermost so the entities added for < and > are not escaped again.
  (define (sql-html-escape s)
    (string-append
     "replace("
     (string-append
      "replace("
      (string-append
       "replace("
       s
       ",'&','&amp;')")
      ",'<','&lt;')")
     ",'>','&gt;')"))

  ;; Build the SQL expression selecting the requested slice of S.  A
  ;; missing limit must not be formatted into the query, as it would
  ;; appear as the text "#f" and make the SQL invalid.
  (define (get-characters s)
    (cond
     ((and start-character character-limit)
      (simple-format #f "substr(~A, ~A, ~A)"
                     s start-character
                     character-limit))
     (start-character
      (simple-format #f "substr(~A, ~A)" s start-character))
     (character-limit
      (simple-format #f "right(~A, ~A)" s character-limit))
     (else
      s)))

  (define log-query
    (string-append
     "SELECT "
     (sql-html-escape (get-characters "contents"))
     " FROM load_new_guix_revision_job_logs"
     " WHERE job_id = $1 AND contents IS NOT NULL"))

  (define parts-query
    (string-append
     "SELECT "
     (sql-html-escape
      (get-characters "STRING_AGG(contents, '' ORDER BY id ASC)"))
     " FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"))

  (match (exec-query conn log-query (list job-id))
    (((contents))
     contents)
    (()
     ;; No combined log yet, so assemble it from the individual parts.
     (match (exec-query conn parts-query (list job-id))
       (((contents))
        contents)))))

;; Return, as a number, how many rows load_new_guix_revision_job_log_parts
;; holds for JOB-ID.  The query is run on CONN and its single text
;; result is converted with `string->number'.
(define (count-log-parts conn job-id)
  (let ((rows (exec-query
               conn
               "
SELECT COUNT(*)
FROM load_new_guix_revision_job_log_parts
WHERE job_id = $1"
               (list job-id))))
    (match rows
      (((count))
       (string->number count)))))

;; Within a single transaction on CONN, set the contents of the
;; load_new_guix_revision_job_logs row for JOB-ID to the job's log parts
;; concatenated in id order, then delete those parts.
(define (combine-log-parts! conn job-id)
  (define update-log-query
    "
UPDATE load_new_guix_revision_job_logs SET contents = (
  SELECT STRING_AGG(contents, '' ORDER BY id ASC)
  FROM load_new_guix_revision_job_log_parts
  WHERE job_id = $1
  GROUP BY job_id
)
WHERE job_id = $1")

  (define delete-parts-query
    "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1")

  (with-postgresql-transaction
   conn
   (lambda (transaction-conn)
     (exec-query transaction-conn update-log-query (list job-id))
     (exec-query transaction-conn delete-parts-query (list job-id)))))

;; Drop the log part sequence belonging to JOB-ID, if it exists, inside
;; a transaction on CONN.
;;
;; A failure to drop the sequence is reported on the current error port
;; and otherwise ignored: the handler unwinds, so the exception does not
;; propagate to the caller.
(define (drop-log-parts-sequence conn job-id)
  (with-postgresql-transaction
   conn
   (lambda (conn)
     ;; SET LOCAL only lasts for this transaction.
     (exec-query conn
                 "SET LOCAL lock_timeout = '10s'")
     (with-exception-handler
         (lambda (exn)
           ;; End the message with a newline, as the other diagnostics in
           ;; this module do, so later output starts on its own line.
           (simple-format (current-error-port)
                          "error when dropping sequence: ~A\n"
                          exn))
       (lambda ()
         (exec-query conn
                     (string-append
                      "DROP SEQUENCE IF EXISTS "
                      (log-part-sequence-name job-id))))
       #:unwind? #t))))

;; Run VACUUM on the load_new_guix_revision_job_log_parts table using
;; CONN, returning the result of the query.
(define (vacuum-log-parts-table conn)
  (let ((statement "VACUUM load_new_guix_revision_job_log_parts"))
    (exec-query conn statement)))

;; Default limit on concurrently running job processes: a quarter of
;; the processor count reported by `current-processor-count', rounded,
;; and never less than 1.
(define default-max-processes
  (let ((quarter-of-processors
         (round (/ (current-processor-count) 4))))
    (if (< quarter-of-processors 1)
        1
        quarter-of-processors)))

;; Default time a job process may run before it is sent SIGTERM, in
;; seconds: 72 hours.
(define default-timeout
  (* 72     ;; hours
     60     ;; minutes per hour
     60))   ;; seconds per minute

;; Main scheduling loop: repeatedly start processes for new jobs, reap
;; finished ones and terminate those that run for too long.  The loop
;; only ends by calling `exit' after SIGTERM has been received.
;;
;; Arguments:
;;   FETCH-NEW-JOBS      thunk returning a list of (job-id priority?)
;;                       entries.
;;   PROCESS-JOB         procedure of one argument (job-id) which starts
;;                       a process for the job and returns its pid.
;;   POST-JOB            procedure of one argument (job-id), called once
;;                       the job's process has been reaped.
;;   HANDLE-JOB-FAILURE  procedure of one argument (job-id), called when
;;                       the process exits with a non-zero status or is
;;                       sent SIGTERM for exceeding TIMEOUT.
;;
;; Keyword arguments:
;;   #:max-processes           limit on running processes for ordinary
;;                             jobs (default `default-max-processes').
;;   #:priority-max-processes  limit used for priority jobs (default
;;                             twice #:max-processes).
;;   #:timeout                 seconds a process may run before being
;;                             sent SIGTERM (default `default-timeout').
(define* (process-jobs-concurrently
          fetch-new-jobs
          process-job
          post-job
          handle-job-failure
          #:key
          (max-processes default-max-processes)
          (priority-max-processes (* 2 max-processes))
          (timeout default-timeout))

  ;; Maps pid -> (start-time (job-id)).  Keys are integers, so the
  ;; eqv-based hashv-* procedures are used for every keyed access.
  (define processes
    (make-hash-table))

  (define (running-process-count)
    (hash-count (const #t) processes))

  (define (display-status)
    (display
     (string-append
      "\n\n"
      (let ((running-jobs (running-process-count)))
        (cond
         ((= running-jobs 0)
          "status: 0 running jobs")
         ((= running-jobs 1)
          "status: 1 running job")
         (else
          (simple-format #f "status: ~A running jobs"
                         running-jobs))))
      "\n"
      (string-concatenate
       (hash-map->list
        (match-lambda*
          ((pid (start-time job-args))
           (format #f "  pid: ~5d  job args: ~a\n"
                   pid job-args)))
        processes))
      "\n")))

  ;; Reap every child process that has finished, without blocking.
  (define (wait-on-processes)
    (catch
      #t
      (lambda ()
        (match (waitpid WAIT_ANY WNOHANG)
          ((0 . status)
           ;; No process to wait for
           #f)
          ((pid . status)
           (let ((entry (hashv-ref processes pid)))
             ;; Forget the process before running the callbacks.  If
             ;; one of them raises, a stale entry would otherwise keep
             ;; occupying a slot and later be signalled by
             ;; `kill-long-running-processes'.
             (hashv-remove! processes pid)

             (match entry
               ((_ (id))
                (post-job id)
                (unless (eqv? status 0)
                  (simple-format (current-error-port)
                                 "pid ~A (job: ~A) failed with status ~A\n"
                                 pid id status)

                  (handle-job-failure id)))
               (_
                ;; Not a process started by this loop; report it and
                ;; carry on reaping instead of failing the match.
                (simple-format (current-error-port)
                               "ignoring exit of untracked pid ~A\n"
                               pid))))

           ;; Recurse, to check for other finished processes.
           (wait-on-processes))))
      (lambda (key . args)
        ;; waitpid raising ECHILD just means there are no children at
        ;; the moment.  Compare the errno value instead of the message
        ;; text or a hard-coded number, neither of which is guaranteed
        ;; to be the same on every system.
        (unless (and (eq? key 'system-error)
                     (match args
                       (("waitpid" _ _ (errno))
                        (eqv? errno ECHILD))
                       (_ #f)))
          (simple-format #t "key ~A args ~A\n"
                         key args)))))

  ;; Send SIGTERM to every process running for longer than TIMEOUT and
  ;; record a failure for its job.
  ;;
  ;; NOTE(review): a process stays in the table until it is reaped, so
  ;; while it has not exited this signals it and calls
  ;; HANDLE-JOB-FAILURE again on every pass of the main loop, and
  ;; `wait-on-processes' calls HANDLE-JOB-FAILURE once more if the exit
  ;; status is non-zero.  Confirm that repeated failure handling is
  ;; acceptable.
  (define (kill-long-running-processes)
    (hash-for-each
     (match-lambda*
       ((pid (start-time job-args))
        (let ((running-for
               (- (current-time) start-time)))
          (when (> running-for timeout)
            (display
             (simple-format
              #f "sending SIGTERM to pid ~A started at ~A, now running for ~A\n"
              pid start-time running-for)
             (current-error-port))
            (kill pid SIGTERM)

            (match job-args
              ((id)
               (handle-job-failure id)))))))
     processes))

  (define (stop-running-processes)
    (hash-for-each
     (match-lambda*
       ((pid (start-time job-args))
        (display
         (simple-format
          #f "sending SIGTERM to pid ~A\n"
          pid)
         (current-error-port))
        (kill pid SIGTERM)))
     processes))

  ;; Set by the SIGTERM handler and checked once per loop iteration.
  (define exit?
    (make-atomic-box #f))

  (sigaction SIGTERM
    (lambda args
      (simple-format (current-error-port) "exiting due to SIGTERM\n")
      (atomic-box-set! exit? #t)))

  (while #t
    (kill-long-running-processes)
    (wait-on-processes)
    (display-status)

    (when (atomic-box-ref exit?)
      (stop-running-processes)
      (exit 0))

    (match (fetch-new-jobs)
      (()
       ;; Nothing to do
       #f)
      ((jobs ...)
       (for-each
        (match-lambda
          ((job-id priority?)
           ;; Count again for each job, as starting the previous one
           ;; may have used up the last free slot.
           (let ((current-processes (running-process-count)))
             (when (< current-processes
                      (if priority?
                          priority-max-processes
                          max-processes))
               (let ((pid (process-job job-id)))
                 (hashv-set! processes pid
                             (list (current-time) (list job-id))))))))
        jobs)))
    (sleep 15)))