409 lines
13 KiB
Plaintext
409 lines
13 KiB
Plaintext
|
-- predictability
|
||
|
SET synchronous_commit = on;
|
||
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding');
|
||
|
?column?
|
||
|
----------
|
||
|
init
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t', 'test_decoding', true);
|
||
|
?column?
|
||
|
----------
|
||
|
init
|
||
|
(1 row)
|
||
|
|
||
|
SELECT pg_drop_replication_slot('regression_slot_p');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding', false);
|
||
|
?column?
|
||
|
----------
|
||
|
init
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'test_decoding', true);
|
||
|
?column?
|
||
|
----------
|
||
|
init
|
||
|
(1 row)
|
||
|
|
||
|
SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
|
||
|
ERROR: could not access file "nonexistent": No such file or directory
|
||
|
-- here we want to start a new session and wait till old one is gone
|
||
|
select pg_backend_pid() as oldpid \gset
|
||
|
\c -
|
||
|
SET synchronous_commit = on;
|
||
|
do 'declare c int = 0;
|
||
|
begin
|
||
|
while (select count(*) from pg_replication_slots where active_pid = '
|
||
|
:'oldpid'
|
||
|
') > 0 loop c := c + 1; perform pg_sleep(0.01); end loop;
|
||
|
raise log ''slot test looped % times'', c;
|
||
|
end';
|
||
|
-- should fail because the temporary slots were dropped automatically
|
||
|
SELECT pg_drop_replication_slot('regression_slot_t');
|
||
|
ERROR: replication slot "regression_slot_t" does not exist
|
||
|
SELECT pg_drop_replication_slot('regression_slot_t2');
|
||
|
ERROR: replication slot "regression_slot_t2" does not exist
|
||
|
-- monitoring functions for slot directories
|
||
|
SELECT count(*) >= 0 AS ok FROM pg_ls_logicalmapdir();
|
||
|
ok
|
||
|
----
|
||
|
t
|
||
|
(1 row)
|
||
|
|
||
|
SELECT count(*) >= 0 AS ok FROM pg_ls_logicalsnapdir();
|
||
|
ok
|
||
|
----
|
||
|
t
|
||
|
(1 row)
|
||
|
|
||
|
SELECT count(*) >= 0 AS ok FROM pg_ls_replslotdir('regression_slot_p');
|
||
|
ok
|
||
|
----
|
||
|
t
|
||
|
(1 row)
|
||
|
|
||
|
SELECT count(*) >= 0 AS ok FROM pg_ls_replslotdir('not_existing_slot'); -- fails
|
||
|
ERROR: replication slot "not_existing_slot" does not exist
|
||
|
-- permanent slot has survived
|
||
|
SELECT pg_drop_replication_slot('regression_slot_p');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|
||
|
-- test switching between slots in a session
|
||
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true);
|
||
|
?column?
|
||
|
----------
|
||
|
init
|
||
|
(1 row)
|
||
|
|
||
|
CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));
|
||
|
BEGIN;
|
||
|
INSERT INTO replication_example(somedata, text) VALUES (1, 1);
|
||
|
INSERT INTO replication_example(somedata, text) VALUES (1, 2);
|
||
|
COMMIT;
|
||
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding', true);
|
||
|
?column?
|
||
|
----------
|
||
|
init
|
||
|
(1 row)
|
||
|
|
||
|
INSERT INTO replication_example(somedata, text) VALUES (1, 3);
|
||
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||
|
data
|
||
|
---------------------------------------------------------------------------------------------------------
|
||
|
BEGIN
|
||
|
table public.replication_example: INSERT: id[integer]:1 somedata[integer]:1 text[character varying]:'1'
|
||
|
table public.replication_example: INSERT: id[integer]:2 somedata[integer]:1 text[character varying]:'2'
|
||
|
COMMIT
|
||
|
BEGIN
|
||
|
table public.replication_example: INSERT: id[integer]:3 somedata[integer]:1 text[character varying]:'3'
|
||
|
COMMIT
|
||
|
(7 rows)
|
||
|
|
||
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||
|
data
|
||
|
---------------------------------------------------------------------------------------------------------
|
||
|
BEGIN
|
||
|
table public.replication_example: INSERT: id[integer]:3 somedata[integer]:1 text[character varying]:'3'
|
||
|
COMMIT
|
||
|
(3 rows)
|
||
|
|
||
|
INSERT INTO replication_example(somedata, text) VALUES (1, 4);
|
||
|
INSERT INTO replication_example(somedata, text) VALUES (1, 5);
|
||
|
SELECT pg_current_wal_lsn() AS wal_lsn \gset
|
||
|
INSERT INTO replication_example(somedata, text) VALUES (1, 6);
|
||
|
SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset
|
||
|
SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn());
|
||
|
slot_name
|
||
|
------------------
|
||
|
regression_slot2
|
||
|
(1 row)
|
||
|
|
||
|
SELECT :'wal_lsn' = :'end_lsn';
|
||
|
?column?
|
||
|
----------
|
||
|
t
|
||
|
(1 row)
|
||
|
|
||
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||
|
data
|
||
|
---------------------------------------------------------------------------------------------------------
|
||
|
BEGIN
|
||
|
table public.replication_example: INSERT: id[integer]:6 somedata[integer]:1 text[character varying]:'6'
|
||
|
COMMIT
|
||
|
(3 rows)
|
||
|
|
||
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||
|
data
|
||
|
------
|
||
|
(0 rows)
|
||
|
|
||
|
DROP TABLE replication_example;
|
||
|
-- error
|
||
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true);
|
||
|
ERROR: replication slot "regression_slot1" already exists
|
||
|
-- both should error as they should be dropped on error
|
||
|
SELECT pg_drop_replication_slot('regression_slot1');
|
||
|
ERROR: replication slot "regression_slot1" does not exist
|
||
|
SELECT pg_drop_replication_slot('regression_slot2');
|
||
|
ERROR: replication slot "regression_slot2" does not exist
|
||
|
-- slot advance with physical slot, error with non-reserved slot
|
||
|
SELECT slot_name FROM pg_create_physical_replication_slot('regression_slot3');
|
||
|
slot_name
|
||
|
------------------
|
||
|
regression_slot3
|
||
|
(1 row)
|
||
|
|
||
|
SELECT pg_replication_slot_advance('regression_slot3', '0/0'); -- invalid LSN
|
||
|
ERROR: invalid target WAL LSN
|
||
|
SELECT pg_replication_slot_advance('regression_slot3', '0/1'); -- error
|
||
|
ERROR: replication slot "regression_slot3" cannot be advanced
|
||
|
DETAIL: This slot has never previously reserved WAL, or it has been invalidated.
|
||
|
SELECT pg_drop_replication_slot('regression_slot3');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|
||
|
--
|
||
|
-- Test copy functions for logical replication slots
|
||
|
--
|
||
|
-- Create and copy logical slots
|
||
|
SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false);
|
||
|
?column?
|
||
|
----------
|
||
|
init
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change');
|
||
|
?column?
|
||
|
----------
|
||
|
copy
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput');
|
||
|
?column?
|
||
|
----------
|
||
|
copy
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput');
|
||
|
?column?
|
||
|
----------
|
||
|
copy
|
||
|
(1 row)
|
||
|
|
||
|
-- Check all copied slots status
|
||
|
SELECT
|
||
|
o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
|
||
|
FROM
|
||
|
(SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
|
||
|
LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
|
||
|
WHERE
|
||
|
o.slot_name != c.slot_name
|
||
|
ORDER BY o.slot_name, c.slot_name;
|
||
|
slot_name | plugin | temporary | slot_name | plugin | temporary
|
||
|
------------+---------------+-----------+---------------------------------+---------------+-----------
|
||
|
orig_slot1 | test_decoding | f | copied_slot1_change_plugin | pgoutput | f
|
||
|
orig_slot1 | test_decoding | f | copied_slot1_change_plugin_temp | pgoutput | t
|
||
|
orig_slot1 | test_decoding | f | copied_slot1_no_change | test_decoding | f
|
||
|
(3 rows)
|
||
|
|
||
|
-- Now we have maximum 4 replication slots. Check slots are properly
|
||
|
-- released even when raise error during creating the target slot.
|
||
|
SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
|
||
|
ERROR: all replication slots are in use
|
||
|
HINT: Free one or increase max_replication_slots.
|
||
|
-- temporary slots were dropped automatically
|
||
|
SELECT pg_drop_replication_slot('orig_slot1');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|
||
|
SELECT pg_drop_replication_slot('copied_slot1_no_change');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|
||
|
SELECT pg_drop_replication_slot('copied_slot1_change_plugin');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|
||
|
-- Test based on the temporary logical slot
|
||
|
SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true);
|
||
|
?column?
|
||
|
----------
|
||
|
init
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change');
|
||
|
?column?
|
||
|
----------
|
||
|
copy
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', true, 'pgoutput');
|
||
|
?column?
|
||
|
----------
|
||
|
copy
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', false, 'pgoutput');
|
||
|
?column?
|
||
|
----------
|
||
|
copy
|
||
|
(1 row)
|
||
|
|
||
|
-- Check all copied slots status
|
||
|
SELECT
|
||
|
o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
|
||
|
FROM
|
||
|
(SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
|
||
|
LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
|
||
|
WHERE
|
||
|
o.slot_name != c.slot_name
|
||
|
ORDER BY o.slot_name, c.slot_name;
|
||
|
slot_name | plugin | temporary | slot_name | plugin | temporary
|
||
|
------------+---------------+-----------+---------------------------------+---------------+-----------
|
||
|
orig_slot2 | test_decoding | t | copied_slot2_change_plugin | pgoutput | t
|
||
|
orig_slot2 | test_decoding | t | copied_slot2_change_plugin_temp | pgoutput | f
|
||
|
orig_slot2 | test_decoding | t | copied_slot2_no_change | test_decoding | t
|
||
|
(3 rows)
|
||
|
|
||
|
-- Cannot copy a logical slot to a physical slot
|
||
|
SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
|
||
|
ERROR: cannot copy physical replication slot "orig_slot2" as a logical replication slot
|
||
|
-- temporary slots were dropped automatically
|
||
|
SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|
||
|
--
|
||
|
-- Test copy functions for physical replication slots
|
||
|
--
|
||
|
-- Create and copy physical slots
|
||
|
SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true);
|
||
|
?column?
|
||
|
----------
|
||
|
init
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false);
|
||
|
?column?
|
||
|
----------
|
||
|
init
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change');
|
||
|
?column?
|
||
|
----------
|
||
|
copy
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true);
|
||
|
?column?
|
||
|
----------
|
||
|
copy
|
||
|
(1 row)
|
||
|
|
||
|
-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields.
|
||
|
SELECT slot_name, slot_type, temporary FROM pg_replication_slots;
|
||
|
slot_name | slot_type | temporary
|
||
|
------------------------+-----------+-----------
|
||
|
orig_slot1 | physical | f
|
||
|
orig_slot2 | physical | f
|
||
|
copied_slot1_no_change | physical | f
|
||
|
copied_slot1_temp | physical | t
|
||
|
(4 rows)
|
||
|
|
||
|
-- Cannot copy a physical slot to a logical slot
|
||
|
SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
|
||
|
ERROR: cannot copy logical replication slot "orig_slot1" as a physical replication slot
|
||
|
-- Cannot copy a physical slot that doesn't reserve WAL
|
||
|
SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
|
||
|
ERROR: cannot copy a replication slot that doesn't reserve WAL
|
||
|
-- temporary slots were dropped automatically
|
||
|
SELECT pg_drop_replication_slot('orig_slot1');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|
||
|
SELECT pg_drop_replication_slot('orig_slot2');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|
||
|
SELECT pg_drop_replication_slot('copied_slot1_no_change');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|
||
|
-- Test based on the temporary physical slot
|
||
|
SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true);
|
||
|
?column?
|
||
|
----------
|
||
|
init
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change');
|
||
|
?column?
|
||
|
----------
|
||
|
copy
|
||
|
(1 row)
|
||
|
|
||
|
SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false);
|
||
|
?column?
|
||
|
----------
|
||
|
copy
|
||
|
(1 row)
|
||
|
|
||
|
-- Check all copied slots status
|
||
|
SELECT
|
||
|
o.slot_name, o.temporary, c.slot_name, c.temporary
|
||
|
FROM
|
||
|
(SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
|
||
|
LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn
|
||
|
WHERE
|
||
|
o.slot_name != c.slot_name
|
||
|
ORDER BY o.slot_name, c.slot_name;
|
||
|
slot_name | temporary | slot_name | temporary
|
||
|
------------+-----------+------------------------+-----------
|
||
|
orig_slot2 | t | copied_slot2_no_change | t
|
||
|
orig_slot2 | t | copied_slot2_notemp | f
|
||
|
(2 rows)
|
||
|
|
||
|
SELECT pg_drop_replication_slot('orig_slot2');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|
||
|
SELECT pg_drop_replication_slot('copied_slot2_no_change');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|
||
|
SELECT pg_drop_replication_slot('copied_slot2_notemp');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|