介绍

SymmetricDS是一个数据同步工具,当前版本是基于数据库触发器捕捉数据变化的。

官方教程里模拟的场景是主从节点数据同步。使用的数据是来自于SymmetricDS自带的一些SQL脚本。看完后对有些概念还是比较模糊,因此我又重新模拟了一个简单的场景。这个场景里我只对单表DEMO进行双向数据同步。

配置步骤

这个示例的目标是在corp和store1两个库间,实现DEMO表的双向同步。为方便测试,使用本机docker上运行的mariadb数据库。创建corp和store1两个库和数据库用户。

安装

我使用的是从官网下载的SymmetricDS 3.9.16。下载完成后,解压即可。

准备数据库

docker compose配置信息如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
mysql:
    image: mariadb
    ports:
     - "3306:3306"
    environment:
     - MYSQL_ROOT_PASSWORD=mysql
     - MYSQL_DATABASE=symtest
     - MYSQL_USER=symtest
     - MYSQL_PASSWORD=symtest
    volumes:
     - ./mysql:/var/lib/mysql

这里的symtest库与测试过程无关。数据库运行后,通过客户端连接该数据库,分别创建corp和store1用户和数据库。

在 corp和store1上建立DEMO表(也可通过配置改为自动在store1上创建表)。

1
2
3
4
5
6
7
create table DEMO
(
	A int auto_increment
		primary key,
	B text null,
	C date null
);

配置主从节点

从SymmetricDS安装目录的samples目录下复制corp-000.properties和store1-001.properties至engines目录。

因为我们要配置自己的使用场景,因此将两个文件修改为:

主节点配置corp-000.properties

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
engine.name=corp-000
db.driver=com.mysql.jdbc.Driver
db.url=jdbc:mysql://localhost/corp?tinyInt1isBit=false
db.user=corp
db.password=corp
registration.url=
sync.url=http://localhost:9000/sync/corp-000
group.id=corp
external.id=000

# Don't muddy the waters with purge logging
job.purge.period.time.ms=7200000

# This is how often the routing job will be run in milliseconds
job.routing.period.time.ms=5000
# This is how often the push job will be run.
job.push.period.time.ms=10000
# This is how often the pull job will be run.
job.pull.period.time.ms=10000
# Kick off initial load
initial.load.create.first=true

主节点运行在9000端口,以corp用户连接corp库。

从节点配置 store-001.properties

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
engine.name=store-001
db.driver=com.mysql.jdbc.Driver
db.url=jdbc:mysql://localhost/store1?tinyInt1isBit=false
db.user=store1
db.password=store1
registration.url=http://localhost:9000/sync/corp-000
sync.url=http://localhost:9002/sync/store-001

group.id=store
external.id=001

# This is how often the routing job will be run in milliseconds
job.routing.period.time.ms=5000
# This is how often the push job will be run.
job.push.period.time.ms=10000
# This is how often the pull job will be run.
job.pull.period.time.ms=10000

从节点运行在9002端口,以store1用户连接store1库。

编写完配置文件后,创建主节点所需要的表:

1
./symadmin --engine corp-000 create-sym-tables

执行完毕后,在corp库中可以看到创建了很多以sym_为前缀的表。

注册从节点:

1
./symadmin --engine corp-000 open-registration store 001

注册完毕后,在corp库的sym_node_group表中能看到注册进来的两个node_group

启动主节点:

1
./sym --engine corp-000 --port 9000

启动从节点:

1
./sym --engine store-001 --port 9002

启动完主从节点后,查看sym_node表,可看到主从节点的信息。从节点启动时会自动注册至主节点,并且会在store1库中自动创建sym_前缀的表。

配置数据同步规则

节点启动正常之后,可以在corp库中添加数据同步规则配置:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
-- 使用corp 库
use corp;

-- 添加节点连接,P表示store从corp中主动拉取数据,W表示corp接受从store中推送的数据
insert into sym_node_group_link (source_node_group_id, target_node_group_id, data_event_action)
values ('store', 'corp', 'P');
insert into sym_node_group_link (source_node_group_id, target_node_group_id, data_event_action)
values ('corp', 'store', 'W');

-- 创建数据通道,同一通道上的数据事件会在同一事务中执行
insert into sym_channel
(channel_id, processing_order, max_batch_size, enabled, description)
values('demo', 1, 100000, 1, 'demo sync channel');

-- 创建触发器,在demo表上创建触发器(这个触发器与库无关,双向同步也只需要创建一个)
insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('demo_trigger','demo','demo',current_timestamp,current_timestamp);

-- 配置路由规则(每个方向一条)
insert into sym_router
(router_id,source_node_group_id,target_node_group_id,router_type,create_time,last_update_time)
values('store_to_corp', 'store', 'corp', 'default',current_timestamp, current_timestamp);
insert into sym_router
(router_id,source_node_group_id,target_node_group_id,router_type,create_time,last_update_time)
values('corp_2_store', 'corp', 'store', 'default',current_timestamp, current_timestamp);

-- 配置触发器路由规则(每个方向一条)
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('demo_trigger','store_to_corp', 200, current_timestamp, current_timestamp);

insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('demo_trigger','corp_2_store', 200, current_timestamp, current_timestamp);

配置完毕后,可测试在DEMO中做数据操作了。在corp和store1中进行的数据操作都会被同步至另一端。

主要表

配置表结构

data model config

运行时表结构

data model runtime

技巧

  • channel表上的参数

MAX_BATCH_SIZE可控制通道上单个batch最大的data event的最大数量。

MAX_BATCH_TO_SEND可控制一次同步操作(一次pull或push)最多处理的batch数量。

MAX_DATA_TO_ROUTE一个channel上进行一次数据路由最大处理的行数量。

  • 初始数据加载

官方文档的 Initial Loads章节Load Data都对数据加载有描述。Initial Loads通过配置文件进行初始化加载的控制,可实现正/反向的数据初始加载,及初始加载batch大小的控制。Load Data可以通过向SYM_TABLE_RELOAD_REQUEST中插入数据来控制数据的批量加载。

  • 问题分析和处理

从运行时表结构结合Outgoing BatchesIncoming Batches相关内容可用于分析处理同步错误。

sym_incoming_batchsym_incoming_error表可以分析写入端的出错情况。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
select count(*), node_id from sym_incoming_batch
 where error_flag=1 group by node_id;
 
select node_id, host_name from sym_node_host
 where heartbeat_time < current_timestamp-1;
 
select * from sym_incoming_batch where error_flag=1;

select * from sym_incoming_error
 where batch_id='XXXXXX' and node_id='YYYYY';
-- where XXXXXX is the batch id and YYYYY is the node id of the failing batch.

sym_outgoing_batchsym_datasym_data_event可以分析读取端的出错情况。通过修改sym_outgoing_batch的状态字段,可以跳过部分批次数据的同步。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
select count(*), node_id from sym_outgoing_batch
 where error_flag=1 group by node_id;

select sum(data_event_count), node_id from sym_outgoing_batch
 where status != 'OK' group by node_id;

select sum(end_id-start_id) from sym_data_gap
 where start_id < (select max(start_id) from sym_data_gap);

select count(*) from sym_data
 where data_id >= (select max(start_id) from sym_data_gap);
 
select * from sym_outgoing_batch where error_flag=1;

select * from sym_data where data_id in
  (select data_id from sym_data_event where batch_id='XXXXXX');
  
update sym_outgoing_batch set status='OK' where batch_id='XXXXXX'

delete from sym_data_event where batch_id='XXXXXX' and data_id='YYYYYY'
 -- where XXXXXX is the failing batch and YYYYYY is the data id to longer be included in the batch.

集群部署

SymmetricDS支持集群模式部署。以集群模式部署的节点具备负载均衡和高可用功能。可以使用硬件负载均衡器或者反向代理软件。

SymmetricDS 3.8及之后的版本,推荐将负载均衡器配置为会话粘连模式(sticky session),并且确保集群的所有节点共享stage目录。Sticky session用于支持预定请求,它可以让节点在在连接和推送数据前进行预定。共享stage目录用于在intitial load时支持后台取出数据,数据的取出操作在一个节点上进行,供集群的不同节点使用。如果start.initial.load.extract.job属性被设置为禁用,则可以不使用共享stage,但是initial load的性能会受影响。

SymmetricDS 3.7及之前的版本,推荐将负载均衡器配置为无状态模式,采用round robin进行负载均衡。

sync.url属性应该配置为负载均衡器的地址。

只要集群运行任何类型的SymmetricDS job,cluster.lock.enabled应该设置为true。设置为true之后,SymmetricDS将使用LOCK表中的一行记录进行信号同步,确保每次只有一个job在运行。在获取锁时,LOCK表上的一行记录被更新为执行锁定的server id和锁定的时间。当job执行完毕后锁定时间被设置回null。在锁被释放前(这个server id相关的锁)另一个实例无法获取锁。如果锁定记录的实例当掉,锁会在一小段时间后被释放,这个时间可通过cluster.lock.timeout.ms属性进行配置。如果在job仍然运行的时候锁过期,可能出现两个job在同一时间运行的情况,并可能导致数据库死锁。

默认情况下,锁的server id是服务所在的主机名。如果集群的两个实例运行在同一台服务器上,应该配置cluster.server.id属性,以区分它是server id中的哪个实例。(我理解为,同一个cluster的不同实例的server id应该是相同的)

在把SymmetricDS部署在Tomcat或JBoss这类应用服务器上时,不需要为应用服务器配置特殊的会话集群(session clustering)。

参考

基于SymmetricDS的多主一从数据库同步方案