From da1926efc1e1f150f27e0c93d1461e3f1eb8c85a Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 21 Jun 2018 20:40:59 +0100 Subject: Update Que to 1.0.0 (beta 3) --- app/controllers/que_jobs_controller.rb | 8 +- app/models/que_job.rb | 24 +- app/views/shared/_jobs.html.erb | 4 +- config/initializers/que.rb | 2 - ...20180621065525_update_que_to_schema_version4.rb | 9 + db/structure.sql | 312 +++++++++++++++++++-- test/controllers/que_jobs_controller_test.rb | 4 +- 7 files changed, 317 insertions(+), 46 deletions(-) create mode 100644 db/migrate/20180621065525_update_que_to_schema_version4.rb diff --git a/app/controllers/que_jobs_controller.rb b/app/controllers/que_jobs_controller.rb index a458e9d..36d5558 100644 --- a/app/controllers/que_jobs_controller.rb +++ b/app/controllers/que_jobs_controller.rb @@ -20,9 +20,7 @@ class QueJobsController < ApplicationController def cancel - ActiveRecord::Base.connection.execute( - "DELETE FROM que_jobs WHERE job_id = #{job_id}::bigint" - ) + Que.execute :expire_job, [job_id] flash[:success] = 'Job canceled' @@ -32,8 +30,8 @@ class QueJobsController < ApplicationController def retry_now ActiveRecord::Base.connection.execute( "UPDATE que_jobs - SET run_at = now() - WHERE job_id = #{job_id}::bigint" + SET run_at = now() + WHERE id = #{job_id}::bigint" ) flash[:success] = 'Retrying the job now' diff --git a/app/models/que_job.rb b/app/models/que_job.rb index a2227b8..4bddaa0 100644 --- a/app/models/que_job.rb +++ b/app/models/que_job.rb @@ -22,15 +22,21 @@ # # Table name: que_jobs # -# priority :integer default(100), not null -# run_at :datetime not null -# job_id :integer not null -# job_class :text not null -# args :json not null -# error_count :integer default(0), not null -# last_error :text -# queue :text default(""), not null +# priority :integer default(100), not null +# run_at :datetime not null +# id :integer not null, primary key +# job_class :text not null +# error_count :integer default(0), not null +# last_error_message :text +# queue :text default("default"), not null +# last_error_backtrace :text +# finished_at :datetime +# expired_at :datetime +# args :jsonb not null +# data :jsonb not null # -class QueJob < ActiveRecord::Base +require 'que/active_record/model' + +class QueJob < Que::ActiveRecord::Model end diff --git a/app/views/shared/_jobs.html.erb b/app/views/shared/_jobs.html.erb index a151389..73d59ce 100644 --- a/app/views/shared/_jobs.html.erb +++ b/app/views/shared/_jobs.html.erb @@ -42,13 +42,13 @@
<%= job['last_error'] %>
- <%= form_with(url: retry_now_que_job_path(job['job_id'])) do %> + <%= form_with(url: retry_now_que_job_path(job.id)) do %> <%= submit_tag( 'Retry now', class: 'btn btn-primary btn-lg btn-block' ) %> <% end %> - <%= form_with(url: cancel_que_job_path(job['job_id'])) do %> + <%= form_with(url: cancel_que_job_path(job.id)) do %> <%= submit_tag( 'Cancel Job', class: 'btn btn-warning btn-lg btn-block', diff --git a/config/initializers/que.rb b/config/initializers/que.rb index c5f415f..5c2fa03 100644 --- a/config/initializers/que.rb +++ b/config/initializers/que.rb @@ -1,3 +1 @@ require 'que' - -Que.mode = :off diff --git a/db/migrate/20180621065525_update_que_to_schema_version4.rb b/db/migrate/20180621065525_update_que_to_schema_version4.rb new file mode 100644 index 0000000..a4cfc65 --- /dev/null +++ b/db/migrate/20180621065525_update_que_to_schema_version4.rb @@ -0,0 +1,9 @@ +class UpdateQueToSchemaVersion4 < ActiveRecord::Migration[5.1] + def self.up + Que.migrate! version: 4 + end + + def self.down + Que.migrate! version: 3 + end +end diff --git a/db/structure.sql b/db/structure.sql index 36c6abb..31c2571 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -22,10 +22,212 @@ CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog; COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language'; +-- +-- Name: que_validate_tags(jsonb); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.que_validate_tags(tags_array jsonb) RETURNS boolean + LANGUAGE sql + AS $$ + SELECT bool_and( + jsonb_typeof(value) = 'string' + AND + char_length(value::text) <= 100 + ) + FROM jsonb_array_elements(tags_array) +$$; + + SET default_tablespace = ''; SET default_with_oids = false; +-- +-- Name: que_jobs; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.que_jobs ( + priority smallint DEFAULT 100 NOT NULL, + run_at timestamp with time zone DEFAULT now() NOT NULL, + id bigint NOT NULL, + job_class text NOT NULL, + error_count integer DEFAULT 0 NOT NULL, + last_error_message text, + queue text DEFAULT 'default'::text NOT NULL, + last_error_backtrace text, + finished_at timestamp with time zone, + expired_at timestamp with time zone, + args jsonb DEFAULT '[]'::jsonb NOT NULL, + data jsonb DEFAULT '{}'::jsonb NOT NULL, + CONSTRAINT error_length CHECK (((char_length(last_error_message) <= 500) AND (char_length(last_error_backtrace) <= 10000))), + CONSTRAINT job_class_length CHECK ((char_length( +CASE job_class + WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'::text THEN ((args -> 0) ->> 'job_class'::text) + ELSE job_class +END) <= 200)), + CONSTRAINT queue_length CHECK ((char_length(queue) <= 100)), + CONSTRAINT valid_args CHECK ((jsonb_typeof(args) = 'array'::text)), + CONSTRAINT valid_data CHECK (((jsonb_typeof(data) = 'object'::text) AND ((NOT (data ? 'tags'::text)) OR ((jsonb_typeof((data -> 'tags'::text)) = 'array'::text) AND (jsonb_array_length((data -> 'tags'::text)) <= 5) AND public.que_validate_tags((data -> 'tags'::text)))))) +) +WITH (fillfactor='90'); + + +-- +-- Name: TABLE que_jobs; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON TABLE public.que_jobs IS '4'; + + +-- +-- Name: que_determine_job_state(public.que_jobs); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.que_determine_job_state(job public.que_jobs) RETURNS text + LANGUAGE sql + AS $$ + SELECT + CASE + WHEN job.expired_at IS NOT NULL THEN 'expired' + WHEN job.finished_at IS NOT NULL THEN 'finished' + WHEN job.error_count > 0 THEN 'errored' + WHEN job.run_at > CURRENT_TIMESTAMP THEN 'scheduled' + ELSE 'ready' + END +$$; + + +-- +-- Name: que_job_notify(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.que_job_notify() RETURNS trigger + LANGUAGE plpgsql + AS $$ + DECLARE + locker_pid integer; + sort_key json; + BEGIN + -- Don't do anything if the job is scheduled for a future time. + IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN + RETURN null; + END IF; + + -- Pick a locker to notify of the job's insertion, weighted by their number + -- of workers. Should bounce pseudorandomly between lockers on each + -- invocation, hence the md5-ordering, but still touch each one equally, + -- hence the modulo using the job_id. + SELECT pid + INTO locker_pid + FROM ( + SELECT *, last_value(row_number) OVER () + 1 AS count + FROM ( + SELECT *, row_number() OVER () - 1 AS row_number + FROM ( + SELECT * + FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id + WHERE listening AND queues @> ARRAY[NEW.queue] + ORDER BY md5(pid::text || id::text) + ) t1 + ) t2 + ) t3 + WHERE NEW.id % count = row_number; + + IF locker_pid IS NOT NULL THEN + -- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so + -- rather than throw errors when someone enqueues a big job, just + -- broadcast the most pertinent information, and let the locker query for + -- the record after it's taken the lock. The worker will have to hit the + -- DB in order to make sure the job is still visible anyway. + SELECT row_to_json(t) + INTO sort_key + FROM ( + SELECT + 'job_available' AS message_type, + NEW.queue AS queue, + NEW.priority AS priority, + NEW.id AS id, + -- Make sure we output timestamps as UTC ISO 8601 + to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at + ) t; + + PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text); + END IF; + + RETURN null; + END +$$; + + +-- +-- Name: que_state_notify(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.que_state_notify() RETURNS trigger + LANGUAGE plpgsql + AS $$ + DECLARE + row record; + message json; + previous_state text; + current_state text; + BEGIN + IF TG_OP = 'INSERT' THEN + previous_state := 'nonexistent'; + current_state := public.que_determine_job_state(NEW); + row := NEW; + ELSIF TG_OP = 'DELETE' THEN + previous_state := public.que_determine_job_state(OLD); + current_state := 'nonexistent'; + row := OLD; + ELSIF TG_OP = 'UPDATE' THEN + previous_state := public.que_determine_job_state(OLD); + current_state := public.que_determine_job_state(NEW); + + -- If the state didn't change, short-circuit. + IF previous_state = current_state THEN + RETURN null; + END IF; + + row := NEW; + ELSE + RAISE EXCEPTION 'Unrecognized TG_OP: %', TG_OP; + END IF; + + SELECT row_to_json(t) + INTO message + FROM ( + SELECT + 'job_change' AS message_type, + row.id AS id, + row.queue AS queue, + + coalesce(row.data->'tags', '[]'::jsonb) AS tags, + + to_char(row.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at, + to_char(now() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS time, + + CASE row.job_class + WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' THEN + coalesce( + row.args->0->>'job_class', + 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' + ) + ELSE + row.job_class + END AS job_class, + + previous_state AS previous_state, + current_state AS current_state + ) t; + + PERFORM pg_notify('que_state', message::text); + + RETURN null; + END +$$; + + -- -- Name: ar_internal_metadata; Type: TABLE; Schema: public; Owner: - -- @@ -192,45 +394,51 @@ ALTER SEQUENCE public.mini_environments_id_seq OWNED BY public.mini_environments -- --- Name: que_jobs; Type: TABLE; Schema: public; Owner: - +-- Name: que_jobs_id_seq; Type: SEQUENCE; Schema: public; Owner: - -- -CREATE TABLE public.que_jobs ( - priority smallint DEFAULT 100 NOT NULL, - run_at timestamp with time zone DEFAULT now() NOT NULL, - job_id bigint NOT NULL, - job_class text NOT NULL, - args json DEFAULT '[]'::json NOT NULL, - error_count integer DEFAULT 0 NOT NULL, - last_error text, - queue text DEFAULT ''::text NOT NULL -); +CREATE SEQUENCE public.que_jobs_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; -- --- Name: TABLE que_jobs; Type: COMMENT; Schema: public; Owner: - +-- Name: que_jobs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - -- -COMMENT ON TABLE public.que_jobs IS '3'; +ALTER SEQUENCE public.que_jobs_id_seq OWNED BY public.que_jobs.id; -- --- Name: que_jobs_job_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- Name: que_lockers; Type: TABLE; Schema: public; Owner: - -- -CREATE SEQUENCE public.que_jobs_job_id_seq - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; +CREATE UNLOGGED TABLE public.que_lockers ( + pid integer NOT NULL, + worker_count integer NOT NULL, + worker_priorities integer[] NOT NULL, + ruby_pid integer NOT NULL, + ruby_hostname text NOT NULL, + queues text[] NOT NULL, + listening boolean NOT NULL, + CONSTRAINT valid_queues CHECK (((array_ndims(queues) = 1) AND (array_length(queues, 1) IS NOT NULL))), + CONSTRAINT valid_worker_priorities CHECK (((array_ndims(worker_priorities) = 1) AND (array_length(worker_priorities, 1) IS NOT NULL))) +); -- --- Name: que_jobs_job_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- Name: que_values; Type: TABLE; Schema: public; Owner: - -- -ALTER SEQUENCE public.que_jobs_job_id_seq OWNED BY public.que_jobs.job_id; +CREATE TABLE public.que_values ( + key text NOT NULL, + value jsonb DEFAULT '{}'::jsonb NOT NULL, + CONSTRAINT valid_value CHECK ((jsonb_typeof(value) = 'object'::text)) +) +WITH (fillfactor='90'); -- @@ -409,10 +617,10 @@ ALTER TABLE ONLY public.mini_environments ALTER COLUMN id SET DEFAULT nextval('p -- --- Name: que_jobs job_id; Type: DEFAULT; Schema: public; Owner: - +-- Name: que_jobs id; Type: DEFAULT; Schema: public; Owner: - -- -ALTER TABLE ONLY public.que_jobs ALTER COLUMN job_id SET DEFAULT nextval('public.que_jobs_job_id_seq'::regclass); +ALTER TABLE ONLY public.que_jobs ALTER COLUMN id SET DEFAULT nextval('public.que_jobs_id_seq'::regclass); -- @@ -496,7 +704,23 @@ ALTER TABLE ONLY public.mini_environments -- ALTER TABLE ONLY public.que_jobs - ADD CONSTRAINT que_jobs_pkey PRIMARY KEY (queue, priority, run_at, job_id); + ADD CONSTRAINT que_jobs_pkey PRIMARY KEY (id); + + +-- +-- Name: que_lockers que_lockers_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.que_lockers + ADD CONSTRAINT que_lockers_pkey PRIMARY KEY (pid); + + +-- +-- Name: que_values que_values_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.que_values + ADD CONSTRAINT que_values_pkey PRIMARY KEY (key); -- @@ -588,6 +812,41 @@ CREATE INDEX index_mini_environments_on_data_snapshot_id ON public.mini_environm CREATE INDEX index_mini_environments_on_govuk_guix_revision_id ON public.mini_environments USING btree (govuk_guix_revision_id); +-- +-- Name: que_jobs_args_gin_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX que_jobs_args_gin_idx ON public.que_jobs USING gin (args jsonb_path_ops); + + +-- +-- Name: que_jobs_data_gin_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX que_jobs_data_gin_idx ON public.que_jobs USING gin (data jsonb_path_ops); + + +-- +-- Name: que_poll_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX que_poll_idx ON public.que_jobs USING btree (queue, priority, run_at, id) WHERE ((finished_at IS NULL) AND (expired_at IS NULL)); + + +-- +-- Name: que_jobs que_job_notify; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER que_job_notify AFTER INSERT ON public.que_jobs FOR EACH ROW EXECUTE PROCEDURE public.que_job_notify(); + + +-- +-- Name: que_jobs que_state_notify; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER que_state_notify AFTER INSERT OR DELETE OR UPDATE ON public.que_jobs FOR EACH ROW EXECUTE PROCEDURE public.que_state_notify(); + + -- -- Name: mini_environments fk_rails_12ab275069; Type: FK CONSTRAINT; Schema: public; Owner: - -- @@ -660,6 +919,7 @@ INSERT INTO "schema_migrations" (version) VALUES ('20180530192706'), ('20180601153537'), ('20180601182655'), -('20180603120426'); +('20180603120426'), +('20180621065525'); diff --git a/test/controllers/que_jobs_controller_test.rb b/test/controllers/que_jobs_controller_test.rb index 9cab867..b181909 100644 --- a/test/controllers/que_jobs_controller_test.rb +++ b/test/controllers/que_jobs_controller_test.rb @@ -8,7 +8,7 @@ class QueJobsControllerTest < ActionDispatch::IntegrationTest test 'cancel' do job = GovukGuix::FetchRevisionJob.enqueue('test-revision') - post cancel_que_job_path(job.attrs['job_id']) + post cancel_que_job_path(job.que_attrs[:id]) assert_response :redirect end @@ -16,7 +16,7 @@ class QueJobsControllerTest < ActionDispatch::IntegrationTest test 'retry now' do job = GovukGuix::FetchRevisionJob.enqueue('test-revision') - post retry_now_que_job_path(job.attrs['job_id']) + post retry_now_que_job_path(job.que_attrs[:id]) assert_response :redirect end -- cgit v1.2.3