当前位置: 首页>后端>正文

写入-clickhouse

写入的相关概念

  • block
  • 分区
  • part

block

block 说明

clickhouse block的概念在两个地方会存在。
一个是列式存储的时候,具有block的概念。
另一个是通过网络传输写入数据时,会将数据分成一个又一个block。

part

一个part 实质就是一个磁盘上的目录,目录之中存放相关的文件。
目录的命名格式:

 分区名_最小block号_最大block号_level 

写入 与 part 的关系

每次写入一条数据, 一次clickhouse写入,会生成一个part。
每次写入多条数据,一次写入所生成的part数目 取决于 所写入的数据是否存在于表之中的多个分区,以及具体所写入的数据量。
这些分散的part 最后会在clickhouse的merge 机制下 被合并。

更细节地说:
一次clickhouse写入,写入多条数据的时候,clickhouse 会对所写入的数据按照分区进行划分,划分后的数据则进行block的划分。
block的划分规则由参数
max_insert_block_size
min_insert_block_size_rows
决定。
一个block对应一个part。

如果每次所写入的数据,分区之中的数据行数超过 max_block_size,或者 数据量 超过min_insert_block_size_bytes,那么这些数据被切分成为多个block,也就是多个part。

适合 clickhouse的 数据写入方式

clickhouse 适合批量写入数据,而不适合一条一条写入数据。
批量写入的数据最好都是同一个表的同一个分区下的。
最本质的目的是 使因为每次写入而产生的part数目尽可能地少,所以写入的优化尽量往这个层次靠。

clickhouse 写入数据的方式

  • 1:代码使用驱动写入。
  • 2:clickhouse-client 执行 insert 语句写入。
    注意:
    go 的 clickhouse写入驱动 通过 tcp协议 写入数据。
    java 的 clickhouse写入驱动 通过 http协议 写入数据。

可复制表 写入去重

注意:只有对于复制表系列才有写入去重机制,并不是所有的表都有写入去重机制的。

写入去重的相关参数

insert_deduplicate : 表示是否开启写入去重机制。
replicated_deduplication_window:可复制表在zookeeer的clickhouse路径的 block目录下,会存储多少个节点。
replicated_deduplication_window_seconds:zookeeper的clickhouse路径的 block目录下,节点存活时间长度

写入去重机制的背后实现原理

可复制表的写入去重依赖于 zookeeper。
clickhouse 在zookeeper 上的默认存储路径为 /clickhouse 。
对于可复制表的写入,每次写入,将所写入的数据按照规则划分为一个个block,对每一个block计算一个校验和,将校验和存储在zookeeper的 /clickhouse/tables/分片号/数据名/表名/blocks 路径下(这个路径 和 可复制表的参数也有关)的一个节点上。
每次写入的时候,比较数据块的校验和 和 已有的数据块校验和关系,用以判断写入数据是否重复。

注意:这里的 blocks目录 是和 分片存在关系的,不同的分片所记录的校验和是不同的。如果先将数据写入分片1,再将相同的数据写入分片2,那么是无法去重的,相同的数据写入同一个分片,去重功能才会生效。

clickhouse 写入速度限制

  • max_execution_speed:每秒最多操作的行数
  • min_execution_speed:每秒最少操作的行数
  • max_execution_speed_bytes:每秒最多操作的字节数
  • min_execution_speed_bytes: 每秒最少操作的字节数
  • timeout_before_checking_execution_speed:经过一段时候后检查执行的速度,这个参数设置为开始检查速度前的时间间隔
    注意:限制速度的参数 和 timeout_before_checking_execution_speed 参数结合起来使用 才能限制SQL执行的速度。

clickhouse 部分写入的参数

  • max_parts_in_total: 一个表之中最多有多少活跃的part
  • parts_to_throw_insert: 表分区之中活跃part数目超过多少 会抛出异常
  • parts_to_delay_insert:表分区之中活跃part数目超过多少 会导致延迟写入
  • inactive_parts_to_throw_insert:表分区之中不活跃的part数目超过多少,会抛出异常
  • inactive_parts_to_delay_insert:表分区之中不活跃的part数目超过多少,会延迟写入

写入监控

写入监控指标

  • 一段时间内集群写入次数
  • 一段时间内集群写入的数据量
  • 一段时间内集群写入的行数
  • 一段时间内 发生写入的数据库数目
  • 一段时间内 发生写入的表的数目
  • 一段时间内 每个数据库的 写入次数 写入数据量 写入行数
  • 一段时间内 每个表的
    写入次数 写入成功次数 写入失败次数 写入成功数据量 写入成功行数 平均每次写入的行数 平均每次写入的行数 平均每行的大小 平均每次写入耗时
  • 写入失败次数
  • 延迟写入次数
  • too many parts 而导致的写入失败次数
  • 写入健康监控

一段时间内 集群写入次数 写入数据量 写入行数

select 
       sum(if(`type` =1,1, 0))  as cnt , // 写入次数 
       sum(if(`type` =2, written_bytes, 0)) as written_bytes, //写入字节数 
       sum(if(`type` =2, written_rows, 0)) as written_rows // 写入行数
from
   clusterAllReplicas('集群名', 'system.query_log')
where  
      event_date >= '' // 起始时间
      and event_date <= '' // 结束时间
      and event_time >= '' // 起始时间
      and event_time <= '' // 结束时间 
      and query_kind = 'Insert'  and tables[1] <> ''
      and (type = 1 or type = 2)

一段时间内 集群发生写入的数据库数目

select 
   count(*) as database_happen_write_num 
from (
select      
    databases[1] as database 
from    
   clusterAllReplicas('集群名', 'system.query_log')
 where   
       event_date >= ''         // 开始日期 
       and event_date <= ''  // 结束日期
       and event_time >= ''  // 开始时间 
       and event_time <= ''  // 结束时间 
       and query_kind = 'Insert'  
       and tables[1] <> ''
     group by databases[1]
) 

一段时间内 集群发生写入的表的数目

select 
   count(*) as table_happen_write_cnt
from (
select   
     databases[1] as dbname,
      tables[1] as tablename 
from    
        clusterAllReplicas('集群名','system.query_log')
 where   
       event_date >= '' // 开始日期
       and event_date <= '' // 结束日期 
       and event_time >= '' // 开始时间 
       and event_time <= '' // 结束时间 
       and query_kind = 'Insert'  
       and tables[1] <> ''
  group by 
   databases[1], tables[1]
) 

一段时间内 每个数据库的 写入次数 写入数据量 写入行数

  select  
    databases[1] as dbName, 
     sum(if(`type` =1,1, 0))as write_cnt, // 写入次数
     sum(if(`type` =2,written_bytes, 0)) as written_bytes, // 写入数据量
     sum(if(`type` =2,written_rows, 0)) as written_rows // 写入行数 
  from    
     clusterAllReplicas('集群名','system.query_log')
  where   
    event_date >= '' // 起始日期
    and event_date <= '' // 结束日期
    and event_time >= '' // 起始时间
    and event_time <= '' // 结束时间
    and query_kind = 'Insert'  
    and tables[1] <> '' 
    and ( `type`=1 or `type` = 2)
   group by   databases[1]

一段时间内 每个表的 写入次数 写入失败次数 写入行数 写入数据量 平均每次写入数据量以及行数 平均每行的大小

select 
   tableName,
   write_cnt, // 写入次数  
   write_success_cnt, // 写入成功次数
   write_fail_cnt,   // 写入失败次数 
   write_success_rows, // 写入成功行数 
   write_fail_rows, // 写入失败行数 
   write_success_bytes, // 成功写入数据量 
   write_fail_bytes, // 写入失败数据量 
   divide(write_success_bytes, write_success_cnt) as avg_write_bytes_per, // 平均每次写入数据量 
  divide(write_success_bytes, write_success_rows) as avg_write_rows_per // 平均每行的大小
from (
   select    
      tables[1] as tableName,
      sum(if( `type` = 1 , 1, 0)) as write_cnt, 
      sum(if( `type` = 2 , 1, 0)) as write_success_cnt, 
      sum(if(`type` =2 or `type` = 1 , 0, 1)) as write_fail_cnt,
      sum(if(`type` =2  , written_rows, 0))  as write_success_rows, 
      sum(if(`type` =2 or `type` = 1 , 0, written_rows )) as write_fail_rows, 
      sum(if( `type` = 2 , written_bytes, 0)) as write_success_bytes, 
      sum(if( `type` =2 or `type` = 1 ,0, written_bytes)) as write_fail_bytes 
   from  
      clusterAllReplicas('集群名','system.query_log')
   where  
      event_date >= '' // 起始日期 
      and event_date <= '' // 结束日期
      and event_time >= '' // 开始时间
      and event_time <= '' // 结束时间 
      and query_kind = 'Insert'  
      and tables[1] <> '' 
group by 
     tables[1] 
) 

一段时间内 每个表的平均写入延迟

select 
   dbName,
   table_name,
   divide(write_consume_time, write_success_cnt) as avg_write_consume_time 
from (
select  
  databases[1] as dbName,
  tables[1] as table_name,   
   count(*) as write_success_cnt,   
   sum(query_duration_ms) as write_consume_time
from  
    clusterAllReplicas('集群名','system.query_log')
where 
    event_date >= toDate('')      // 起始日期
    and event_date <= toDate('')  // 结束日期
    and event_time >= '' // 起始时间
    and event_time <= '' // 结束时间 
    and query_kind = 'Insert'  
    and tables[1] <> '' 
    and `type` = 2
group by     
      databases[1],    
      tables[1],
) 

写入失败的次数

select * from system.events where event = 'FailedInsertQuery';

注意:指标的类型是累加的。

获取最近写入失败的SQL信息

select 
  hostName() as hostName,
  query,
  query_id,
  exception_code,
  exception,
  stack_trace
from 
  clusterAllReplicas('default', 'system.query_log')
where 
   query_start_time >= ''      // 填入开始时间
   and query_start_time <= '' //填入结束时间
   and type <> 1 
   and type <> 2
   and query_kind = 'Insert' 
order by query_start_time desc 
limit 30;

写入耗时监控

select * from system.events where event = 'InsertQueryTimeMicroseconds';

注意:指标的类型是累加,sql只是获得单节点的数据。

获取最近写入耗时很长的具体SQL

select 
  hostName() as hostName, 
  databases[1] as dbName,
  tables[1] as tableName,
  written_rows,
  written_bytes,
  memory_usage 
from 
clusterAllReplicas('default', 'system.query_log')
where 
   query_start_time >= ''  // 填充起始时间
   and query_start_time <= ''  // 填充结束时间 
   and query_duration_ms > 时间域值 // 填充所认为的时间长度,单位是毫秒
   and type = 2
   and query_kind = 'Insert'
order by query_duration_ms desc 
limit 40

延迟写入的次数

 select * from system.events where event = 'DelayedInserts'

注意:指标是累加值,给出的sql是单个节点上的数据。

因为 too many parts 而导致的写入失败次数监控

select * from system.events where event = 'RejectedInserts'

注意:指标是累加值,给出的sql是单个节点上的数据。

复制表的写入 data loss 监控

当复制表的 inser_quorm 的配置参数设置为0 或者1 的时候,向复制表的某个副本写入数据,
所写入的数据尚未同步到另一个表就丢失了。指标监控的是这种类型的数据丢失。

select * from system.events where event = 'ReplicatedDataLoss'

注意: system.events 之中的指标是累加值。

表写入健康监控

注意:对于分区表而言,如果每次写入的数据包含多个分区的数据,那么数据的写入是不健康的。当表的类型是合并树时,会造成合并的压力过大。

查询当天的每个写入sql 所生成的 part数目,来判断上游写入数据的质量。

select query_id,count(*) as num 
from (
   select query_id,partition_id 
      from 
        clusterAllReplicas('default', 'system.part_log') 
      where  event_type = 1 
        and database = '数据库名'
        and table = '表名'
        and event_date = toDate(now())
        and event_time >= toDateTime('自己填充时间')
      group by query_id, partition_id
)
group by query_id 
 order by num desc limit 1;

注意:如果出现多条记录的话,极大的可能是写入的数据质量不是很好,需要上游的数据源调整。最终的目的还是希望一次写入的数据只能是属于一个分区的。


https://www.xamrdz.com/backend/3et1931696.html

相关文章: