写入的相关概念
- block
- 表
- 分区
- part
block
block 说明
clickhouse block的概念在两个地方会存在。
一个是列式存储的时候,具有block的概念。
另一个是通过网络传输写入数据时,会将数据分成一个又一个block。
part
一个part 实质就是一个磁盘上的目录,目录之中存放相关的文件。
目录的命名格式:
分区名_最小block号_最大block号_level
写入 与 part 的关系
每次写入一条数据, 一次clickhouse写入,会生成一个part。
每次写入多条数据,一次写入所生成的part数目 取决于 所写入的数据是否存在于表之中的多个分区,以及具体所写入的数据量。
这些分散的part 最后会在clickhouse的merge 机制下 被合并。
更细节地说:
一次clickhouse写入,写入多条数据的时候,clickhouse 会对所写入的数据按照分区进行划分,划分后的数据则进行block的划分。
block的划分规则由参数
max_insert_block_size
min_insert_block_size_rows
决定。
一个block对应一个part。
如果每次所写入的数据,分区之中的数据行数超过 max_block_size,或者 数据量 超过min_insert_block_size_bytes,那么这些数据被切分成为多个block,也就是多个part。
适合 clickhouse的 数据写入方式
clickhouse 适合批量写入数据,而不适合一条一条写入数据。
批量写入的数据最好都是同一个表的同一个分区下的。
最本质的目的是 使因为每次写入而产生的part数目尽可能地少,所以写入的优化尽量往这个层次靠。
clickhouse 写入数据的方式
- 1:代码使用驱动写入。
- 2:clickhouse-client 执行 insert 语句写入。
注意:
go 的 clickhouse写入驱动 通过 tcp协议 写入数据。
java 的 clickhouse写入驱动 通过 http协议 写入数据。
可复制表 写入去重
注意:只有对于复制表系列才有写入去重机制,并不是所有的表都有写入去重机制的。
写入去重的相关参数
insert_deduplicate : 表示是否开启写入去重机制。
replicated_deduplication_window:可复制表在zookeeer的clickhouse路径的 block目录下,会存储多少个节点。
replicated_deduplication_window_seconds:zookeeper的clickhouse路径的 block目录下,节点存活时间长度
写入去重机制的背后实现原理
可复制表的写入去重依赖于 zookeeper。
clickhouse 在zookeeper 上的默认存储路径为 /clickhouse 。
对于可复制表的写入,每次写入,将所写入的数据按照规则划分为一个个block,对每一个block计算一个校验和,将校验和存储在zookeeper的 /clickhouse/tables/分片号/数据库名/表名/blocks 路径下(这个路径 和 可复制表的参数也有关)的一个节点上。
每次写入的时候,比较数据块的校验和 和 已有的数据块校验和关系,用以判断写入数据是否重复。
注意:这里的 blocks目录 是和 分片存在关系的,不同的分片所记录的校验和是不同的。如果先将数据写入分片1,再将相同的数据写入分片2,那么是无法去重的,相同的数据写入同一个分片,去重功能才会生效。
clickhouse 写入速度限制
- max_execution_speed:每秒最多操作的行数
- min_execution_speed:每秒最少操作的行数
- max_execution_speed_bytes:每秒最多操作的字节数
- min_execution_speed_bytes: 每秒最少操作的字节数
- timeout_before_checking_execution_speed:经过一段时候后检查执行的速度,这个参数设置为开始检查速度前的时间间隔
注意:限制速度的参数 和 timeout_before_checking_execution_speed 参数结合起来使用 才能限制SQL执行的速度。
clickhouse 部分写入的参数
- max_parts_in_total: 一个表之中最多有多少活跃的part
- parts_to_throw_insert: 表分区之中活跃part数目超过多少 会抛出异常
- parts_to_delay_insert:表分区之中活跃part数目超过多少 会导致延迟写入
- inactive_parts_to_throw_insert:表分区之中不活跃的part数目超过多少,会抛出异常
- inactive_parts_to_delay_insert:表分区之中不活跃的part数目超过多少,会延迟写入
写入监控
写入监控指标
- 一段时间内集群写入次数
- 一段时间内集群写入的数据量
- 一段时间内集群写入的行数
- 一段时间内 发生写入的数据库数目
- 一段时间内 发生写入的表的数目
- 一段时间内 每个数据库的 写入次数 写入数据量 写入行数
- 一段时间内 每个表的
写入次数 写入成功次数 写入失败次数 写入成功数据量 写入成功行数 平均每次写入的行数 平均每次写入的行数 平均每行的大小 平均每次写入耗时 - 写入失败次数
- 延迟写入次数
- too many parts 而导致的写入失败次数
- 写入健康监控
一段时间内 集群写入次数 写入数据量 写入行数
select
sum(if(`type` =1,1, 0)) as cnt , // 写入次数
sum(if(`type` =2, written_bytes, 0)) as written_bytes, //写入字节数
sum(if(`type` =2, written_rows, 0)) as written_rows // 写入行数
from
clusterAllReplicas('集群名', 'system.query_log')
where
event_date >= '' // 起始时间
and event_date <= '' // 结束时间
and event_time >= '' // 起始时间
and event_time <= '' // 结束时间
and query_kind = 'Insert' and tables[1] <> ''
and (type = 1 or type = 2)
一段时间内 集群发生写入的数据库数目
select
count(*) as database_happen_write_num
from (
select
databases[1] as database
from
clusterAllReplicas('集群名', 'system.query_log')
where
event_date >= '' // 开始日期
and event_date <= '' // 结束日期
and event_time >= '' // 开始时间
and event_time <= '' // 结束时间
and query_kind = 'Insert'
and tables[1] <> ''
group by databases[1]
)
一段时间内 集群发生写入的表的数目
select
count(*) as table_happen_write_cnt
from (
select
databases[1] as dbname,
tables[1] as tablename
from
clusterAllReplicas('集群名','system.query_log')
where
event_date >= '' // 开始日期
and event_date <= '' // 结束日期
and event_time >= '' // 开始时间
and event_time <= '' // 结束时间
and query_kind = 'Insert'
and tables[1] <> ''
group by
databases[1], tables[1]
)
一段时间内 每个数据库的 写入次数 写入数据量 写入行数
select
databases[1] as dbName,
sum(if(`type` =1,1, 0))as write_cnt, // 写入次数
sum(if(`type` =2,written_bytes, 0)) as written_bytes, // 写入数据量
sum(if(`type` =2,written_rows, 0)) as written_rows // 写入行数
from
clusterAllReplicas('集群名','system.query_log')
where
event_date >= '' // 起始日期
and event_date <= '' // 结束日期
and event_time >= '' // 起始时间
and event_time <= '' // 结束时间
and query_kind = 'Insert'
and tables[1] <> ''
and ( `type`=1 or `type` = 2)
group by databases[1]
一段时间内 每个表的 写入次数 写入失败次数 写入行数 写入数据量 平均每次写入数据量以及行数 平均每行的大小
select
tableName,
write_cnt, // 写入次数
write_success_cnt, // 写入成功次数
write_fail_cnt, // 写入失败次数
write_success_rows, // 写入成功行数
write_fail_rows, // 写入失败行数
write_success_bytes, // 成功写入数据量
write_fail_bytes, // 写入失败数据量
divide(write_success_bytes, write_success_cnt) as avg_write_bytes_per, // 平均每次写入数据量
divide(write_success_bytes, write_success_rows) as avg_write_rows_per // 平均每行的大小
from (
select
tables[1] as tableName,
sum(if( `type` = 1 , 1, 0)) as write_cnt,
sum(if( `type` = 2 , 1, 0)) as write_success_cnt,
sum(if(`type` =2 or `type` = 1 , 0, 1)) as write_fail_cnt,
sum(if(`type` =2 , written_rows, 0)) as write_success_rows,
sum(if(`type` =2 or `type` = 1 , 0, written_rows )) as write_fail_rows,
sum(if( `type` = 2 , written_bytes, 0)) as write_success_bytes,
sum(if( `type` =2 or `type` = 1 ,0, written_bytes)) as write_fail_bytes
from
clusterAllReplicas('集群名','system.query_log')
where
event_date >= '' // 起始日期
and event_date <= '' // 结束日期
and event_time >= '' // 开始时间
and event_time <= '' // 结束时间
and query_kind = 'Insert'
and tables[1] <> ''
group by
tables[1]
)
一段时间内 每个表的平均写入延迟
select
dbName,
table_name,
divide(write_consume_time, write_success_cnt) as avg_write_consume_time
from (
select
databases[1] as dbName,
tables[1] as table_name,
count(*) as write_success_cnt,
sum(query_duration_ms) as write_consume_time
from
clusterAllReplicas('集群名','system.query_log')
where
event_date >= toDate('') // 起始日期
and event_date <= toDate('') // 结束日期
and event_time >= '' // 起始时间
and event_time <= '' // 结束时间
and query_kind = 'Insert'
and tables[1] <> ''
and `type` = 2
group by
databases[1],
tables[1],
)
写入失败的次数
select * from system.events where event = 'FailedInsertQuery';
注意:指标的类型是累加的。
获取最近写入失败的SQL信息
select
hostName() as hostName,
query,
query_id,
exception_code,
exception,
stack_trace
from
clusterAllReplicas('default', 'system.query_log')
where
query_start_time >= '' // 填入开始时间
and query_start_time <= '' //填入结束时间
and type <> 1
and type <> 2
and query_kind = 'Insert'
order by query_start_time desc
limit 30;
写入耗时监控
select * from system.events where event = 'InsertQueryTimeMicroseconds';
注意:指标的类型是累加,sql只是获得单节点的数据。
获取最近写入耗时很长的具体SQL
select
hostName() as hostName,
databases[1] as dbName,
tables[1] as tableName,
written_rows,
written_bytes,
memory_usage
from
clusterAllReplicas('default', 'system.query_log')
where
query_start_time >= '' // 填充起始时间
and query_start_time <= '' // 填充结束时间
and query_duration_ms > 时间域值 // 填充所认为的时间长度,单位是毫秒
and type = 2
and query_kind = 'Insert'
order by query_duration_ms desc
limit 40
延迟写入的次数
select * from system.events where event = 'DelayedInserts'
注意:指标是累加值,给出的sql是单个节点上的数据。
因为 too many parts 而导致的写入失败次数监控
select * from system.events where event = 'RejectedInserts'
注意:指标是累加值,给出的sql是单个节点上的数据。
复制表的写入 data loss 监控
当复制表的 inser_quorm 的配置参数设置为0 或者1 的时候,向复制表的某个副本写入数据,
所写入的数据尚未同步到另一个表就丢失了。指标监控的是这种类型的数据丢失。
select * from system.events where event = 'ReplicatedDataLoss'
注意: system.events 之中的指标是累加值。
表写入健康监控
注意:对于分区表而言,如果每次写入的数据包含多个分区的数据,那么数据的写入是不健康的。当表的类型是合并树时,会造成合并的压力过大。
查询当天的每个写入sql 所生成的 part数目,来判断上游写入数据的质量。
select query_id,count(*) as num
from (
select query_id,partition_id
from
clusterAllReplicas('default', 'system.part_log')
where event_type = 1
and database = '数据库名'
and table = '表名'
and event_date = toDate(now())
and event_time >= toDateTime('自己填充时间')
group by query_id, partition_id
)
group by query_id
order by num desc limit 1;
注意:如果出现多条记录的话,极大的可能是写入的数据质量不是很好,需要上游的数据源调整。最终的目的还是希望一次写入的数据只能是属于一个分区的。