aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2018-06-21 20:40:59 +0100
committerChristopher Baines <mail@cbaines.net>2018-06-21 20:40:59 +0100
commitda1926efc1e1f150f27e0c93d1461e3f1eb8c85a (patch)
treeba043a349e337113f2754642a9b78b020b547762
parent7dbc313632ecebd638bd464bdc430292218d4c9e (diff)
downloadgovuk-mini-environment-admin-da1926efc1e1f150f27e0c93d1461e3f1eb8c85a.tar
govuk-mini-environment-admin-da1926efc1e1f150f27e0c93d1461e3f1eb8c85a.tar.gz
Update Que to 1.0.0 (beta 3)
-rw-r--r--app/controllers/que_jobs_controller.rb8
-rw-r--r--app/models/que_job.rb24
-rw-r--r--app/views/shared/_jobs.html.erb4
-rw-r--r--config/initializers/que.rb2
-rw-r--r--db/migrate/20180621065525_update_que_to_schema_version4.rb9
-rw-r--r--db/structure.sql312
-rw-r--r--test/controllers/que_jobs_controller_test.rb4
7 files changed, 317 insertions, 46 deletions
diff --git a/app/controllers/que_jobs_controller.rb b/app/controllers/que_jobs_controller.rb
index a458e9d..36d5558 100644
--- a/app/controllers/que_jobs_controller.rb
+++ b/app/controllers/que_jobs_controller.rb
@@ -20,9 +20,7 @@
class QueJobsController < ApplicationController
def cancel
- ActiveRecord::Base.connection.execute(
- "DELETE FROM que_jobs WHERE job_id = #{job_id}::bigint"
- )
+ Que.execute :expire_job, [job_id]
flash[:success] = 'Job canceled'
@@ -32,8 +30,8 @@ class QueJobsController < ApplicationController
def retry_now
ActiveRecord::Base.connection.execute(
"UPDATE que_jobs
- SET run_at = now()
- WHERE job_id = #{job_id}::bigint"
+ SET run_at = now()
+ WHERE id = #{job_id}::bigint"
)
flash[:success] = 'Retrying the job now'
diff --git a/app/models/que_job.rb b/app/models/que_job.rb
index a2227b8..4bddaa0 100644
--- a/app/models/que_job.rb
+++ b/app/models/que_job.rb
@@ -22,15 +22,21 @@
#
# Table name: que_jobs
#
-# priority :integer default(100), not null
-# run_at :datetime not null
-# job_id :integer not null
-# job_class :text not null
-# args :json not null
-# error_count :integer default(0), not null
-# last_error :text
-# queue :text default(""), not null
+# priority :integer default(100), not null
+# run_at :datetime not null
+# id :integer not null, primary key
+# job_class :text not null
+# error_count :integer default(0), not null
+# last_error_message :text
+# queue :text default("default"), not null
+# last_error_backtrace :text
+# finished_at :datetime
+# expired_at :datetime
+# args :jsonb not null
+# data :jsonb not null
#
-class QueJob < ActiveRecord::Base
+require 'que/active_record/model'
+
+class QueJob < Que::ActiveRecord::Model
end
diff --git a/app/views/shared/_jobs.html.erb b/app/views/shared/_jobs.html.erb
index a151389..73d59ce 100644
--- a/app/views/shared/_jobs.html.erb
+++ b/app/views/shared/_jobs.html.erb
@@ -42,13 +42,13 @@
<pre><%= job['last_error'] %></pre>
</div>
<div class="col-md-2">
- <%= form_with(url: retry_now_que_job_path(job['job_id'])) do %>
+ <%= form_with(url: retry_now_que_job_path(job.id)) do %>
<%= submit_tag(
'Retry now',
class: 'btn btn-primary btn-lg btn-block'
) %>
<% end %>
- <%= form_with(url: cancel_que_job_path(job['job_id'])) do %>
+ <%= form_with(url: cancel_que_job_path(job.id)) do %>
<%= submit_tag(
'Cancel Job',
class: 'btn btn-warning btn-lg btn-block',
diff --git a/config/initializers/que.rb b/config/initializers/que.rb
index c5f415f..5c2fa03 100644
--- a/config/initializers/que.rb
+++ b/config/initializers/que.rb
@@ -1,3 +1 @@
require 'que'
-
-Que.mode = :off
diff --git a/db/migrate/20180621065525_update_que_to_schema_version4.rb b/db/migrate/20180621065525_update_que_to_schema_version4.rb
new file mode 100644
index 0000000..a4cfc65
--- /dev/null
+++ b/db/migrate/20180621065525_update_que_to_schema_version4.rb
@@ -0,0 +1,9 @@
+class UpdateQueToSchemaVersion4 < ActiveRecord::Migration[5.1]
+ def self.up
+ Que.migrate! version: 4
+ end
+
+ def self.down
+ Que.migrate! version: 3
+ end
+end
diff --git a/db/structure.sql b/db/structure.sql
index 36c6abb..31c2571 100644
--- a/db/structure.sql
+++ b/db/structure.sql
@@ -22,11 +22,213 @@ CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;
COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language';
+--
+-- Name: que_validate_tags(jsonb); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.que_validate_tags(tags_array jsonb) RETURNS boolean
+ LANGUAGE sql
+ AS $$
+ SELECT bool_and(
+ jsonb_typeof(value) = 'string'
+ AND
+ char_length(value::text) <= 100
+ )
+ FROM jsonb_array_elements(tags_array)
+$$;
+
+
SET default_tablespace = '';
SET default_with_oids = false;
--
+-- Name: que_jobs; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.que_jobs (
+ priority smallint DEFAULT 100 NOT NULL,
+ run_at timestamp with time zone DEFAULT now() NOT NULL,
+ id bigint NOT NULL,
+ job_class text NOT NULL,
+ error_count integer DEFAULT 0 NOT NULL,
+ last_error_message text,
+ queue text DEFAULT 'default'::text NOT NULL,
+ last_error_backtrace text,
+ finished_at timestamp with time zone,
+ expired_at timestamp with time zone,
+ args jsonb DEFAULT '[]'::jsonb NOT NULL,
+ data jsonb DEFAULT '{}'::jsonb NOT NULL,
+ CONSTRAINT error_length CHECK (((char_length(last_error_message) <= 500) AND (char_length(last_error_backtrace) <= 10000))),
+ CONSTRAINT job_class_length CHECK ((char_length(
+CASE job_class
+ WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'::text THEN ((args -> 0) ->> 'job_class'::text)
+ ELSE job_class
+END) <= 200)),
+ CONSTRAINT queue_length CHECK ((char_length(queue) <= 100)),
+ CONSTRAINT valid_args CHECK ((jsonb_typeof(args) = 'array'::text)),
+ CONSTRAINT valid_data CHECK (((jsonb_typeof(data) = 'object'::text) AND ((NOT (data ? 'tags'::text)) OR ((jsonb_typeof((data -> 'tags'::text)) = 'array'::text) AND (jsonb_array_length((data -> 'tags'::text)) <= 5) AND public.que_validate_tags((data -> 'tags'::text))))))
+)
+WITH (fillfactor='90');
+
+
+--
+-- Name: TABLE que_jobs; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TABLE public.que_jobs IS '4';
+
+
+--
+-- Name: que_determine_job_state(public.que_jobs); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.que_determine_job_state(job public.que_jobs) RETURNS text
+ LANGUAGE sql
+ AS $$
+ SELECT
+ CASE
+ WHEN job.expired_at IS NOT NULL THEN 'expired'
+ WHEN job.finished_at IS NOT NULL THEN 'finished'
+ WHEN job.error_count > 0 THEN 'errored'
+ WHEN job.run_at > CURRENT_TIMESTAMP THEN 'scheduled'
+ ELSE 'ready'
+ END
+$$;
+
+
+--
+-- Name: que_job_notify(); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.que_job_notify() RETURNS trigger
+ LANGUAGE plpgsql
+ AS $$
+ DECLARE
+ locker_pid integer;
+ sort_key json;
+ BEGIN
+ -- Don't do anything if the job is scheduled for a future time.
+ IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
+ RETURN null;
+ END IF;
+
+ -- Pick a locker to notify of the job's insertion, weighted by their number
+ -- of workers. Should bounce pseudorandomly between lockers on each
+ -- invocation, hence the md5-ordering, but still touch each one equally,
+ -- hence the modulo using the job_id.
+ SELECT pid
+ INTO locker_pid
+ FROM (
+ SELECT *, last_value(row_number) OVER () + 1 AS count
+ FROM (
+ SELECT *, row_number() OVER () - 1 AS row_number
+ FROM (
+ SELECT *
+ FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
+ WHERE listening AND queues @> ARRAY[NEW.queue]
+ ORDER BY md5(pid::text || id::text)
+ ) t1
+ ) t2
+ ) t3
+ WHERE NEW.id % count = row_number;
+
+ IF locker_pid IS NOT NULL THEN
+ -- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
+ -- rather than throw errors when someone enqueues a big job, just
+ -- broadcast the most pertinent information, and let the locker query for
+ -- the record after it's taken the lock. The worker will have to hit the
+ -- DB in order to make sure the job is still visible anyway.
+ SELECT row_to_json(t)
+ INTO sort_key
+ FROM (
+ SELECT
+ 'job_available' AS message_type,
+ NEW.queue AS queue,
+ NEW.priority AS priority,
+ NEW.id AS id,
+ -- Make sure we output timestamps as UTC ISO 8601
+ to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
+ ) t;
+
+ PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
+ END IF;
+
+ RETURN null;
+ END
+$$;
+
+
+--
+-- Name: que_state_notify(); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.que_state_notify() RETURNS trigger
+ LANGUAGE plpgsql
+ AS $$
+ DECLARE
+ row record;
+ message json;
+ previous_state text;
+ current_state text;
+ BEGIN
+ IF TG_OP = 'INSERT' THEN
+ previous_state := 'nonexistent';
+ current_state := public.que_determine_job_state(NEW);
+ row := NEW;
+ ELSIF TG_OP = 'DELETE' THEN
+ previous_state := public.que_determine_job_state(OLD);
+ current_state := 'nonexistent';
+ row := OLD;
+ ELSIF TG_OP = 'UPDATE' THEN
+ previous_state := public.que_determine_job_state(OLD);
+ current_state := public.que_determine_job_state(NEW);
+
+ -- If the state didn't change, short-circuit.
+ IF previous_state = current_state THEN
+ RETURN null;
+ END IF;
+
+ row := NEW;
+ ELSE
+ RAISE EXCEPTION 'Unrecognized TG_OP: %', TG_OP;
+ END IF;
+
+ SELECT row_to_json(t)
+ INTO message
+ FROM (
+ SELECT
+ 'job_change' AS message_type,
+ row.id AS id,
+ row.queue AS queue,
+
+ coalesce(row.data->'tags', '[]'::jsonb) AS tags,
+
+ to_char(row.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at,
+ to_char(now() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS time,
+
+ CASE row.job_class
+ WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' THEN
+ coalesce(
+ row.args->0->>'job_class',
+ 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
+ )
+ ELSE
+ row.job_class
+ END AS job_class,
+
+ previous_state AS previous_state,
+ current_state AS current_state
+ ) t;
+
+ PERFORM pg_notify('que_state', message::text);
+
+ RETURN null;
+ END
+$$;
+
+
+--
-- Name: ar_internal_metadata; Type: TABLE; Schema: public; Owner: -
--
@@ -192,45 +394,51 @@ ALTER SEQUENCE public.mini_environments_id_seq OWNED BY public.mini_environments
--
--- Name: que_jobs; Type: TABLE; Schema: public; Owner: -
+-- Name: que_jobs_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
-CREATE TABLE public.que_jobs (
- priority smallint DEFAULT 100 NOT NULL,
- run_at timestamp with time zone DEFAULT now() NOT NULL,
- job_id bigint NOT NULL,
- job_class text NOT NULL,
- args json DEFAULT '[]'::json NOT NULL,
- error_count integer DEFAULT 0 NOT NULL,
- last_error text,
- queue text DEFAULT ''::text NOT NULL
-);
+CREATE SEQUENCE public.que_jobs_id_seq
+ START WITH 1
+ INCREMENT BY 1
+ NO MINVALUE
+ NO MAXVALUE
+ CACHE 1;
--
--- Name: TABLE que_jobs; Type: COMMENT; Schema: public; Owner: -
+-- Name: que_jobs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
-COMMENT ON TABLE public.que_jobs IS '3';
+ALTER SEQUENCE public.que_jobs_id_seq OWNED BY public.que_jobs.id;
--
--- Name: que_jobs_job_id_seq; Type: SEQUENCE; Schema: public; Owner: -
+-- Name: que_lockers; Type: TABLE; Schema: public; Owner: -
--
-CREATE SEQUENCE public.que_jobs_job_id_seq
- START WITH 1
- INCREMENT BY 1
- NO MINVALUE
- NO MAXVALUE
- CACHE 1;
+CREATE UNLOGGED TABLE public.que_lockers (
+ pid integer NOT NULL,
+ worker_count integer NOT NULL,
+ worker_priorities integer[] NOT NULL,
+ ruby_pid integer NOT NULL,
+ ruby_hostname text NOT NULL,
+ queues text[] NOT NULL,
+ listening boolean NOT NULL,
+ CONSTRAINT valid_queues CHECK (((array_ndims(queues) = 1) AND (array_length(queues, 1) IS NOT NULL))),
+ CONSTRAINT valid_worker_priorities CHECK (((array_ndims(worker_priorities) = 1) AND (array_length(worker_priorities, 1) IS NOT NULL)))
+);
--
--- Name: que_jobs_job_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
+-- Name: que_values; Type: TABLE; Schema: public; Owner: -
--
-ALTER SEQUENCE public.que_jobs_job_id_seq OWNED BY public.que_jobs.job_id;
+CREATE TABLE public.que_values (
+ key text NOT NULL,
+ value jsonb DEFAULT '{}'::jsonb NOT NULL,
+ CONSTRAINT valid_value CHECK ((jsonb_typeof(value) = 'object'::text))
+)
+WITH (fillfactor='90');
--
@@ -409,10 +617,10 @@ ALTER TABLE ONLY public.mini_environments ALTER COLUMN id SET DEFAULT nextval('p
--
--- Name: que_jobs job_id; Type: DEFAULT; Schema: public; Owner: -
+-- Name: que_jobs id; Type: DEFAULT; Schema: public; Owner: -
--
-ALTER TABLE ONLY public.que_jobs ALTER COLUMN job_id SET DEFAULT nextval('public.que_jobs_job_id_seq'::regclass);
+ALTER TABLE ONLY public.que_jobs ALTER COLUMN id SET DEFAULT nextval('public.que_jobs_id_seq'::regclass);
--
@@ -496,7 +704,23 @@ ALTER TABLE ONLY public.mini_environments
--
ALTER TABLE ONLY public.que_jobs
- ADD CONSTRAINT que_jobs_pkey PRIMARY KEY (queue, priority, run_at, job_id);
+ ADD CONSTRAINT que_jobs_pkey PRIMARY KEY (id);
+
+
+--
+-- Name: que_lockers que_lockers_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.que_lockers
+ ADD CONSTRAINT que_lockers_pkey PRIMARY KEY (pid);
+
+
+--
+-- Name: que_values que_values_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.que_values
+ ADD CONSTRAINT que_values_pkey PRIMARY KEY (key);
--
@@ -589,6 +813,41 @@ CREATE INDEX index_mini_environments_on_govuk_guix_revision_id ON public.mini_en
--
+-- Name: que_jobs_args_gin_idx; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX que_jobs_args_gin_idx ON public.que_jobs USING gin (args jsonb_path_ops);
+
+
+--
+-- Name: que_jobs_data_gin_idx; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX que_jobs_data_gin_idx ON public.que_jobs USING gin (data jsonb_path_ops);
+
+
+--
+-- Name: que_poll_idx; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX que_poll_idx ON public.que_jobs USING btree (queue, priority, run_at, id) WHERE ((finished_at IS NULL) AND (expired_at IS NULL));
+
+
+--
+-- Name: que_jobs que_job_notify; Type: TRIGGER; Schema: public; Owner: -
+--
+
+CREATE TRIGGER que_job_notify AFTER INSERT ON public.que_jobs FOR EACH ROW EXECUTE PROCEDURE public.que_job_notify();
+
+
+--
+-- Name: que_jobs que_state_notify; Type: TRIGGER; Schema: public; Owner: -
+--
+
+CREATE TRIGGER que_state_notify AFTER INSERT OR DELETE OR UPDATE ON public.que_jobs FOR EACH ROW EXECUTE PROCEDURE public.que_state_notify();
+
+
+--
-- Name: mini_environments fk_rails_12ab275069; Type: FK CONSTRAINT; Schema: public; Owner: -
--
@@ -660,6 +919,7 @@ INSERT INTO "schema_migrations" (version) VALUES
('20180530192706'),
('20180601153537'),
('20180601182655'),
-('20180603120426');
+('20180603120426'),
+('20180621065525');
diff --git a/test/controllers/que_jobs_controller_test.rb b/test/controllers/que_jobs_controller_test.rb
index 9cab867..b181909 100644
--- a/test/controllers/que_jobs_controller_test.rb
+++ b/test/controllers/que_jobs_controller_test.rb
@@ -8,7 +8,7 @@ class QueJobsControllerTest < ActionDispatch::IntegrationTest
test 'cancel' do
job = GovukGuix::FetchRevisionJob.enqueue('test-revision')
- post cancel_que_job_path(job.attrs['job_id'])
+ post cancel_que_job_path(job.que_attrs[:id])
assert_response :redirect
end
@@ -16,7 +16,7 @@ class QueJobsControllerTest < ActionDispatch::IntegrationTest
test 'retry now' do
job = GovukGuix::FetchRevisionJob.enqueue('test-revision')
- post retry_now_que_job_path(job.attrs['job_id'])
+ post retry_now_que_job_path(job.que_attrs[:id])
assert_response :redirect
end