;;; Guix QA Frontpage
;;;
;;; Copyright © 2021, 2022 Christopher Baines <mail@cbaines.net>
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (guix-qa-frontpage database)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-19)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 exceptions)
  #:use-module (web uri)
  #:use-module (sqlite3)
  #:use-module (fibers)
  #:use-module (prometheus)
  #:use-module (guix narinfo)
  #:use-module (guix derivations)
  #:use-module (guix-build-coordinator utils)
  #:use-module (guix-build-coordinator utils fibers)
  #:use-module (guix-qa-frontpage guix-data-service)
  #:export (setup-database

            database-optimize
            database-spawn-fibers

            database-call-with-transaction

            clear-sqlite-cache-entry
            with-sqlite-cache

            insert-into-builds-to-cancel-later
            delete-from-builds-to-cancel-later
            select-from-builds-to-cancel-later

            insert-create-branch-for-issue-log
            select-create-branch-for-issue-log
            delete-create-branch-for-issue-log))

;; Handle for the SQLite database used by this module.  setup-database
;; creates it, and the other procedures here take it as their DATABASE
;; argument.
(define-record-type <database>
  (make-database database-file reader-thread-channel writer-thread-channel
                 metrics-registry)
  database?
  ;; File name of the SQLite database
  (database-file          database-file)
  ;; Worker thread channel whose threads hold read-only connections
  (reader-thread-channel  database-reader-thread-channel)
  ;; Worker thread channel holding the single read-write connection
  (writer-thread-channel  database-writer-thread-channel)
  ;; Metrics registry in which the delay and duration histograms are created
  (metrics-registry       database-metrics-registry))

(define* (db-open database
                  #:key (write? #t))
  "Open the SQLite database file DATABASE and return the connection.  When
WRITE? is true (the default) the file is opened read-write and is created
if it does not exist, otherwise it is opened read-only.
SQLITE_OPEN_NOMUTEX is passed in both cases."
  (let ((access-mode
         (if write?
             (logior SQLITE_OPEN_READWRITE
                     SQLITE_OPEN_CREATE)
             SQLITE_OPEN_READONLY)))
    (sqlite-open database
                 (logior access-mode SQLITE_OPEN_NOMUTEX))))

(define (perform-initial-database-setup db)
  "Create the complete schema in DB, which is expected not to contain the
cache table yet: the cache table and its unique index on key, the
builds_to_cancel_later table and its unique index, and the
create_branch_for_issue_logs table."
  (define schema
    ;; The builds_to_cancel_later_unique index has to be created here as
    ;; well as in update-schema: the ON CONFLICT DO NOTHING clause used when
    ;; inserting into builds_to_cancel_later only suppresses duplicates when
    ;; a uniqueness constraint exists, and creating the index at a later
    ;; startup fails if duplicates have been stored in the meantime.
    "
CREATE TABLE cache (
    key TEXT NOT NULL,
    timestamp INTEGER NOT NULL,
    data TEXT NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS cache_idx ON cache (key);

CREATE TABLE IF NOT EXISTS builds_to_cancel_later (
    category_name TEXT NOT NULL,
    category_value TEXT NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS builds_to_cancel_later_unique
  ON builds_to_cancel_later (category_name, category_value);

CREATE TABLE IF NOT EXISTS create_branch_for_issue_logs (
    issue TEXT NOT NULL,
    log TEXT NOT NULL
);")

  (sqlite-exec db schema))


(define (update-schema db)
  "Bring the schema of DB up to date.  When sqlite_master has no table
named cache, the schema is created through perform-initial-database-setup.
Otherwise the builds_to_cancel_later table, its unique index and the
create_branch_for_issue_logs table are created if they are missing."
  (define table-lookup
    (sqlite-prepare
     db
     "
SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name"))

  (sqlite-bind-arguments table-lookup
                         #:name "cache")

  ;; sqlite-step gives #f when the query has no row, i.e. the cache table
  ;; doesn't exist yet
  (if (sqlite-step table-lookup)
      (sqlite-exec
       db
       "
CREATE TABLE IF NOT EXISTS builds_to_cancel_later (
    category_name TEXT NOT NULL,
    category_value TEXT NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS builds_to_cancel_later_unique
  ON builds_to_cancel_later (category_name, category_value);

CREATE TABLE IF NOT EXISTS create_branch_for_issue_logs (
    issue TEXT NOT NULL,
    log TEXT NOT NULL
);")
      (perform-initial-database-setup db))

  (sqlite-finalize table-lookup))

(define (setup-database database-file metrics-registry)
  "Prepare the SQLite database in DATABASE-FILE and return a <database>
record for it.  The file is first opened once to switch it to WAL
journalling, run PRAGMA optimize, truncate the WAL and update the schema.
Two worker thread channels are then created: one with read-only
connections, using between 32 and 128 threads depending on the processor
count, and one with a single read-write connection.  Delays reported by
either channel are recorded in histograms created in METRICS-REGISTRY."
  (define (make-delay-logger metric-name description warning-format)
    ;; Create the histogram METRIC-NAME in METRICS-REGISTRY and return a one
    ;; argument procedure that records a delay in it, passes the delay to
    ;; log-delay together with DESCRIPTION, and prints WARNING-FORMAT to the
    ;; current error port for delays above one second.
    (let ((histogram
           (make-histogram-metric metrics-registry metric-name)))
      (lambda (seconds-delayed)
        (metric-observe histogram
                        ;; TODO exact->inexact to work around a bug in
                        ;; guile-prometheus where the metric sum will
                        ;; output in the exact form including the /q
                        (exact->inexact seconds-delayed))
        (log-delay description seconds-delayed)
        (when (> seconds-delayed 1)
          (format (current-error-port)
                  warning-format
                  seconds-delayed)))))

  ;; One-off preparation, done on a connection that is closed again before
  ;; any worker thread is set up
  (let ((setup-db (db-open database-file)))
    (sqlite-exec setup-db "PRAGMA journal_mode=WAL;")
    (sqlite-exec setup-db "PRAGMA optimize;")
    (sqlite-exec setup-db "PRAGMA wal_checkpoint(TRUNCATE);")

    (update-schema setup-db)

    (sqlite-close setup-db))

  (let ((reader-thread-channel
         (make-worker-thread-channel
          (lambda ()
            (let ((reader-db
                   (db-open database-file #:write? #f)))
              (sqlite-exec reader-db "PRAGMA busy_timeout = 5000;")
              (list reader-db)))
          #:destructor
          (lambda (reader-db)
            (sqlite-close reader-db))
          #:lifetime 50000
          #:name "db read"

          #:parallelism
          (min (max (current-processor-count)
                    32)
               128)
          #:delay-logger
          (make-delay-logger
           "datastore_read_delay_seconds"
           "datastore read"
           "warning: database read delayed by ~1,2f seconds~%")
          #:log-exception?
          (lambda (exn)
            (not (guix-data-service-error? exn)))))
        (writer-thread-channel
         (make-worker-thread-channel
          (lambda ()
            (let ((writer-db
                   (db-open database-file)))
              (sqlite-exec writer-db "PRAGMA busy_timeout = 5000;")
              (sqlite-exec writer-db "PRAGMA foreign_keys = ON;")
              (list writer-db)))
          #:destructor
          (lambda (writer-db)
            ;; Give the WAL a chance to be checkpointed before the
            ;; connection goes away
            (db-optimize writer-db
                         database-file)

            (sqlite-close writer-db))
          #:lifetime 500
          #:name "db write"

          ;; SQLite doesn't support parallel writes
          #:parallelism 1
          #:delay-logger
          (make-delay-logger
           "datastore_write_delay_seconds"
           "datastore write"
           "warning: database write delayed by ~1,2f seconds~%"))))

    (make-database database-file
                   reader-thread-channel
                   writer-thread-channel
                   metrics-registry)))

(define (db-optimize db db-filename)
  "If the write-ahead log belonging to the database file DB-FILENAME is
larger than 5 MiB, run a truncating WAL checkpoint on the connection DB,
followed by PRAGMA optimize with an analysis limit of 1000.  A WAL file
that does not exist is treated as empty, so nothing is done in that case."
  (define MiB (* (expt 2 20) 1.))
  (define wal-size-threshold
    (* 5 MiB))

  (define (wal-size)
    ;; Passing #f as the second argument makes stat return #f rather than
    ;; raise an exception when the file can't be examined, for instance
    ;; because no WAL file exists.
    (let ((wal-stat
           (stat (string-append db-filename "-wal") #f)))
      (if wal-stat
          (stat:size wal-stat)
          0)))

  (when (> (wal-size) wal-size-threshold)
    (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")

    (sqlite-exec db
                 "
PRAGMA analysis_limit=1000;
PRAGMA optimize;")))

(define (database-optimize database)
  "Run db-optimize for DATABASE on its writer thread.  The attempt is made
through retry-on-error with #:times 5 and #:delay 5."
  (define (optimize-once)
    (call-with-worker-thread
     (database-writer-thread-channel database)
     (lambda (db)
       (db-optimize db
                    (database-file database)))))

  (retry-on-error
   (lambda ()
     (optimize-once))
   #:times 5
   #:delay 5))

(define (database-spawn-fibers database)
  "Spawn a fiber that loops forever, sleeping for five minutes and then
calling database-optimize on DATABASE.  An exception raised by that call is
reported on the current error port and the loop carries on."
  (define (report-failure exn)
    (simple-format (current-error-port)
                   "exception when performing WAL checkpoint: ~A\n"
                   exn))

  (define (optimize-forever)
    (let loop ()
      (sleep (* 60 5))                  ; 5 minutes
      (with-exception-handler
          report-failure
        (lambda ()
          (database-optimize database))
        #:unwind? #t)
      (loop)))

  (spawn-fiber
   (lambda ()
     (optimize-forever))
   #:parallel? #t))

(define (call-with-time-tracking database thing thunk)
  "Call THUNK and return all of its values.  When DATABASE has a metrics
registry, the elapsed wall-clock time in seconds is observed in the
histogram named database_THING_duration_seconds, which is fetched from the
registry or created if it is not there yet.  THING is a string."
  (define registry (database-metrics-registry database))
  (define metric-name
    (string-append "database_" thing "_duration_seconds"))

  (if registry
      (let* ((metric
              (or (metrics-registry-fetch-metric registry metric-name)
                  (make-histogram-metric registry
                                         metric-name)))
             (start-time (get-internal-real-time)))
        ;; call-with-values so that a THUNK returning several values (or
        ;; none) has them all passed on to the caller
        (call-with-values thunk
          (lambda vals
            (metric-observe metric
                            ;; The division yields an exact rational;
                            ;; convert it to work around the bug in
                            ;; guile-prometheus where the metric sum is
                            ;; output in the exact form including the /q
                            (exact->inexact
                             (/ (- (get-internal-real-time) start-time)
                                internal-time-units-per-second)))
            (apply values vals))))
      (thunk)))

;; While database-call-with-transaction is running a procedure inside a
;; transaction it started, this parameter holds that procedure.  It is #f
;; otherwise.
(define %current-transaction-proc
  (make-parameter #f))

(define* (database-call-with-transaction database proc
                                         #:key
                                         readonly?)
  "Call PROC with a database connection, inside a transaction, and return
the values it returns.  The call happens on a reader thread of DATABASE
when READONLY? is true and on the writer thread otherwise.  The transaction
is committed when PROC returns.  If an exception is raised, the transaction
is rolled back and the exception is raised again.  When
%current-transaction-proc is already set, PROC is called directly without
beginning another transaction.  A warning is printed for transactions that
aren't READONLY? and take longer than two seconds."
  (define (run-in-new-transaction db)
    (sqlite-exec db "BEGIN TRANSACTION;")
    (with-exception-handler
        (lambda (exn)
          (simple-format (current-error-port)
                         "error: sqlite rolling back transaction\n")
          (sqlite-exec db "ROLLBACK TRANSACTION;")
          (raise-exception exn))
      (lambda ()
        (call-with-values
            (lambda ()
              (parameterize ((%current-transaction-proc proc))
                (proc db)))
          (lambda results
            (sqlite-exec db "COMMIT TRANSACTION;")
            (apply values results))))
      #:unwind? #t))

  (define (run-proc-within-transaction db)
    (cond
     ((%current-transaction-proc)
      (proc db))                        ; already in transaction
     (else
      (run-in-new-transaction db))))

  (define (run-timed db)
    ;; Return a list with the time taken in seconds as its first element,
    ;; followed by the values PROC returned
    (let ((start-time (get-internal-real-time)))
      (call-with-values
          (lambda ()
            (run-proc-within-transaction db))
        (lambda results
          (let ((duration-seconds
                 (/ (- (get-internal-real-time) start-time)
                    internal-time-units-per-second)))
            (when (and (not readonly?)
                       (> duration-seconds 2))
              (format
               (current-error-port)
               "warning: ~a:\n  took ~4f seconds in transaction\n"
               proc
               duration-seconds))

            (cons duration-seconds results))))))

  (let ((channel-accessor
         (if readonly?
             database-reader-thread-channel
             database-writer-thread-channel)))
    (match (call-with-worker-thread
            (channel-accessor database)
            (lambda (db)
              (run-timed db)))
      ((_ results ...)
       (apply values results)))))

(define (last-insert-rowid db)
  "Return the value of SQLite's last_insert_rowid() for the connection DB."
  (let* ((rowid-statement
          (sqlite-prepare
           db
           "SELECT last_insert_rowid();"
           #:cache? #t))
         (row (sqlite-step rowid-statement))
         (rowid (vector-ref row 0)))

    ;; The statement is cached, so reset it for the next use
    (sqlite-reset rowid-statement)

    rowid))

(define (changes db)
  "Return the value of SQLite's changes() for the connection DB."
  (let* ((changes-statement
          (sqlite-prepare
           db
           "SELECT changes()"
           #:cache? #t))
         (row (sqlite-step changes-statement))
         (change-count (vector-ref row 0)))

    ;; The statement is cached, so reset it for the next use
    (sqlite-reset changes-statement)

    change-count))

(define* (clear-sqlite-cache-entry
          database
          key
          #:key (args '()) (version 1))
  "Delete the row of the cache table in DATABASE whose key is built from
KEY, VERSION and ARGS, if there is one.  The key is the written
representation of KEY, then |, the written VERSION, a colon and a space,
and the written ARGS."
  (define string-key
    ;; object->string prints with write by default
    (string-append (object->string key)
                   "|"
                   (object->string version)
                   ": "
                   (object->string args)))

  (database-call-with-transaction
   database
   (lambda (db)
     (define delete-statement
       (sqlite-prepare
        db
        "
DELETE FROM cache WHERE key = :key"
        #:cache? #t))

     (sqlite-bind-arguments delete-statement
                            #:key string-key)
     (sqlite-step delete-statement)
     (sqlite-reset delete-statement))))

(define* (with-sqlite-cache
          database
          key
          proc
          #:key (args '())
          (version 1)
          ttl
          (store-computed-value? #t))
  "Return the values cached in DATABASE under KEY, VERSION and ARGS when
there is an entry whose timestamp is less than TTL seconds old.  Otherwise
PROC is invoked through call-with-delay-logging with ARGS, on a reader
worker thread, and the values it returns are returned.  These values are
also stored in the cache, replacing any previous entry, unless
STORE-COMPUTED-VALUE? says otherwise: it is either a boolean, or a
procedure which is applied to the computed values and returns true when
they should be stored.  TTL must be a number, an error is raised if not.

Values are stored in their written representation and read back with
read."
  (define string-key
    ;; object->string prints with write by default
    (string-append (object->string key)
                   "|"
                   (object->string version)
                   ": "
                   (object->string args)))

  (define (lookup-cached-values db)
    ;; Return the list of cached values, or the symbol noval when there's no
    ;; entry for string-key or the entry has expired
    (let ((select-statement
           (sqlite-prepare
            db
            "
SELECT data, timestamp FROM cache WHERE key = :key"
            #:cache? #t)))

      (sqlite-bind-arguments select-statement
                             #:key string-key)

      (let ((row (sqlite-step select-statement)))
        (sqlite-reset select-statement)

        (match row
          (#f 'noval)
          (#(data timestamp)
           (if (<= (+ timestamp ttl)
                   (time-second (current-time)))
               'noval
               (call-with-input-string data read)))))))

  (define (compute-values)
    (call-with-worker-thread
     (database-reader-thread-channel database)
     (lambda (db)
       (call-with-delay-logging
        proc
        #:args args))))

  (define (store-values? vals)
    (if (procedure? store-computed-value?)
        (apply store-computed-value? vals)
        store-computed-value?))

  (define (store-values vals)
    ;; Replace any existing entry for string-key with VALS, timestamped with
    ;; the current time
    (database-call-with-transaction
     database
     (lambda (db)
       (let ((cleanup-statement
              (sqlite-prepare
               db
               "
DELETE FROM cache WHERE key = :key"
               #:cache? #t))
             (insert-statement
              (sqlite-prepare
               db
               "
INSERT INTO cache (key, timestamp, data)
VALUES (:key, :timestamp, :data)"
               #:cache? #t)))

         (sqlite-bind-arguments cleanup-statement
                                #:key string-key)
         (sqlite-step cleanup-statement)
         (sqlite-reset cleanup-statement)

         (sqlite-bind-arguments insert-statement
                                #:key string-key
                                #:timestamp (time-second (current-time))
                                #:data (object->string vals))
         (sqlite-step insert-statement)
         (sqlite-reset insert-statement)))))

  (unless (number? ttl)
    (error "must specify a ttl"))

  (let ((cached-values
         (call-with-worker-thread
          (database-reader-thread-channel database)
          (lambda (db)
            (lookup-cached-values db)))))

    ;; Don't log cache misses for now
    (when (and #f
               (eq? cached-values 'noval))
      (simple-format (current-error-port)
                     "cache miss: ~A\n" string-key))

    (if (eq? cached-values 'noval)
        (call-with-values compute-values
          (lambda vals
            (when (store-values? vals)
              (store-values vals))

            (apply values vals)))
        (apply values cached-values))))

(define (insert-into-builds-to-cancel-later database category-name
                                            category-value)
  "Insert a row with CATEGORY-NAME and CATEGORY-VALUE in the
builds_to_cancel_later table of DATABASE, doing nothing on conflict.
Return #t."
  (database-call-with-transaction
   database
   (lambda (db)
     (define insert-statement
       (sqlite-prepare
        db
        "
INSERT INTO builds_to_cancel_later (category_name, category_value)
  VALUES (:name, :value)
ON CONFLICT DO NOTHING"
        #:cache? #t))

     (sqlite-bind-arguments insert-statement
                            #:name category-name
                            #:value category-value)

     (sqlite-step insert-statement)
     (sqlite-reset insert-statement)))
  #t)

(define (delete-from-builds-to-cancel-later database category-name
                                            category-value)
  "Delete the rows of the builds_to_cancel_later table of DATABASE that
match both CATEGORY-NAME and CATEGORY-VALUE.  Return #t."
  (database-call-with-transaction
   database
   (lambda (db)
     (define delete-statement
       (sqlite-prepare
        db
        "
DELETE FROM builds_to_cancel_later
WHERE category_name = :name AND category_value = :value"
        #:cache? #t))

     (sqlite-bind-arguments delete-statement
                            #:name category-name
                            #:value category-value)

     (sqlite-step delete-statement)
     (sqlite-reset delete-statement)))
  #t)

(define (select-from-builds-to-cancel-later database category-name)
  "Return the list of category_value entries stored in the
builds_to_cancel_later table of DATABASE for CATEGORY-NAME.  The query runs
on a reader thread."
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (define select-statement
       (sqlite-prepare
        db
        "
SELECT category_value FROM builds_to_cancel_later
WHERE category_name = :name"
        #:cache? #t))

     (sqlite-bind-arguments select-statement
                            #:name category-name)

     (let ((category-values
            (sqlite-map
             (lambda (row)
               (match row
                 (#(val) val)))
             select-statement)))
       ;; Collect the rows before resetting the cached statement
       (sqlite-reset select-statement)

       category-values))))

(define (insert-create-branch-for-issue-log database issue log)
  "Store LOG as the log for ISSUE in the create_branch_for_issue_logs table
of DATABASE.  Any rows already present for ISSUE are deleted first, in the
same transaction."
  (database-call-with-transaction
   database
   (lambda (db)
     (define remove-previous
       (sqlite-prepare
        db
        "
DELETE FROM create_branch_for_issue_logs WHERE issue = :issue"
        #:cache? #t))
     (define add-log
       (sqlite-prepare
        db
        "
INSERT INTO create_branch_for_issue_logs (issue, log)
VALUES (:issue, :log)"
        #:cache? #t))

     (sqlite-bind-arguments remove-previous
                            #:issue issue)
     (sqlite-step remove-previous)
     (sqlite-reset remove-previous)

     (sqlite-bind-arguments add-log
                            #:issue issue
                            #:log log)
     (sqlite-step add-log)
     (sqlite-reset add-log))))

(define (select-create-branch-for-issue-log database issue)
  "Return the log stored for ISSUE in the create_branch_for_issue_logs table
of DATABASE, or #f when there is no row for it.  The query runs on a reader
thread."
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (define select-statement
       (sqlite-prepare
        db
        "
SELECT log FROM create_branch_for_issue_logs
WHERE issue = :issue"
        #:cache? #t))

     (sqlite-bind-arguments select-statement
                            #:issue issue)

     (let* ((row (sqlite-step select-statement))
            (stored-log (match row
                          (#f #f)
                          (#(log) log))))
       (sqlite-reset select-statement)

       stored-log))))

(define (delete-create-branch-for-issue-log database issue)
  "Delete the rows for ISSUE from the create_branch_for_issue_logs table of
DATABASE.  Return #t."
  (database-call-with-transaction
   database
   (lambda (db)
     (define delete-statement
       (sqlite-prepare
        db
        "
DELETE FROM create_branch_for_issue_logs
WHERE issue = :issue"
        #:cache? #t))

     (sqlite-bind-arguments delete-statement
                            #:issue issue)

     (sqlite-step delete-statement)
     (sqlite-reset delete-statement)))
  #t)