wal2json 是 PostgreSQL 逻辑解码(logical decoding)的输出插件,可以将 WAL 日志中的数据变更以 JSON 格式输出,供查看或者下游系统消费。
创建插件
无需执行 SQL 命令来创建插件,只需将实例的可修改参数 wal_level 设置为 logical
即可。关于修改参数的详细信息,请参见修改参数。
删除插件
-- 如果需要停止输出并释放资源,请执行如下命令删除复制slot:
SELECT 'drop' FROM pg_drop_replication_slot('replication_slot');
?column?
----------
drop
(1 row)
postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
(0 rows)
--创建测试表
CREATE TABLE test_table_pk (id SERIAL, name VARCHAR(30), time_info TIMESTAMP NOT NULL, PRIMARY KEY(id));
CREATE TABLE test_table_2_pk_no (id SERIAL, num NUMERIC(5,2), info TEXT);
--创建复制slot,slot名字可以自己修改(注意不同DB需要创建不同的slot,比如:replication_slot_xxx)
SELECT 'create' FROM pg_create_logical_replication_slot('replication_slot', 'wal2json');
?column?
----------
create
(1 row)
--查看创建的复制slot
select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
------------------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
replication_slot | wal2json | logical | 14037 | postgres | f | f | | | 495 | 0/17F5E28 | 0/17F5F80 | reserved |
(1 row)
--进行数据的增删改
BEGIN;
INSERT INTO test_table_pk (name, time_info) VALUES('myuser1', now());
INSERT INTO test_table_pk (name, time_info) VALUES('myuser2', now());
INSERT INTO test_table_pk (name, time_info) VALUES('myuser3', now());
DELETE FROM test_table_pk WHERE id < 3;
INSERT INTO test_table_2_pk_no (num, info) VALUES(3.14, 'pie');
UPDATE test_table_2_pk_no SET info = 'pizza' WHERE info = 'pie';
COMMIT;
--查看 JSON 格式的变更日志(注意:pg_logical_slot_get_changes 会消费已返回的变更,再次调用不会重复返回;如只需查看而不消费,可使用 pg_logical_slot_peek_changes)
SELECT data FROM pg_logical_slot_get_changes('replication_slot', NULL, NULL, 'pretty-print', '1');
WARNING: table "test_table_2_pk_no" without primary key or replica identity is nothing
data
-------------------------------------------------------------------------------------------------------------
{ +
"change": [ +
{ +
"kind": "insert", +
"schema": "public", +
"table": "test_table_pk", +
"columnnames": ["id", "name", "time_info"], +
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],+
"columnvalues": [1, "myuser1", "2024-08-16 16:39:24.320516"] +
} +
,{ +
"kind": "insert", +
"schema": "public", +
"table": "test_table_pk", +
"columnnames": ["id", "name", "time_info"], +
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],+
"columnvalues": [2, "myuser2", "2024-08-16 16:39:24.320516"] +
} +
,{ +
"kind": "insert", +
"schema": "public", +
"table": "test_table_pk", +
"columnnames": ["id", "name", "time_info"], +
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],+
"columnvalues": [3, "myuser3", "2024-08-16 16:39:24.320516"] +
} +
,{ +
"kind": "delete", +
"schema": "public", +
"table": "test_table_pk", +
"oldkeys": { +
"keynames": ["id"], +
"keytypes": ["integer"], +
"keyvalues": [1] +
} +
} +
,{ +
"kind": "delete", +
"schema": "public", +
"table": "test_table_pk", +
"oldkeys": { +
"keynames": ["id"], +
"keytypes": ["integer"], +
"keyvalues": [2] +
} +
} +
,{ +
"kind": "insert", +
"schema": "public", +
"table": "test_table_2_pk_no", +
"columnnames": ["id", "num", "info"], +
"columntypes": ["integer", "numeric(5,2)", "text"], +
"columnvalues": [1, 3.14, "pie"] +
} +
] +
}
(1 row)
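注:因为 test_table_2_pk_no 没有主键,也未设置 REPLICA IDENTITY,所以该表的 UPDATE 变更无法输出(对应上面的 WARNING)。如需输出该表的 UPDATE/DELETE 变更,请为表添加主键,或执行 ALTER TABLE test_table_2_pk_no REPLICA IDENTITY FULL;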
更多信息,请参见 wal2json 官方文档。