You need to enable JavaScript to run this app.
导航
逻辑日志转换(wal2json)
最近更新时间:2024.12.12 15:21:45首次发布时间:2024.12.12 15:21:45

wal2json 插件可以将逻辑日志文件输出为 JSON 格式供查看或者下游系统消费。

创建与删除插件

  • 创建插件

    无需执行 SQL 命令来创建插件,只需将实例的可修改参数 wal_level 设置为 logical 即可。关于修改参数的详细信息,请参见修改参数

  • 删除插件

    -- 如果需要停止输出并释放资源,请执行如下命令删除复制slot:
    SELECT 'drop' FROM pg_drop_replication_slot('replication_slot');
     ?column? 
    ----------
     drop
    (1 row)
    
    postgres=# select * from pg_replication_slots ;
     slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size 
    -----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
    (0 rows)
    

使用插件

--创建测试表
CREATE TABLE test_table_pk (id SERIAL, name VARCHAR(30), time_info TIMESTAMP NOT NULL, PRIMARY KEY(id));
CREATE TABLE test_table_2_pk_no (id SERIAL, num NUMERIC(5,2), info TEXT);
--创建复制slot,slot名字可以自己修改(注意不同DB需要创建不同的slot,比如:replication_slot_xxx)
SELECT 'creat' FROM pg_create_logical_replication_slot('replication_slot', 'wal2json');
 ?column? 
----------
 creat
(1 row)

--查看创建的复制slot
select * from pg_replication_slots ;
    slot_name     |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_
wal_size 
------------------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+------
---------
 replication_slot | wal2json | logical   |  14037 | postgres | f         | f      |            |      |          495 | 0/17F5E28   | 0/17F5F80           | reserved   |      
        
(1 row)

--进行数据的增删改
BEGIN;
INSERT INTO test_table_pk (name, time_info) VALUES('liming', now());
INSERT INTO test_table_pk (name, time_info) VALUES('zhanglei', now());
INSERT INTO test_table_pk (name, time_info) VALUES('jenny', now());
DELETE FROM test_table_pk WHERE id < 3;
INSERT INTO test_table_2_pk_no (num, info) VALUES(3.14, 'pie');
UPDATE test_table_2_pk_no SET info = 'pizza' WHERE info = 'pie';
COMMIT;

--查看数据json格式的日志
SELECT data FROM pg_logical_slot_get_changes('replication_slot', NULL, NULL, 'pretty-print', '1');
WARNING:  table "test_table_2_pk_no" without primary key or replica identity is nothing
                                                    data                                                     
-------------------------------------------------------------------------------------------------------------
 {                                                                                                          +
         "change": [                                                                                        +
                 {                                                                                          +
                         "kind": "insert",                                                                  +
                         "schema": "public",                                                                +
                         "table": "test_table_pk",                                                          +
                         "columnnames": ["id", "name", "time_info"],                                        +
                         "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],+
                         "columnvalues": [1, "liming", "2024-08-16 16:39:24.320516"]                        +
                 }                                                                                          +
                 ,{                                                                                         +
                         "kind": "insert",                                                                  +
                         "schema": "public",                                                                +
                         "table": "test_table_pk",                                                          +
                         "columnnames": ["id", "name", "time_info"],                                        +
                         "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],+
                         "columnvalues": [2, "zhanglei", "2024-08-16 16:39:24.320516"]                      +
                 }                                                                                          +
                 ,{                                                                                         +
                         "kind": "insert",                                                                  +
                         "schema": "public",                                                                +
                         "table": "test_table_pk",                                                          +
                         "columnnames": ["id", "name", "time_info"],                                        +
                         "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],+
                         "columnvalues": [3, "jenny", "2024-08-16 16:39:24.320516"]                         +
                 }                                                                                          +
                 ,{                                                                                         +
                         "kind": "delete",                                                                  +
                         "schema": "public",                                                                +
                         "table": "test_table_pk",                                                          +
                         "oldkeys": {                                                                       +
                                 "keynames": ["id"],                                                        +
                                 "keytypes": ["integer"],                                                   +
                                 "keyvalues": [1]                                                           +
                         }                                                                                  +
                 }                                                                                          +
                 ,{                                                                                         +
                         "kind": "delete",                                                                  +
                         "schema": "public",                                                                +
                         "table": "test_table_pk",                                                          +
                         "oldkeys": {                                                                       +
                                 "keynames": ["id"],                                                        +
                                 "keytypes": ["integer"],                                                   +
                                 "keyvalues": [2]                                                           +
                         }                                                                                  +
                 }                                                                                          +
                 ,{                                                                                         +
                         "kind": "insert",                                                                  +
                         "schema": "public",                                                                +
                         "table": "test_table_2_pk_no",                                                     +
                         "columnnames": ["id", "num", "info"],                                              +
                         "columntypes": ["integer", "numeric(5,2)", "text"],                                +
                         "columnvalues": [1, 3.14, "pie"]                                                   +
                 }                                                                                          +
         ]                                                                                                  +
 }
(1 row)

注:因为test_table_2_pk_no没有主键,所以update数据无法展示

其他

更多信息,请参见 wal2json 官方文档