aboutsummaryrefslogtreecommitdiff
path: root/db
diff options
context:
space:
mode:
Diffstat (limited to 'db')
-rw-r--r--db/migrate/20180621065525_update_que_to_schema_version4.rb9
-rw-r--r--db/structure.sql312
2 files changed, 295 insertions, 26 deletions
diff --git a/db/migrate/20180621065525_update_que_to_schema_version4.rb b/db/migrate/20180621065525_update_que_to_schema_version4.rb
new file mode 100644
index 0000000..a4cfc65
--- /dev/null
+++ b/db/migrate/20180621065525_update_que_to_schema_version4.rb
@@ -0,0 +1,9 @@
+class UpdateQueToSchemaVersion4 < ActiveRecord::Migration[5.1]
+ def self.up
+ Que.migrate! version: 4
+ end
+
+ def self.down
+ Que.migrate! version: 3
+ end
+end
diff --git a/db/structure.sql b/db/structure.sql
index 36c6abb..31c2571 100644
--- a/db/structure.sql
+++ b/db/structure.sql
@@ -22,11 +22,213 @@ CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;
COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language';
+--
+-- Name: que_validate_tags(jsonb); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.que_validate_tags(tags_array jsonb) RETURNS boolean
+ LANGUAGE sql
+ AS $$
+ SELECT bool_and(
+ jsonb_typeof(value) = 'string'
+ AND
+ char_length(value::text) <= 100
+ )
+ FROM jsonb_array_elements(tags_array)
+$$;
+
+
SET default_tablespace = '';
SET default_with_oids = false;
--
+-- Name: que_jobs; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.que_jobs (
+ priority smallint DEFAULT 100 NOT NULL,
+ run_at timestamp with time zone DEFAULT now() NOT NULL,
+ id bigint NOT NULL,
+ job_class text NOT NULL,
+ error_count integer DEFAULT 0 NOT NULL,
+ last_error_message text,
+ queue text DEFAULT 'default'::text NOT NULL,
+ last_error_backtrace text,
+ finished_at timestamp with time zone,
+ expired_at timestamp with time zone,
+ args jsonb DEFAULT '[]'::jsonb NOT NULL,
+ data jsonb DEFAULT '{}'::jsonb NOT NULL,
+ CONSTRAINT error_length CHECK (((char_length(last_error_message) <= 500) AND (char_length(last_error_backtrace) <= 10000))),
+ CONSTRAINT job_class_length CHECK ((char_length(
+CASE job_class
+ WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'::text THEN ((args -> 0) ->> 'job_class'::text)
+ ELSE job_class
+END) <= 200)),
+ CONSTRAINT queue_length CHECK ((char_length(queue) <= 100)),
+ CONSTRAINT valid_args CHECK ((jsonb_typeof(args) = 'array'::text)),
+ CONSTRAINT valid_data CHECK (((jsonb_typeof(data) = 'object'::text) AND ((NOT (data ? 'tags'::text)) OR ((jsonb_typeof((data -> 'tags'::text)) = 'array'::text) AND (jsonb_array_length((data -> 'tags'::text)) <= 5) AND public.que_validate_tags((data -> 'tags'::text))))))
+)
+WITH (fillfactor='90');
+
+
+--
+-- Name: TABLE que_jobs; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TABLE public.que_jobs IS '4';
+
+
+--
+-- Name: que_determine_job_state(public.que_jobs); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.que_determine_job_state(job public.que_jobs) RETURNS text
+ LANGUAGE sql
+ AS $$
+ SELECT
+ CASE
+ WHEN job.expired_at IS NOT NULL THEN 'expired'
+ WHEN job.finished_at IS NOT NULL THEN 'finished'
+ WHEN job.error_count > 0 THEN 'errored'
+ WHEN job.run_at > CURRENT_TIMESTAMP THEN 'scheduled'
+ ELSE 'ready'
+ END
+$$;
+
+
+--
+-- Name: que_job_notify(); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.que_job_notify() RETURNS trigger
+ LANGUAGE plpgsql
+ AS $$
+ DECLARE
+ locker_pid integer;
+ sort_key json;
+ BEGIN
+ -- Don't do anything if the job is scheduled for a future time.
+ IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
+ RETURN null;
+ END IF;
+
+ -- Pick a locker to notify of the job's insertion, weighted by their number
+ -- of workers. Should bounce pseudorandomly between lockers on each
+ -- invocation, hence the md5-ordering, but still touch each one equally,
+ -- hence the modulo using the job_id.
+ SELECT pid
+ INTO locker_pid
+ FROM (
+ SELECT *, last_value(row_number) OVER () + 1 AS count
+ FROM (
+ SELECT *, row_number() OVER () - 1 AS row_number
+ FROM (
+ SELECT *
+ FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
+ WHERE listening AND queues @> ARRAY[NEW.queue]
+ ORDER BY md5(pid::text || id::text)
+ ) t1
+ ) t2
+ ) t3
+ WHERE NEW.id % count = row_number;
+
+ IF locker_pid IS NOT NULL THEN
+ -- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
+ -- rather than throw errors when someone enqueues a big job, just
+ -- broadcast the most pertinent information, and let the locker query for
+ -- the record after it's taken the lock. The worker will have to hit the
+ -- DB in order to make sure the job is still visible anyway.
+ SELECT row_to_json(t)
+ INTO sort_key
+ FROM (
+ SELECT
+ 'job_available' AS message_type,
+ NEW.queue AS queue,
+ NEW.priority AS priority,
+ NEW.id AS id,
+ -- Make sure we output timestamps as UTC ISO 8601
+ to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
+ ) t;
+
+ PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
+ END IF;
+
+ RETURN null;
+ END
+$$;
+
+
+--
+-- Name: que_state_notify(); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.que_state_notify() RETURNS trigger
+ LANGUAGE plpgsql
+ AS $$
+ DECLARE
+ row record;
+ message json;
+ previous_state text;
+ current_state text;
+ BEGIN
+ IF TG_OP = 'INSERT' THEN
+ previous_state := 'nonexistent';
+ current_state := public.que_determine_job_state(NEW);
+ row := NEW;
+ ELSIF TG_OP = 'DELETE' THEN
+ previous_state := public.que_determine_job_state(OLD);
+ current_state := 'nonexistent';
+ row := OLD;
+ ELSIF TG_OP = 'UPDATE' THEN
+ previous_state := public.que_determine_job_state(OLD);
+ current_state := public.que_determine_job_state(NEW);
+
+ -- If the state didn't change, short-circuit.
+ IF previous_state = current_state THEN
+ RETURN null;
+ END IF;
+
+ row := NEW;
+ ELSE
+ RAISE EXCEPTION 'Unrecognized TG_OP: %', TG_OP;
+ END IF;
+
+ SELECT row_to_json(t)
+ INTO message
+ FROM (
+ SELECT
+ 'job_change' AS message_type,
+ row.id AS id,
+ row.queue AS queue,
+
+ coalesce(row.data->'tags', '[]'::jsonb) AS tags,
+
+ to_char(row.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at,
+ to_char(now() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS time,
+
+ CASE row.job_class
+ WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' THEN
+ coalesce(
+ row.args->0->>'job_class',
+ 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
+ )
+ ELSE
+ row.job_class
+ END AS job_class,
+
+ previous_state AS previous_state,
+ current_state AS current_state
+ ) t;
+
+ PERFORM pg_notify('que_state', message::text);
+
+ RETURN null;
+ END
+$$;
+
+
+--
-- Name: ar_internal_metadata; Type: TABLE; Schema: public; Owner: -
--
@@ -192,45 +394,51 @@ ALTER SEQUENCE public.mini_environments_id_seq OWNED BY public.mini_environments
--
--- Name: que_jobs; Type: TABLE; Schema: public; Owner: -
+-- Name: que_jobs_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
-CREATE TABLE public.que_jobs (
- priority smallint DEFAULT 100 NOT NULL,
- run_at timestamp with time zone DEFAULT now() NOT NULL,
- job_id bigint NOT NULL,
- job_class text NOT NULL,
- args json DEFAULT '[]'::json NOT NULL,
- error_count integer DEFAULT 0 NOT NULL,
- last_error text,
- queue text DEFAULT ''::text NOT NULL
-);
+CREATE SEQUENCE public.que_jobs_id_seq
+ START WITH 1
+ INCREMENT BY 1
+ NO MINVALUE
+ NO MAXVALUE
+ CACHE 1;
--
--- Name: TABLE que_jobs; Type: COMMENT; Schema: public; Owner: -
+-- Name: que_jobs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
-COMMENT ON TABLE public.que_jobs IS '3';
+ALTER SEQUENCE public.que_jobs_id_seq OWNED BY public.que_jobs.id;
--
--- Name: que_jobs_job_id_seq; Type: SEQUENCE; Schema: public; Owner: -
+-- Name: que_lockers; Type: TABLE; Schema: public; Owner: -
--
-CREATE SEQUENCE public.que_jobs_job_id_seq
- START WITH 1
- INCREMENT BY 1
- NO MINVALUE
- NO MAXVALUE
- CACHE 1;
+CREATE UNLOGGED TABLE public.que_lockers (
+ pid integer NOT NULL,
+ worker_count integer NOT NULL,
+ worker_priorities integer[] NOT NULL,
+ ruby_pid integer NOT NULL,
+ ruby_hostname text NOT NULL,
+ queues text[] NOT NULL,
+ listening boolean NOT NULL,
+ CONSTRAINT valid_queues CHECK (((array_ndims(queues) = 1) AND (array_length(queues, 1) IS NOT NULL))),
+ CONSTRAINT valid_worker_priorities CHECK (((array_ndims(worker_priorities) = 1) AND (array_length(worker_priorities, 1) IS NOT NULL)))
+);
--
--- Name: que_jobs_job_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
+-- Name: que_values; Type: TABLE; Schema: public; Owner: -
--
-ALTER SEQUENCE public.que_jobs_job_id_seq OWNED BY public.que_jobs.job_id;
+CREATE TABLE public.que_values (
+ key text NOT NULL,
+ value jsonb DEFAULT '{}'::jsonb NOT NULL,
+ CONSTRAINT valid_value CHECK ((jsonb_typeof(value) = 'object'::text))
+)
+WITH (fillfactor='90');
--
@@ -409,10 +617,10 @@ ALTER TABLE ONLY public.mini_environments ALTER COLUMN id SET DEFAULT nextval('p
--
--- Name: que_jobs job_id; Type: DEFAULT; Schema: public; Owner: -
+-- Name: que_jobs id; Type: DEFAULT; Schema: public; Owner: -
--
-ALTER TABLE ONLY public.que_jobs ALTER COLUMN job_id SET DEFAULT nextval('public.que_jobs_job_id_seq'::regclass);
+ALTER TABLE ONLY public.que_jobs ALTER COLUMN id SET DEFAULT nextval('public.que_jobs_id_seq'::regclass);
--
@@ -496,7 +704,23 @@ ALTER TABLE ONLY public.mini_environments
--
ALTER TABLE ONLY public.que_jobs
- ADD CONSTRAINT que_jobs_pkey PRIMARY KEY (queue, priority, run_at, job_id);
+ ADD CONSTRAINT que_jobs_pkey PRIMARY KEY (id);
+
+
+--
+-- Name: que_lockers que_lockers_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.que_lockers
+ ADD CONSTRAINT que_lockers_pkey PRIMARY KEY (pid);
+
+
+--
+-- Name: que_values que_values_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.que_values
+ ADD CONSTRAINT que_values_pkey PRIMARY KEY (key);
--
@@ -589,6 +813,41 @@ CREATE INDEX index_mini_environments_on_govuk_guix_revision_id ON public.mini_en
--
+-- Name: que_jobs_args_gin_idx; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX que_jobs_args_gin_idx ON public.que_jobs USING gin (args jsonb_path_ops);
+
+
+--
+-- Name: que_jobs_data_gin_idx; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX que_jobs_data_gin_idx ON public.que_jobs USING gin (data jsonb_path_ops);
+
+
+--
+-- Name: que_poll_idx; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX que_poll_idx ON public.que_jobs USING btree (queue, priority, run_at, id) WHERE ((finished_at IS NULL) AND (expired_at IS NULL));
+
+
+--
+-- Name: que_jobs que_job_notify; Type: TRIGGER; Schema: public; Owner: -
+--
+
+CREATE TRIGGER que_job_notify AFTER INSERT ON public.que_jobs FOR EACH ROW EXECUTE PROCEDURE public.que_job_notify();
+
+
+--
+-- Name: que_jobs que_state_notify; Type: TRIGGER; Schema: public; Owner: -
+--
+
+CREATE TRIGGER que_state_notify AFTER INSERT OR DELETE OR UPDATE ON public.que_jobs FOR EACH ROW EXECUTE PROCEDURE public.que_state_notify();
+
+
+--
-- Name: mini_environments fk_rails_12ab275069; Type: FK CONSTRAINT; Schema: public; Owner: -
--
@@ -660,6 +919,7 @@ INSERT INTO "schema_migrations" (version) VALUES
('20180530192706'),
('20180601153537'),
('20180601182655'),
-('20180603120426');
+('20180603120426'),
+('20180621065525');