|
2 使用
下面用两个示例简单说明更改跟踪和变更数据捕获的配置及变更信息的查询。
2.1 更改跟踪
更改跟踪的配置如下:
a. 在数据库上启用更改跟踪(ALTER DATABASE … CHANGE_TRACKING = ON),并设置跟踪结果保持期;
b. 在需要跟踪更改的每个表上启用更改跟踪(ALTER TABLE … ENABLE CHANGE_TRACKING),并设置是否要求记录UPDATE的列信息。(启用更改跟踪的表需要有主键)。
更改跟踪结果的查询包括:
a. CHANGE_TRACKING_CURRENT_VERSION
返回与上次提交的事务相关联的版本号。启用了更改跟踪的数据库具有一个版本计数器,在对启用了更改跟踪的表进行更改时,该计数器会随之递增。每个更改的行都有一个关联的版本号。可以在每次查询完成后,记录这个版本号,下次查询时,基于这个版本号查询,以获取后续的最新更改。
b. CHANGE_TRACKING_MIN_VALID_VERSION
指定表可用的最低有效版本号。在第一次查询数据的时候,可以使用此函数得到查询更改信息的起始版本号;
c. CHANGETABLE(CHANGES)
返回自指定版本起对表所做的所有更改的跟踪信息;
d. CHANGETABLE(VERSION)
返回指定行的最新更改跟踪信息。(通过指定特定行对应的主键列值);
e. CHANGE_TRACKING_IS_COLUMN_IN_MASK
通过CHANGETABLE(CHANGES …)函数返回的SYS_CHANGE_COLUMNS值及列id,确定该列是否被UPDATE。
下面的T-SQL示例创建一个测试数据库,并在测试数据库中演示配置更改跟踪及查询更改跟踪信息。
USE master;
GO
CREATE DATABASE DB_test;
GO
ALTER DATABASE DB_test SET
CHANGE_TRACKING = ON(
AUTO_CLEANUP = ON,
CHANGE_RETENTION = 1 HOURS
);
GO
USE DB_test;
GO
CREATE TABLE dbo.tb(
id int
CONSTRAINT PK_tb_id PRIMARY KEY,
col1 int,
col2 varchar(10),
col3 nvarchar(max),
col4 varbinary(max),
col5 xml
);
GO
ALTER TABLE dbo.tb
ENABLE CHANGE_TRACKING
WITH(
TRACK_COLUMNS_UPDATED = ON
);
GO
SELECT
CHANGE_TRACKING_CURRENT_VERSION(),
CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'dbo.tb'));
GO
INSERT dbo.tb(
id,
col1, col2, col3, col4, col5)
VALUES(
1,
1, 'AA', 'AAA', 0x1, '<a>aa</a>'),
(
2,
2, 'BB', 'BBB', 0x2, '<b/>'),
(
3,
3, 'CC', 'CCC', 0x2, '<c/>');
SELECT
CHANGE_TRACKING_CURRENT_VERSION(),
CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'dbo.tb')),
*
FROM CHANGETABLE(CHANGES dbo.tb, 0) CHG
LEFT JOIN dbo.tb DATA
ON DATA.id = CHG.id;
BEGIN TRAN;
UPDATE dbo.tb SET
col1 = 11
WHERE>
UPDATE dbo.tb SET
col1 = 111
WHERE>
COMMIT TRAN;
SELECT
CHANGE_TRACKING_CURRENT_VERSION(),
CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'dbo.tb')),
*
FROM CHANGETABLE(CHANGES dbo.tb, 0) CHG
LEFT JOIN dbo.tb DATA
ON DATA.id = CHG.id;
UPDATE dbo.tb SET
col5.modify('replace value of /a[1]/text()[1] with "replace"')
WHERE>
UPDATE dbo.tb SET
col5.modify('insert <a>1</a> as last into /')
WHERE>
SELECT
CHANGE_TRACKING_CURRENT_VERSION(),
CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'dbo.tb')),
*
FROM CHANGETABLE(CHANGES dbo.tb, 0) CHG
LEFT JOIN dbo.tb DATA
ON DATA.id = CHG.id;
UPDATE dbo.tb SET
col4 = col4 + 0x12345
WHERE>
SELECT
CHANGE_TRACKING_CURRENT_VERSION(),
CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'dbo.tb')),
*
FROM CHANGETABLE(CHANGES dbo.tb, 0) CHG
LEFT JOIN dbo.tb DATA
ON DATA.id = CHG.id;
UPDATE dbo.tb SET
id = 11
WHERE>
INSERT dbo.tb(
id,
col1, col2, col3, col4, col5)
VALUES(
1,
1, 'AA', 'AAA', 0x1, '<a>aa</a>')
SELECT
CHANGE_TRACKING_CURRENT_VERSION(),
CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'dbo.tb')),
*
FROM CHANGETABLE(CHANGES dbo.tb, 0) CHG
LEFT JOIN dbo.tb DATA
ON DATA.id = CHG.id;
SELECT
CHANGE_TRACKING_CURRENT_VERSION(),
CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'dbo.tb')),
*
FROM dbo.tb DATA
OUTER APPLY CHANGETABLE(VERSION dbo.tb, (id), (DATA.id)) CHG
2.2 变更数据捕获
变更数据捕获配置如下:
a. 在数据库上启用变更数据捕获(调用系统存储过程sys.sp_cdc_enable_db);
b. 通过系统存储过程sys.sp_cdc_add_job创建捕获和清理Job(可选,如果没有捕获和清理Job,会在创建数据库中的第一个变更数据捕获时自动建立,自动建立的Job可以通过调用系统存储过程sys.sp_cdc_change_job来调整捕获和清理相关的一些选项);
c. 在需要捕获变更数据的每个表上建立变更数据捕获实例(每个表上可以建立<=2个捕获实例,创建捕获实例使用系统存储过程sys.sp_cdc_enable_table)。
捕获的变更数据的查询包括:
a. sys.fn_cdc_get_min_lsn
返回指定捕获实例的有效性间隔的低端点(start_lsn);
b. sys.fn_cdc_get_max_lsn
返回cdc.lsn_time_mapping系统表的最大日志序列号(LSN);
c. cdc.fn_cdc_get_all_changes_<捕获实例>
针对在指定日志序列号(LSN)范围内应用到源表的每项更改均返回一行。如果源行在该间隔内有多项更改,则每项更改都会表示在返回的结果集中。除了返回更改数据外,四个元数据列还提供了将更改应用到另一个数据源所需的信息。行筛选选项可控制元数据列的内容以及结果集中返回的行。当指定“all”行筛选选项时,针对每项更改将只有一行来标识该更改。当指定“all update old”选项时,更新操作会表示为两行:一行包含更新之前已捕获列的值,另一行包含更新之后已捕获列的值。
此枚举函数是在对源表启用变更数据捕获时创建的。此函数名称是派生的,采用cdc.fn_cdc_get_all_changes_capture_instance格式,其中capture_instance是在对源表启用变更数据捕获时为捕获实例指定的值;
d. cdc.fn_cdc_get_net_changes_<capture_instance>
针对指定日志序列号(LSN)范围内每个已更改的源行返回一个净更改行。净更改行指:如果在LSN范围内源行具有多项更改,则该函数将返回反映该行最终内容的单一行。例如,如果事务在源表中插入一行,并且LSN范围内的后续事务更新了该行中的一个或多个列,则该函数将只返回一行,其中包含多个更新的列值。
此枚举函数是在对某源表启用变更数据捕获并指定净跟踪时创建的。函数名称是派生的,采用cdc.fn_cdc_get_net_changes_capture_instance格式,其中capture_instance是对变更数据捕获启用源表时为捕获实例指定的值;
e. sys.fn_cdc_map_time_to_lsn
为指定的时间返回cdc.lsn_time_mapping系统表中start_lsn列中的日志序列号(LSN)值;
f. sys.fn_cdc_has_column_changed
标识指定的更新掩码是否指示已更新关联的更改行中的指定列。
下面的T-SQL示例创建一个测试数据库,并在测试数据库中演示配置变更数据捕获及查询捕获结果。
USE master;
GO
CREATE DATABASE DB_test;
GO
USE DB_test;
EXEC sys.sp_cdc_enable_db;
GO
DECLARE
@agnt_service sysname;
SET @agnt_service = N'SQLServerAgent';
DECLARE @tb_agent_status TABLE(
state varchar(50)
);
INSERT @tb_agent_status
EXEC master.sys.xp_servicecontrol
N'QUERYSTATE',
@agnt_service;
IF NOT EXISTS(
SELECT * FROM @tb_agent_status
WHERE state = N'Running.')
EXEC master.sys.xp_servicecontrol
N'START',
@agnt_service;
GO
USE DB_test;
GO
CREATE TABLE dbo.tb(
id int
CONSTRAINT PK_tb_id PRIMARY KEY,
col1 int,
col2 varchar(10),
col3 nvarchar(max),
col4 varbinary(max),
col5 xml
);
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'tb',
@capture_instance = N'dbo_tb',
@role_name = NULL;
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'tb',
@capture_instance = N'dbo_tb_col',
@role_name = NULL,
@captured_column_list = N'id,col1,col2';
GO
INSERT dbo.tb(
id,
col1, col2, col3, col4, col5)
VALUES(
1,
1, 'AA', 'AAA', 0x1, '<a>aa</a>'),
(
2,
2, 'BB', 'BBB', 0x2, '<b/>'),
(
3,
3, 'CC', 'CCC', 0x2, '<c/>');
WITH
LSN AS(
SELECT
from_lsn = sys.fn_cdc_get_min_lsn(N'dbo_tb'),
to_lsn = sys.fn_cdc_get_max_lsn()
),
CHG_ALL AS(
SELECT
CHG.*
FROM LSN
CROSS APPLY cdc.fn_cdc_get_all_changes_dbo_tb(LSN.from_lsn, LSN.to_lsn, 'ALL UPDATE OLD') CHG
),
CHG_NET AS(
SELECT
CHG.*
FROM LSN
CROSS APPLY cdc.fn_cdc_get_net_changes_dbo_tb(LSN.from_lsn, LSN.to_lsn, 'ALL') CHG
)
SELECT * FROM CHG_ALL;
BEGIN TRAN;
UPDATE dbo.tb SET
col1 = 11
WHERE>
UPDATE dbo.tb SET
col1 = 111
WHERE>
COMMIT TRAN;
WITH
LSN AS(
SELECT
from_lsn = sys.fn_cdc_get_min_lsn(N'dbo_tb'),
to_lsn = sys.fn_cdc_get_max_lsn()
),
CHG_ALL AS(
SELECT
CHG.*
FROM LSN
CROSS APPLY cdc.fn_cdc_get_all_changes_dbo_tb(LSN.from_lsn, LSN.to_lsn, 'ALL UPDATE OLD') CHG
),
CHG_NET AS(
SELECT
CHG.*
FROM LSN
CROSS APPLY cdc.fn_cdc_get_net_changes_dbo_tb(LSN.from_lsn, LSN.to_lsn, 'ALL') CHG
)
SELECT * FROM CHG_ALL;
UPDATE dbo.tb SET
col5.modify('replace value of /a[1]/text()[1] with "replace"')
WHERE>
UPDATE dbo.tb SET
col5.modify('insert <a>1</a> as last into /')
WHERE>
UPDATE dbo.tb SET
col4 = col4 + 0x12345
WHERE>
WITH
LSN AS(
SELECT
from_lsn = sys.fn_cdc_get_min_lsn(N'dbo_tb'),
to_lsn = sys.fn_cdc_get_max_lsn()
),
CHG_ALL AS(
SELECT
CHG.*
FROM LSN
CROSS APPLY cdc.fn_cdc_get_all_changes_dbo_tb(LSN.from_lsn, LSN.to_lsn, 'ALL UPDATE OLD') CHG
),
CHG_NET AS(
SELECT
CHG.*
FROM LSN
CROSS APPLY cdc.fn_cdc_get_net_changes_dbo_tb(LSN.from_lsn, LSN.to_lsn, 'ALL') CHG
)
SELECT * FROM CHG_ALL;
UPDATE dbo.tb SET
id = 11
WHERE>
INSERT dbo.tb(
id,
col1, col2, col3, col4, col5)
VALUES(
1,
1, 'AA', 'AAA', 0x1, '<a>aa</a>');
WITH
LSN AS(
SELECT
from_lsn = sys.fn_cdc_get_min_lsn(N'dbo_tb'),
to_lsn = sys.fn_cdc_get_max_lsn()
),
CHG_ALL AS(
SELECT
CHG.*
FROM LSN
CROSS APPLY cdc.fn_cdc_get_all_changes_dbo_tb(LSN.from_lsn, LSN.to_lsn, 'ALL UPDATE OLD') CHG
),
CHG_NET AS(
SELECT
CHG.*
FROM LSN
CROSS APPLY cdc.fn_cdc_get_net_changes_dbo_tb(LSN.from_lsn, LSN.to_lsn, 'ALL') CHG
)
SELECT * FROM CHG_ALL;
|
|