为了强化实时数仓的能力,便于将 MySQL 中的表映射到 ByteHouse 企业版中,ByteHouse 引入了MaterializedMySQL 数据库引擎,ByteHouse 服务作为 MySQL 副本,可以读取 Binlog 并执行 DDL 和 DML 请求,实现了基于 MySQL Binlog 机制的业务数据库实时同步功能,适用于需要将 MySQL 业务数据实时同步到 ByteHouse 企业版进行分析的场景。本文介绍通过 MaterializedMySQL 引擎将 MySQL 数据导入 ByteHouse 企业版的语法、操作步骤及注意事项。
ByteHouse 企业版在实现 MaterializedMySQL 时,底层引擎采用了自研的 HaUniqueMergeTree 引擎,支持自定义版本字段以及根据 UNIQUE KEY 实时删除数据功能,无需引入其他额外字段。同时,ByteHouse 增强了 MaterializedMySQL 引擎的稳定性和易用性。
相对于社区原生的 MaterializedMySQL 引擎,ByteHouse 中的 MaterializedMySQL 库引擎具有如下优势:
set force_manipulate_materialize_mysql_table = 1, distributed_ddl_entry_format_version = 2 ** 操作底表。 调整后,仅 ByteHouse MaterializedMySQL 引擎数据库中的表受影响,MySQL 源表不受影响。/clickhouse/[database_name]/[table_name]/UUID,replica 为{replica},UUID 是随机生成的。MaterializedMySQL 中的表默认以单副本形式存在,禁止对这些表创建其他副本以形成多副本,否则对其他新增副本的操作将会间接影响 MaterializedMySQL 中的表,可能导致数据一致性问题。mysql.user 表的 SELECT 权限在 ByteHouse 中新建库引擎为 MaterializedMySQL 的数据库,创建完成后,系统将自动读取源数据,语法如下:
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster] ENGINE = MaterializeMySQL('host:port', ['database' | database], 'user', 'password') [SETTINGS ...] [TABLE OVERRIDE table_name ( [COLUMNS ( [name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1], ...] [INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1, ...] [PROJECTION projection_name_1 (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY]), ...] )] [ORDER BY expr] [PRIMARY KEY expr] [PARTITION BY expr] [SAMPLE BY expr] [TTL expr] ), ...]
参数 | 是否必填 | 说明 |
|---|---|---|
IF NOT EXISTS | 否 | 判断新建的库是否已存在,如果
|
db_name | 是 | 自定义数据库名称。 |
ON CLUSTER | 否 | 指定集群名称,在指定集群的上创建数据库。 |
host:port | 是 | MySQL 数据库的 URL 和端口号。 |
database | 是 | MySQL 数据库名称。 |
user | 是 | MySQL 数据库账号。 |
password | 是 | MySQL 数据库账号密码。 |
SETTINGS | 否 | 请参见Settings 参数说明。 |
TABLE OVERRIDE | 否 |
您可按需配置以下 settings 参数:
设置 | 类型 | 默认值 | 说明 |
|---|---|---|---|
max_rows_in_buffer | UInt64 | 65505 | 单个表允许在内存中缓存的数据最大行数(缓存数据无法查询)。 当超过该数字时,数据将被物化,持久化写入存储。 默认值:65505。 |
max_bytes_in_buffer | UInt64 | 1048576 | 单个表允许在内存中缓存的数据最大字节数(缓存数据无法查询)。 当超过该数字时,数据将被物化,持久化写入存储。 默认值:1048576。 |
max_flush_data_time | UInt64 | 1000 | 数据库级允许数据在内存中缓存的最大时长,单位为毫秒(缓存数据无法查询)。 超过该时长时,数据将被物化,持久化写入存储。 默认值:1000。 |
max_wait_time_when_mysql_unavailable | Int64 | 1000 | MySQL 不可用时的重试间隔,单位为毫秒。负值表示禁用重试,设置为 0 表示立即重试,默认值:1000。 |
allows_query_when_mysql_lost | Bool | true | MySQL 连接断开时,是否允许查询物化表。支持设置为 true(允许)或 false(不允许),默认值:true。 |
skip_error_count | Int64 | 0 | 在 MaterializedMySQL 数据同步过程中,跳过 DML 和 DDL 错误的数量。负值表示跳过所有错误,其他数值表示跳过指定数量的错误。默认值:0。 |
include_tables | String | '' | 如果配置了该参数,则系统仅同步符合条件的表。 表名称以逗号分隔,支持正则表达式。 |
exclude_tables | String | '' | 如果配置了该参数,所有符合条件的表将不被同步。 表名称以逗号分隔,支持正则表达式。 |
skip_ddl_patterns | String | '' | 如果配置了该参数,则所有符合条件的 DDL 查询都不会被执行。 DDL 模式以逗号分隔,支持正则表达式并且不区分大小写。 如果该参数未设置或为空,则将执行所有 DDL 查询。 |
skip_unsupported_ddl | Bool | true | 是否跳过除 CREATE/DROP/RENAME/TRUNCATE TABLE 外的不支持的 DDL 操作。当前 ByteHouse MaterializedMySQL 支持的 DDL 操作请参见下文DDL。 |
skip_unsupported_tables | Bool | true | 是否跳过不支持的表,例如没有主键的表。 |
skip_sync_failed_tables | Bool | false | 是否跳过同步失败的表,以免阻止整个同步过程。 |
resync_table_task_schedule_time_ms | UInt64 | 1000 | 重新同步表任务的调度时间间隔,单位为毫秒。 |
resync_table_task_fail_retry_time | UInt64 | 5 | 重新同步表失败时的重试次数。 |
max_insert_wait_seconds_for_unique_table_leader | UInt64 | 30 | 唯一键表(unique 表)成为主节点(leader)的最大等待时间,单位为秒。 |
shard_mode | Bool | false | MaterializedMySQL 数据库是否以分布式方式同步 MySQL 数据到 ByteHouse。支持设置为 false 或 true,您也可设置为 0(对应 false) 或 1(对应 true),系统将对应转换。该值在创建库后不可调整。 |
zookeeper_session_expiration_check_period_s | UInt64 | 60 | 分片模式下 ZooKeeper 会话过期检查周期,单位为秒。 |
execute_dml_fail_max_retry_timeout_s | UInt64 | 600 | 执行 DML 操作失败时重试超时,单位为秒。当前 ByteHouse MaterializedMySQL 支持的 DML 操作请参见下文DML。 |
retry_execute_dml_sleep_ms | UInt64 | 2000 | 重试执行 DML 命令之间的等待时间,单位为毫秒。 |
inner_query_distributed_ddl_task_timeout | UInt64 | 30 | 集群中所有主机响应 DDL 查询的超时时间。 |
max_concurrent_resync_task_num | UInt64 | 5 | 数据库级允许并发执行的重新同步(resync)任务的最大数量。 |
resync_table_per_task | UInt64 | 5 | 单个 MaterializedMySQLResyncTask(重新同步任务)中包含的最大重新同步表数量。每次调度时,将从 MaterializedMySQL 数据库中选择 |
parallel_in_resync_task | Bool | false | 同一重新同步任务中,多个表的全量同步是否可以并行。 |
resync_position_diff | UInt64 | 15000 | 重新同步任务结束或合并到主同步线程时的位置差值(用于同步进度对齐)。 |
merge_resync_task_sleep_ms | UInt64 | 1000 | 将重新同步任务合并到主同步线程时的等待时间间隔(单位:毫秒)。 |
表重写(TABLE OVERRIDE)可用于自定义 ByteHouse DDL 查询,在无需调整 MySQL 源数据的前提下,调整 ByteHouse 中 MaterializedMySQL 引擎数据库内的表结构规则。
当 MySQL 源数据的表结构、分区方式不符合 ByteHouse 性能需求时,您可以在创建 ByteHouse MaterializedMySQL 数据库时使用 TABLE OVERRIDE 针对性修改某些表的规则。该操作不会影响 MySQL 源数据,也无需修改所有同步表,您可按需指定目标表并配置规则。
该功能常见的使用场景为控制分区,例如 MySQL 源数据未对数据进行分区,同步到 ByteHouse 后想按时间分区查询效率,可使用 TABLE OVERRIDE...PARTITION BY 指定分区,提升查询性能。
您可以对 MaterializedMySQL 表重写的模式转换进行以下操作:
注意
若使用不当,表重写将会导致 MySQL 与 ByteHouse 的数据同步中断。例如:
ALTER TABLE语句,此时该语句会失败,因为该字段已存在,同时阻断后续数据同步。ORDER BY) 或 分区键(PARTITION BY),但实际 ByteHouse 排序键和分区键要求使用非空字段,同步后数据查询将会出现问题。CREATE DATABASE demo_db_mysql ENGINE = MaterializeMySQL('localhost:3306', 'db', 'user', '***') SETTINGS parallel_in_resync_task = true, shard_mode = 1 TABLE OVERRIDE table1 ( COLUMNS ( userid UUID, category LowCardinality(String), timestamp DateTime CODEC(Delta, Default) ) PARTITION BY toYear(timestamp) ), TABLE OVERRIDE table2 ( COLUMNS ( ip_hash UInt32 MATERIALIZED xxHash32(client_ip), client_ip String TTL created + INTERVAL 72 HOUR ) SAMPLE BY ip_hash );
ByteHouse MaterializedMySQL 引擎数据库创建完成后,您可通过 ALTER 语句修改数据库的配置参数。
语法:
ALTER DATABASE db_name ON CLUSTER cluster_name MODIFY SETTING setting_name=value [, ...]
示例:
ALTER DATABASE demo_db_mysql ON CLUSTER bytehouse_demo_cluster MODIFY SETTING exclude_tables = 'table1,table2';
注意
include_tables 和 exclude_tables。shard_mode 参数。ByteHouse 支持您手动启动/停止/重启库同步任务。
语法:
system start/stop/restart sync materialize mysql on cluster cluster_name db
示例 1:手动启动同步任务
system start sync materialize mysql on cluster bytehouse_demo_cluster demo_db_mysql
示例 2:停止同步任务
system stop sync materialize mysql on cluster bytehouse_demo_cluster demo_db_mysql
示例 3:重启同步任务
system restart sync materialize mysql on cluster bytehouse_demo_cluster demo_db_mysql
ByteHouse 支持重新开始和取消重新开始全量同步表。如果同步过程中出现表异常,任务状态进入 sync_failed,您可使用执行表同步命令,恢复该表的同步流程。ByteHouse MaterializedMySQL 引擎会维护一份待重新同步表(resync table)的列表,并有一个后台线程来异步执行重新同步过程。您可通过system.materialize_mysql_status 系统表查询同步失败的表和待重新同步表的列表,该系统表的查询方式请参见system.materialize_mysql_status。
注意
执行此命令时,处于 FullSync 状态的表将会跳过此命令。
语法:
system [stop] resync materialize mysql table on cluster cluster_name db.table
示例 1:取消同步
system stop resync materialize mysql table on cluster bytehouse_demo_cluster demo_db_mysql.demo_table
示例 2:重新同步
system resync materialize mysql table on cluster bytehouse_demo_cluster demo_db_mysql.demo_table
MySQL 数据库端需确保已配置以下参数,确保已开启 Binlog,binlog_format 为 ROW 格式,开启 GTID 模式(全局事务 ID)。
登录 MySQL 并查看是否开启 Binlog 日志。
[root@node1 ~]# mysql -u root -password mysql> show variables like 'log_%';
如果查询结果 log_bin 字段为 off 则说明没有开启 Binlog 日志。
参数 | 参数值 |
|---|---|
log_bin | ON |
如果 binlog 已开启,请确认 binlog_format 和 binlog_row_image 为如下配置:
mysql> show variables like 'binlog_format'; mysql> show variables like 'binlog_row_image';
参数 | 参数值 |
|---|---|
binlog_format | ROW |
binlog_row_image | FULL |
查看默认的认证插件设置是否为 mysql_native_password。
mysql> show variables like 'default_authentication_plugin';
参数 | 参数值 |
|---|---|
default_authentication_plugin | mysql_native_password |
确认是否开启 GTID 模式。
mysql> show variables like 'gtid_mode'; mysql> show variables like 'enforce_gtid_consistency';
参数 | 参数值 |
|---|---|
gtid_mode | ON |
enforce_gtid_consistency | ON |
如果以上参数均未配置,可参考如下配置样例设置参数:
在 MySQL 的 /etc/my.cnf 文件中 [mysqld] 下写入以下配置。
[mysqld] # 指定一个不重名的 server-id server-id=123 # 配置 Binlog 的日志目录 log-bin=/var/lib/mysql/mysql-bin # 设置 Binlog 的格式为 Row binlog_format=ROW binlog_row_image=FULL # 设置默认的认证插件为 mysql_native_password default_authentication_plugin=mysql_native_password # 开启 GTID 模式 gtid_mode=on enforce_gtid_consistency=1
配置完成后,重启 MySQL 服务,可以逐步执行以上查询命令,查看 Binlog 日志及其他参数的配置是否满足要求。
[root@node1 ~]# service mysqld restart [root@node1 ~]# mysql -u root -password mysql> show variables like 'log_%'; mysql> show variables like 'binlog_format'; mysql> show variables like 'binlog_row_image'; mysql> show variables like 'default_authentication_plugin'; mysql> show variables like 'gtid_mode'; mysql> show variables like 'enforce_gtid_consistency';
在 MySQL 中执行以下 SQL,创建 MySQL 源库、表。
mysql> CREATE DATABASE mysql_source; mysql> CREATE TABLE mysql_source.test (a INT PRIMARY KEY, b INT);
在 ByteHouse 中创建 MaterializedMySQL 数据库并查询库,此时可查询到已同步 MySQL 源库中的表至 ByteHouse。使用时,请使用实际参数值替换命令示例中的占位符,参数配置说明请参见参数说明。
CREATE DATABASE demo_db_mysql ENGINE = MaterializeMySQL('192.xx.xx.xxx:3306', 'mysql_source', 'user_name', 'password') SETTINGS shard_mode = 1; SHOW TABLES FROM demo_db_mysql; 查询结果: ┌─name─┐ │ test │ └──────┘
测试创建表和插入数据
在 MySQL 中创建表,并插入数据:
mysql> INSERT INTO mysql_source.test VALUES (1, 11), (2, 22);
在 ByteHouse 中查询同步情况,结果与 MySQL 源表的查询结果一致。
SELECT * FROM demo_db_mysql.test; 查询结果: ┌─a─┬──b─┐ │ 1 │ 11 │ │ 2 │ 22 │ └───┴────┘
测试删除数据、添加列并更新数据
在 MySQL 源表中删除数据、添加列并更新数据。
mysql> DELETE FROM mysql_source.test WHERE a=1; mysql> ALTER TABLE mysql_source.test ADD COLUMN c VARCHAR(16); mysql> UPDATE mysql_source.test SET c='Wow!', b=222; mysql> SELECT * FROM mysql_source.test; 查询结果: +---+------+------+ | a | b | c | +---+------+------+ | 2 | 222 | Wow! | +---+------+------+
在 ByteHouse 中查询同步情况,结果与 MySQL 源表的查询结果一致。
SELECT * FROM demo_db_mysql.test; 查询结果: ┌─a─┬───b─┬─c────┐ │ 2 │ 222 │ Wow! │ └───┴─────┴──────┘
您可通过查看系统表,了解 MaterializedMySQL 数据库的同步信息、同步任务状态、重新同步任务状态、异常信息、日志等,协助排障,详情请参见下文相关参考章节:
所有数据类型均支持设置为可为空(nullable)。
MySQL | ByteHouse 企业版 |
|---|---|
TINY | Int8 |
SHORT,SMALLINT | Int16 |
MEDIUMINT,INT,INTEGER,INT32 | Int32 |
BIGINT | Int64 |
LONG | UInt32 |
LONGLONG | UInt64 |
FLOAT | Float32 |
DOUBLE | Float64 |
DECIMAL,NEWDECIMAL | Decimal |
DATE,NEWDATE | Date |
DATETIME,TIMESTAMP | DateTime |
DATETIME2,TIMESTAMP2 | DateTime64 |
ENUM | Enum |
STRING | String |
VARCHAR,VAR_STRING | String |
BLOB | String |
BINARY | FixedString |
MySQL DDL | ByteHouse 企业版 |
|---|---|
CREATE TABLE | ✔️ |
DROP TABLE | ✔️ |
RENAME TABLE | ✔️ |
ADD COLUMN | ✔️ |
DROP COLUMN | ✔️ |
RENAME COLUMN | ✔️(ByteHouse 企业版引擎版本需为 2.4及以上) |
MODIFY COLUMN | ✔️(ByteHouse 企业版引擎版本需为 2.4及以上) |
RENAME COLUMN 和 MODIFY COLUMN 依赖于 ByteHouse 企业版引擎版本,需为 2.4 及以上版本,您可登录 ByteHouse 企业版控制台,通过 集群管理 > 集群列表 > 集群名称 > 基本信息路径,查看引擎版本是否符合要求。
ByteHouse 企业版底层引擎采用 HaUniqueMergeTree 引擎,因此直接支持 INSERT、UPDATE 和 DELETE 操作。
您可使用该系统表查询 MaterialMySQL 数据库的状态。
SELECT * FROM system.materialize_mysql_status
分布式模式(shard_mode=1)下,只有 leader 节点可以查询到该信息,如需获取该系统表的信息需要执行以下 SQL:
SELECT * FROM CLUSTER('<cluster_name>', system.materialize_mysql_status, (1,2))
列名说明
列名 | 类型 | 说明 |
|---|---|---|
mysql_info | String | MySQL 连接信息,格式为 |
mysql_database | String | 被同步的 MySQL 库名。 |
database | String | 云数据库 ByteHouse 中同步的数据库名。 |
shard_mode | UInt8 | 是否为分布式模式。 |
cluster_name | String | 指定的集群名称。 |
zookeeper_path | String | ZooKeeper 路径。 |
sync_type | String | 同步状态,取值说明如下:
|
include_tables | Array | 同步包含的表名。 |
exclude_tables | Array | 同步排除的表名。 |
resync_tables | Array | 正在 resync 的表。 |
seconds_behind_mysql | Int64 | 和 MySQL 的主从延时,含义同 MySQL 的 Seconds_Behind_Master。当库处于非 IncrementSync 状态时值为 -1。 |
last_committed_position | String | 最后 commit 的 binlog 位置。 |
last_executed_event | String | 最后执行的 binlog event 内容。 |
total_position | String | MySQL 中 binlog 的最新位置。 |
error_count | UInt64 | 同步过程中出错的次数。 |
already_skip_errors | UInt64 | 已经跳过的错误数。 |
last_error_msg | String | 最后一次错误的信息。 |
last_error_time | DataTime | 最后一次错误的发生时间。 |
sync_failed_tables | Array | 同步失败的表。 |
skipped_unsupported_tables | Array | 已跳过的不支持的表。 |
您可使用该系统表查询 MaterialMySQL 数据库中数据同步任务的状态。
SELECT * FROM system.materialize_mysql_resync_task_status
分布式模式(shard_mode=1)下,只有 leader 节点可以查询到该信息,如需获取该系统表的信息需要执行以下 SQL:
SELECT * FROM CLUSTER('<cluster_name>', system.materialize_mysql_resync_task_status, (1,2))
列名说明
其中,部分列与 system.materialize_mysql_status 有相同的语义,但是指示范围不同。如 error_count:
system.materialize_mysql_status 中的 error_count 包含主 sync 线程与 resync 任务出错的数量;system.materialize_mysql_resync_task_status 中的 error_count 仅包含当前 resync 任务出错的数量;列名 | 类型 | 说明 |
|---|---|---|
mysql_info | String | MySQL 连接信息,格式为 |
mysql_database | String | 被同步的 MySQL 库名。 |
database | String | 云数据库 ByteHouse 中同步的数据库名。 |
shard_mode | UInt8 | 是否为分布式模式。 |
cluster_name | String | 指定的集群名称。 |
zookeeper_path | String | ZooKeeper 路径。 |
sync_type | String | 同步状态,取值说明如下:
|
last_committed_position | String | 最后 commit 的 binlog 位置。 |
last_executed_event | String | 最后执行的 binlog event 内容。 |
total_position | String | MySQL 中 binlog 的最新位置。 |
error_count | UInt64 | 同步过程中出错的次数。 |
already_skip_errors | UInt64 | 已经跳过的错误数。 |
last_error_msg | String | 最后一次错误的信息。 |
last_error_time | DataTime | 最后一次错误的发生时间。 |
resync_tables_in_task | Array | MaterializedMySQLResyncTask 任务中的待 resync table。 |
sync_failed_tables_in_task | Array | 任务中同步失败的表。 |
skipped_unsupported_tables_in_task | Array | 任务中已跳过的不支持的表。 |
您可以使用该系统表查询 MaterialMySQL 数据库中数据同步任务的日志。
使用前,需要在 config.xml 文件中配置如下,详情请参考配置集群参数。
<materialize_mysql_log> <database>system</database> <table>materialize_mysql_log</table> <partition_by>toYYYYMM(event_date)</partition_by> <flush_interval_milliseconds>7500</flush_interval_milliseconds> </materialize_mysql_log>
查询方式:
SELECT * FROM system.materialize_mysql_log ORDER BY event_time DESC limit 10;
分布式模式(shard_mode=1)下,log 只会记录在 leader 节点上,如需获取该系统表的信息需要执行以下 SQL:
SELECT * FROM CLUSTER('<cluster_name>', system.materialize_mysql_log, (1,2)) ORDER BY event_time DESC limit 10;
列名说明
列名 | 类型 | 说明 |
|---|---|---|
database | String | 云数据库 ByteHouse 中同步的数据库。 |
event_type | String | 日志类型,取值说明如下:
|
event_date | Date | 发生日期。 |
event_time | DateTime | 发生时间。 |
resync_table | String | resync 表名。 |
exception | String | 详细的异常信息。 |
exception_source | String | 异常信息来源,用于辅助判断
|