From da1926efc1e1f150f27e0c93d1461e3f1eb8c85a Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 21 Jun 2018 20:40:59 +0100 Subject: Update Que to 1.0.0 (beta 3) --- ...20180621065525_update_que_to_schema_version4.rb | 9 + db/structure.sql | 312 +++++++++++++++++++-- 2 files changed, 295 insertions(+), 26 deletions(-) create mode 100644 db/migrate/20180621065525_update_que_to_schema_version4.rb (limited to 'db') diff --git a/db/migrate/20180621065525_update_que_to_schema_version4.rb b/db/migrate/20180621065525_update_que_to_schema_version4.rb new file mode 100644 index 0000000..a4cfc65 --- /dev/null +++ b/db/migrate/20180621065525_update_que_to_schema_version4.rb @@ -0,0 +1,9 @@ +class UpdateQueToSchemaVersion4 < ActiveRecord::Migration[5.1] + def self.up + Que.migrate! version: 4 + end + + def self.down + Que.migrate! version: 3 + end +end diff --git a/db/structure.sql b/db/structure.sql index 36c6abb..31c2571 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -22,10 +22,212 @@ CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog; COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language'; +-- +-- Name: que_validate_tags(jsonb); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.que_validate_tags(tags_array jsonb) RETURNS boolean + LANGUAGE sql + AS $$ + SELECT bool_and( + jsonb_typeof(value) = 'string' + AND + char_length(value::text) <= 100 + ) + FROM jsonb_array_elements(tags_array) +$$; + + SET default_tablespace = ''; SET default_with_oids = false; +-- +-- Name: que_jobs; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.que_jobs ( + priority smallint DEFAULT 100 NOT NULL, + run_at timestamp with time zone DEFAULT now() NOT NULL, + id bigint NOT NULL, + job_class text NOT NULL, + error_count integer DEFAULT 0 NOT NULL, + last_error_message text, + queue text DEFAULT 'default'::text NOT NULL, + last_error_backtrace text, + finished_at timestamp with time zone, + expired_at timestamp with time zone, + args jsonb DEFAULT '[]'::jsonb NOT NULL, + data jsonb DEFAULT '{}'::jsonb NOT NULL, + CONSTRAINT error_length CHECK (((char_length(last_error_message) <= 500) AND (char_length(last_error_backtrace) <= 10000))), + CONSTRAINT job_class_length CHECK ((char_length( +CASE job_class + WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'::text THEN ((args -> 0) ->> 'job_class'::text) + ELSE job_class +END) <= 200)), + CONSTRAINT queue_length CHECK ((char_length(queue) <= 100)), + CONSTRAINT valid_args CHECK ((jsonb_typeof(args) = 'array'::text)), + CONSTRAINT valid_data CHECK (((jsonb_typeof(data) = 'object'::text) AND ((NOT (data ? 'tags'::text)) OR ((jsonb_typeof((data -> 'tags'::text)) = 'array'::text) AND (jsonb_array_length((data -> 'tags'::text)) <= 5) AND public.que_validate_tags((data -> 'tags'::text)))))) +) +WITH (fillfactor='90'); + + +-- +-- Name: TABLE que_jobs; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON TABLE public.que_jobs IS '4'; + + +-- +-- Name: que_determine_job_state(public.que_jobs); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.que_determine_job_state(job public.que_jobs) RETURNS text + LANGUAGE sql + AS $$ + SELECT + CASE + WHEN job.expired_at IS NOT NULL THEN 'expired' + WHEN job.finished_at IS NOT NULL THEN 'finished' + WHEN job.error_count > 0 THEN 'errored' + WHEN job.run_at > CURRENT_TIMESTAMP THEN 'scheduled' + ELSE 'ready' + END +$$; + + +-- +-- Name: que_job_notify(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.que_job_notify() RETURNS trigger + LANGUAGE plpgsql + AS $$ + DECLARE + locker_pid integer; + sort_key json; + BEGIN + -- Don't do anything if the job is scheduled for a future time. + IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN + RETURN null; + END IF; + + -- Pick a locker to notify of the job's insertion, weighted by their number + -- of workers. Should bounce pseudorandomly between lockers on each + -- invocation, hence the md5-ordering, but still touch each one equally, + -- hence the modulo using the job_id. + SELECT pid + INTO locker_pid + FROM ( + SELECT *, last_value(row_number) OVER () + 1 AS count + FROM ( + SELECT *, row_number() OVER () - 1 AS row_number + FROM ( + SELECT * + FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id + WHERE listening AND queues @> ARRAY[NEW.queue] + ORDER BY md5(pid::text || id::text) + ) t1 + ) t2 + ) t3 + WHERE NEW.id % count = row_number; + + IF locker_pid IS NOT NULL THEN + -- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so + -- rather than throw errors when someone enqueues a big job, just + -- broadcast the most pertinent information, and let the locker query for + -- the record after it's taken the lock. The worker will have to hit the + -- DB in order to make sure the job is still visible anyway. + SELECT row_to_json(t) + INTO sort_key + FROM ( + SELECT + 'job_available' AS message_type, + NEW.queue AS queue, + NEW.priority AS priority, + NEW.id AS id, + -- Make sure we output timestamps as UTC ISO 8601 + to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at + ) t; + + PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text); + END IF; + + RETURN null; + END +$$; + + +-- +-- Name: que_state_notify(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.que_state_notify() RETURNS trigger + LANGUAGE plpgsql + AS $$ + DECLARE + row record; + message json; + previous_state text; + current_state text; + BEGIN + IF TG_OP = 'INSERT' THEN + previous_state := 'nonexistent'; + current_state := public.que_determine_job_state(NEW); + row := NEW; + ELSIF TG_OP = 'DELETE' THEN + previous_state := public.que_determine_job_state(OLD); + current_state := 'nonexistent'; + row := OLD; + ELSIF TG_OP = 'UPDATE' THEN + previous_state := public.que_determine_job_state(OLD); + current_state := public.que_determine_job_state(NEW); + + -- If the state didn't change, short-circuit. + IF previous_state = current_state THEN + RETURN null; + END IF; + + row := NEW; + ELSE + RAISE EXCEPTION 'Unrecognized TG_OP: %', TG_OP; + END IF; + + SELECT row_to_json(t) + INTO message + FROM ( + SELECT + 'job_change' AS message_type, + row.id AS id, + row.queue AS queue, + + coalesce(row.data->'tags', '[]'::jsonb) AS tags, + + to_char(row.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at, + to_char(now() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS time, + + CASE row.job_class + WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' THEN + coalesce( + row.args->0->>'job_class', + 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' + ) + ELSE + row.job_class + END AS job_class, + + previous_state AS previous_state, + current_state AS current_state + ) t; + + PERFORM pg_notify('que_state', message::text); + + RETURN null; + END +$$; + + -- -- Name: ar_internal_metadata; Type: TABLE; Schema: public; Owner: - -- @@ -192,45 +394,51 @@ ALTER SEQUENCE public.mini_environments_id_seq OWNED BY public.mini_environments -- --- Name: que_jobs; Type: TABLE; Schema: public; Owner: - +-- Name: que_jobs_id_seq; Type: SEQUENCE; Schema: public; Owner: - -- -CREATE TABLE public.que_jobs ( - priority smallint DEFAULT 100 NOT NULL, - run_at timestamp with time zone DEFAULT now() NOT NULL, - job_id bigint NOT NULL, - job_class text NOT NULL, - args json DEFAULT '[]'::json NOT NULL, - error_count integer DEFAULT 0 NOT NULL, - last_error text, - queue text DEFAULT ''::text NOT NULL -); +CREATE SEQUENCE public.que_jobs_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; -- --- Name: TABLE que_jobs; Type: COMMENT; Schema: public; Owner: - +-- Name: que_jobs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - -- -COMMENT ON TABLE public.que_jobs IS '3'; +ALTER SEQUENCE public.que_jobs_id_seq OWNED BY public.que_jobs.id; -- --- Name: que_jobs_job_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- Name: que_lockers; Type: TABLE; Schema: public; Owner: - -- -CREATE SEQUENCE public.que_jobs_job_id_seq - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; +CREATE UNLOGGED TABLE public.que_lockers ( + pid integer NOT NULL, + worker_count integer NOT NULL, + worker_priorities integer[] NOT NULL, + ruby_pid integer NOT NULL, + ruby_hostname text NOT NULL, + queues text[] NOT NULL, + listening boolean NOT NULL, + CONSTRAINT valid_queues CHECK (((array_ndims(queues) = 1) AND (array_length(queues, 1) IS NOT NULL))), + CONSTRAINT valid_worker_priorities CHECK (((array_ndims(worker_priorities) = 1) AND (array_length(worker_priorities, 1) IS NOT NULL))) +); -- --- Name: que_jobs_job_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- Name: que_values; Type: TABLE; Schema: public; Owner: - -- -ALTER SEQUENCE public.que_jobs_job_id_seq OWNED BY public.que_jobs.job_id; +CREATE TABLE public.que_values ( + key text NOT NULL, + value jsonb DEFAULT '{}'::jsonb NOT NULL, + CONSTRAINT valid_value CHECK ((jsonb_typeof(value) = 'object'::text)) +) +WITH (fillfactor='90'); -- @@ -409,10 +617,10 @@ ALTER TABLE ONLY public.mini_environments ALTER COLUMN id SET DEFAULT nextval('p -- --- Name: que_jobs job_id; Type: DEFAULT; Schema: public; Owner: - +-- Name: que_jobs id; Type: DEFAULT; Schema: public; Owner: - -- -ALTER TABLE ONLY public.que_jobs ALTER COLUMN job_id SET DEFAULT nextval('public.que_jobs_job_id_seq'::regclass); +ALTER TABLE ONLY public.que_jobs ALTER COLUMN id SET DEFAULT nextval('public.que_jobs_id_seq'::regclass); -- @@ -496,7 +704,23 @@ ALTER TABLE ONLY public.mini_environments -- ALTER TABLE ONLY public.que_jobs - ADD CONSTRAINT que_jobs_pkey PRIMARY KEY (queue, priority, run_at, job_id); + ADD CONSTRAINT que_jobs_pkey PRIMARY KEY (id); + + +-- +-- Name: que_lockers que_lockers_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.que_lockers + ADD CONSTRAINT que_lockers_pkey PRIMARY KEY (pid); + + +-- +-- Name: que_values que_values_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.que_values + ADD CONSTRAINT que_values_pkey PRIMARY KEY (key); -- @@ -588,6 +812,41 @@ CREATE INDEX index_mini_environments_on_data_snapshot_id ON public.mini_environm CREATE INDEX index_mini_environments_on_govuk_guix_revision_id ON public.mini_environments USING btree (govuk_guix_revision_id); +-- +-- Name: que_jobs_args_gin_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX que_jobs_args_gin_idx ON public.que_jobs USING gin (args jsonb_path_ops); + + +-- +-- Name: que_jobs_data_gin_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX que_jobs_data_gin_idx ON public.que_jobs USING gin (data jsonb_path_ops); + + +-- +-- Name: que_poll_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX que_poll_idx ON public.que_jobs USING btree (queue, priority, run_at, id) WHERE ((finished_at IS NULL) AND (expired_at IS NULL)); + + +-- +-- Name: que_jobs que_job_notify; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER que_job_notify AFTER INSERT ON public.que_jobs FOR EACH ROW EXECUTE PROCEDURE public.que_job_notify(); + + +-- +-- Name: que_jobs que_state_notify; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER que_state_notify AFTER INSERT OR DELETE OR UPDATE ON public.que_jobs FOR EACH ROW EXECUTE PROCEDURE public.que_state_notify(); + + -- -- Name: mini_environments fk_rails_12ab275069; Type: FK CONSTRAINT; Schema: public; Owner: - -- @@ -660,6 +919,7 @@ INSERT INTO "schema_migrations" (version) VALUES ('20180530192706'), ('20180601153537'), ('20180601182655'), -('20180603120426'); +('20180603120426'), +('20180621065525'); -- cgit v1.2.3