126 lines
4.9 KiB
Plaintext
126 lines
4.9 KiB
Plaintext
|
-- Test streaming of two-phase commits
|
||
|
SET synchronous_commit = on;
|
||
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
|
||
|
?column?
|
||
|
----------
|
||
|
init
|
||
|
(1 row)
|
||
|
|
||
|
CREATE TABLE stream_test(data text);
|
||
|
-- consume DDL
|
||
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||
|
data
|
||
|
------
|
||
|
(0 rows)
|
||
|
|
||
|
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
|
||
|
BEGIN;
|
||
|
SAVEPOINT s1;
|
||
|
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
|
||
|
?column?
|
||
|
----------
|
||
|
msg5
|
||
|
(1 row)
|
||
|
|
||
|
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
|
||
|
TRUNCATE table stream_test;
|
||
|
ROLLBACK TO s1;
|
||
|
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
|
||
|
PREPARE TRANSACTION 'test1';
|
||
|
-- should show the inserts after a ROLLBACK
|
||
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
||
|
data
|
||
|
----------------------------------------------------------
|
||
|
streaming message: transactional: 1 prefix: test, sz: 50
|
||
|
opening a streamed block for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
streaming change for transaction
|
||
|
closing a streamed block for transaction
|
||
|
preparing streamed transaction 'test1'
|
||
|
(24 rows)
|
||
|
|
||
|
COMMIT PREPARED 'test1';
|
||
|
--should show the COMMIT PREPARED and the other changes in the transaction
|
||
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
||
|
data
|
||
|
-------------------------
|
||
|
COMMIT PREPARED 'test1'
|
||
|
(1 row)
|
||
|
|
||
|
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
|
||
|
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
|
||
|
BEGIN;
|
||
|
SAVEPOINT s1;
|
||
|
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
|
||
|
?column?
|
||
|
----------
|
||
|
msg5
|
||
|
(1 row)
|
||
|
|
||
|
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
|
||
|
TRUNCATE table stream_test;
|
||
|
ROLLBACK to s1;
|
||
|
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
|
||
|
PREPARE TRANSACTION 'test1_nodecode';
|
||
|
-- should NOT show inserts after a ROLLBACK
|
||
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
||
|
data
|
||
|
----------------------------------------------------------
|
||
|
streaming message: transactional: 1 prefix: test, sz: 50
|
||
|
(1 row)
|
||
|
|
||
|
COMMIT PREPARED 'test1_nodecode';
|
||
|
-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
|
||
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
||
|
data
|
||
|
-------------------------------------------------------------
|
||
|
BEGIN
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
|
||
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
|
||
|
COMMIT
|
||
|
(22 rows)
|
||
|
|
||
|
DROP TABLE stream_test;
|
||
|
SELECT pg_drop_replication_slot('regression_slot');
|
||
|
pg_drop_replication_slot
|
||
|
--------------------------
|
||
|
|
||
|
(1 row)
|
||
|
|