---
name: clickhouse-io
description: ClickHouse database schema design, query optimization, analytics, and data-engineering best practices for high-performance analytical workloads.
origin: ECC
---
# ClickHouse Analytics Patterns
ClickHouse-specific patterns for high-performance analytics and data engineering.
## When to Activate
* Designing ClickHouse table schemas (MergeTree engine selection)
* Writing analytical queries (aggregations, window functions, joins)
* Optimizing query performance (partition pruning, projections, materialized views)
* Ingesting large volumes of data (batch inserts, Kafka integration)
* Migrating from PostgreSQL/MySQL to ClickHouse for analytics
* Implementing real-time dashboards or time-series analytics
## Overview
ClickHouse is a column-oriented database management system (DBMS) for online analytical processing (OLAP). It is optimized for fast analytical queries over large datasets.
**Key features:**
* Columnar storage
* Data compression
* Parallel query execution
* Distributed queries
* Real-time analytics
## Table Design Patterns
### MergeTree Engine (Most Common)
```sql
CREATE TABLE markets_analytics (
    date Date,
    market_id String,
    market_name String,
    volume UInt64,
    trades UInt32,
    unique_traders UInt32,
    avg_trade_size Float64,
    created_at DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, market_id)
SETTINGS index_granularity = 8192;
```
### ReplacingMergeTree (Deduplication)
```sql
-- For data that may have duplicates (e.g., from multiple sources).
-- The engine argument is the version column: on merge, ClickHouse keeps
-- the row with the highest timestamp for each ORDER BY key, so the
-- version column must NOT be part of the ORDER BY key itself.
CREATE TABLE user_events (
    event_id String,
    user_id String,
    event_type String,
    timestamp DateTime,
    properties String
) ENGINE = ReplacingMergeTree(timestamp)
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, event_id);
```
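Deduplication happens asynchronously at merge time, so a query may still see duplicates until parts merge. For correctness-sensitive reads, `FINAL` forces query-time deduplication, trading speed for accuracy (the Best Practices section below recommends avoiding it on hot paths):
```sql
-- Query-time deduplication: keeps the highest-version row per ORDER BY key
SELECT user_id, event_id, event_type, timestamp
FROM user_events FINAL
WHERE user_id = 'user-123';
```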
### AggregatingMergeTree (Pre-Aggregation)
```sql
-- For maintaining aggregated metrics.
-- Note: total_trades is AggregateFunction(count) with no argument type,
-- matching the zero-argument countState() used to fill it.
CREATE TABLE market_stats_hourly (
    hour DateTime,
    market_id String,
    total_volume AggregateFunction(sum, UInt64),
    total_trades AggregateFunction(count),
    unique_users AggregateFunction(uniq, String)
) ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, market_id);

-- Query aggregated data with the -Merge combinators
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= toStartOfHour(now() - INTERVAL 24 HOUR)
GROUP BY hour, market_id
ORDER BY hour DESC;
```
## Query Optimization Patterns
### Efficient Filtering
```sql
-- GOOD: filter on primary-key/partition columns (date, market_id) so
-- ClickHouse can prune partitions and skip granules
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
    AND market_id = 'market-123'
    AND volume > 1000
ORDER BY date DESC
LIMIT 100;

-- BAD: the selective predicates sit on non-indexed columns, and a
-- leading-wildcard LIKE can never use the primary index
SELECT *
FROM markets_analytics
WHERE volume > 1000
    AND market_name LIKE '%election%'
    AND date >= '2025-01-01';
```
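To verify that a query actually benefits from partition pruning and the primary index, ask ClickHouse to annotate the plan:
```sql
-- Reports how many parts/granules each index eliminates
EXPLAIN indexes = 1
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
    AND market_id = 'market-123';
```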
### Aggregations
```sql
-- GOOD: use ClickHouse-specific aggregation functions
SELECT
    toStartOfDay(created_at) AS day,
    market_id,
    sum(volume) AS total_volume,
    count() AS total_trades,
    uniq(trader_id) AS unique_traders,
    avg(trade_size) AS avg_size
FROM trades
WHERE created_at >= today() - INTERVAL 7 DAY
GROUP BY day, market_id
ORDER BY day DESC, total_volume DESC;

-- quantile() is approximate and fast; use quantileExact() only when
-- exact percentiles are required
SELECT
    quantile(0.50)(trade_size) AS median,
    quantile(0.95)(trade_size) AS p95,
    quantile(0.99)(trade_size) AS p99
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;
```
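When several percentiles of the same expression are needed, the plural `quantiles` form computes them all in a single pass over the data:
```sql
-- Returns an Array(Float64): [median, p95, p99]
SELECT quantiles(0.50, 0.95, 0.99)(trade_size) AS pcts
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;
```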
### Window Functions
```sql
-- Calculate running totals
SELECT
    date,
    market_id,
    volume,
    sum(volume) OVER (
        PARTITION BY market_id
        ORDER BY date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_volume
FROM markets_analytics
WHERE date >= today() - INTERVAL 30 DAY
ORDER BY market_id, date;
```
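The same frame syntax supports sliding windows, e.g. a 7-day moving average per market (assuming one row per market per day, as `markets_analytics` is keyed):
```sql
-- 7-day moving average: current day plus the 6 preceding days
SELECT
    date,
    market_id,
    volume,
    avg(volume) OVER (
        PARTITION BY market_id
        ORDER BY date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS volume_7d_avg
FROM markets_analytics
WHERE date >= today() - INTERVAL 30 DAY
ORDER BY market_id, date;
```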
## Data Insertion Patterns
### Batch Inserts (Recommended)
```typescript
import { ClickHouse } from 'clickhouse'

interface Trade {
  id: string
  market_id: string
  user_id: string
  amount: number
  timestamp: Date
}

const clickhouse = new ClickHouse({
  url: process.env.CLICKHOUSE_URL,
  port: 8123,
  basicAuth: {
    username: process.env.CLICKHOUSE_USER,
    password: process.env.CLICKHOUSE_PASSWORD
  }
})

// GOOD: batch insert (efficient).
// NOTE: values are interpolated into the SQL string, so they must be
// trusted or escaped -- see the parameterized client sketch below.
async function bulkInsertTrades(trades: Trade[]) {
  const values = trades.map(trade => `(
    '${trade.id}',
    '${trade.market_id}',
    '${trade.user_id}',
    ${trade.amount},
    '${trade.timestamp.toISOString()}'
  )`).join(',')

  await clickhouse.query(`
    INSERT INTO trades (id, market_id, user_id, amount, timestamp)
    VALUES ${values}
  `).toPromise()
}

// BAD: individual inserts (slow) -- don't do this in a loop!
async function insertTrade(trade: Trade) {
  await clickhouse.query(`
    INSERT INTO trades VALUES ('${trade.id}', ...)
  `).toPromise()
}
```
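Building INSERT strings by hand requires careful escaping and invites injection. A minimal sketch of the same batch insert with the official `@clickhouse/client` package instead (an assumption: that package is installed; `Trade` is the interface above):
```typescript
import { createClient } from '@clickhouse/client'

const client = createClient({
  url: process.env.CLICKHOUSE_URL,
  username: process.env.CLICKHOUSE_USER,
  password: process.env.CLICKHOUSE_PASSWORD
})

// The client serializes rows itself -- no manual quoting or escaping.
// DateTime columns accept Unix-epoch seconds in JSONEachRow.
async function bulkInsertTradesSafe(trades: Trade[]) {
  await client.insert({
    table: 'trades',
    values: trades.map(t => ({
      ...t,
      timestamp: Math.floor(t.timestamp.getTime() / 1000)
    })),
    format: 'JSONEachRow'
  })
}
```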
### Streaming Inserts
```typescript
// For continuous data ingestion.
// dataSource: an async iterable of row batches, assumed defined elsewhere.
async function streamInserts() {
  const stream = clickhouse.insert('trades').stream()
  for await (const batch of dataSource) {
    stream.write(batch)
  }
  await stream.end()
}
```
## Materialized Views
### Real-Time Aggregation
```sql
-- Create a materialized view for hourly stats
CREATE MATERIALIZED VIEW market_stats_hourly_mv
TO market_stats_hourly
AS SELECT
    toStartOfHour(timestamp) AS hour,
    market_id,
    sumState(amount) AS total_volume,
    countState() AS total_trades,
    uniqState(user_id) AS unique_users
FROM trades
GROUP BY hour, market_id;

-- Query the target table with the matching -Merge combinators
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= now() - INTERVAL 24 HOUR
GROUP BY hour, market_id;
```
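A materialized view with a `TO` target only sees rows inserted after it is created; pre-existing history must be backfilled once by hand with the same -State aggregation. A sketch (the cutoff date is a placeholder for the moment the view was created, bounding the range so nothing is double-counted):
```sql
-- One-off backfill of historical trades into the aggregate target
INSERT INTO market_stats_hourly
SELECT
    toStartOfHour(timestamp) AS hour,
    market_id,
    sumState(amount) AS total_volume,
    countState() AS total_trades,
    uniqState(user_id) AS unique_users
FROM trades
WHERE timestamp < '2025-01-01 00:00:00'  -- placeholder: MV creation time
GROUP BY hour, market_id;
```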
## Performance Monitoring
### Query Performance
```sql
-- Check slow queries
SELECT
    query_id,
    user,
    query,
    query_duration_ms,
    read_rows,
    read_bytes,
    memory_usage
FROM system.query_log
WHERE type = 'QueryFinish'
    AND query_duration_ms > 1000
    AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY query_duration_ms DESC
LIMIT 10;
```
### Table Statistics
```sql
-- Check table sizes
SELECT
    database,
    table,
    formatReadableSize(sum(bytes)) AS size,
    sum(rows) AS rows,
    max(modification_time) AS latest_modification
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(bytes) DESC;
```
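`system.parts` also exposes compressed vs. uncompressed sizes, which makes compression ratios easy to audit:
```sql
-- Compression ratio per table (active parts only)
SELECT
    database,
    table,
    formatReadableSize(sum(data_compressed_bytes)) AS compressed,
    formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed,
    round(sum(data_uncompressed_bytes) / sum(data_compressed_bytes), 2) AS ratio
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(data_compressed_bytes) DESC;
```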
## Common Analytical Queries
### Time-Series Analysis
```sql
-- Daily active users
SELECT
    toDate(timestamp) AS date,
    uniq(user_id) AS daily_active_users
FROM events
WHERE timestamp >= today() - INTERVAL 30 DAY
GROUP BY date
ORDER BY date;

-- Retention analysis.
-- signup_date must be the user's first day overall, so compute it with a
-- window over all of the user's events rather than a per-day GROUP BY.
SELECT
    signup_date,
    countIf(days_since_signup = 0) AS day_0,
    countIf(days_since_signup = 1) AS day_1,
    countIf(days_since_signup = 7) AS day_7,
    countIf(days_since_signup = 30) AS day_30
FROM (
    SELECT DISTINCT
        user_id,
        min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
        toDate(timestamp) AS activity_date,
        dateDiff('day', signup_date, activity_date) AS days_since_signup
    FROM events
)
GROUP BY signup_date
ORDER BY signup_date DESC;
```
### Funnel Analysis
```sql
-- Conversion funnel: distinct sessions reaching each step.
-- (No GROUP BY session_id here -- the rates only make sense aggregated
-- across all sessions.)
SELECT
    uniqIf(session_id, step = 'viewed_market') AS viewed,
    uniqIf(session_id, step = 'clicked_trade') AS clicked,
    uniqIf(session_id, step = 'completed_trade') AS completed,
    round(clicked / viewed * 100, 2) AS view_to_click_rate,
    round(completed / clicked * 100, 2) AS click_to_completion_rate
FROM (
    SELECT
        user_id,
        session_id,
        event_type AS step
    FROM events
    WHERE event_date = today()
);
```
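The query above counts each step independently; it does not require that a session performed the steps in order. ClickHouse's built-in `windowFunnel` aggregate enforces step ordering within a time window, which is usually what a funnel means (a sketch over the same `events` table; 3600 is the window in seconds):
```sql
-- Longest ordered prefix of the funnel each session completed within 1 hour
SELECT
    level,
    count() AS sessions
FROM (
    SELECT
        session_id,
        windowFunnel(3600)(
            timestamp,
            event_type = 'viewed_market',
            event_type = 'clicked_trade',
            event_type = 'completed_trade'
        ) AS level
    FROM events
    WHERE event_date = today()
    GROUP BY session_id
)
GROUP BY level
ORDER BY level;
```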
### Cohort Analysis
```sql
-- User cohorts by signup month
SELECT
    toStartOfMonth(signup_date) AS cohort,
    toStartOfMonth(activity_date) AS month,
    dateDiff('month', cohort, month) AS months_since_signup,
    count(DISTINCT user_id) AS active_users
FROM (
    SELECT
        user_id,
        min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
        toDate(timestamp) AS activity_date
    FROM events
)
GROUP BY cohort, month, months_since_signup
ORDER BY cohort, months_since_signup;
```
## Data Pipeline Patterns
### ETL Pattern
```typescript
// Extract, Transform, Load.
// extractFromPostgres / bulkInsertToClickHouse are app-specific helpers.
async function etlPipeline() {
  // 1. Extract from source
  const rawData = await extractFromPostgres()

  // 2. Transform
  const transformed = rawData.map(row => ({
    date: new Date(row.created_at).toISOString().split('T')[0],
    market_id: row.market_slug,
    volume: parseFloat(row.total_volume),
    trades: parseInt(row.trade_count, 10)
  }))

  // 3. Load to ClickHouse
  await bulkInsertToClickHouse(transformed)
}

// Run periodically
setInterval(etlPipeline, 60 * 60 * 1000) // Every hour
```
### Change Data Capture (CDC)
```typescript
// Listen to PostgreSQL changes and sync to ClickHouse
import { Client } from 'pg'

const pgClient = new Client({ connectionString: process.env.DATABASE_URL })
await pgClient.connect() // must connect before LISTEN
await pgClient.query('LISTEN market_updates')

pgClient.on('notification', async (msg) => {
  const update = JSON.parse(msg.payload ?? '{}')
  await clickhouse.insert('market_updates', [
    {
      market_id: update.id,
      event_type: update.operation, // INSERT, UPDATE, DELETE
      timestamp: new Date(),
      data: JSON.stringify(update.new_data)
    }
  ])
})
```
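For the Kafka ingestion mentioned under "When to Activate", ClickHouse's built-in Kafka table engine plus a materialized view is the usual alternative to hand-rolled CDC. A sketch (broker, topic, and consumer-group names are placeholders):
```sql
-- Kafka engine table: a consumer, not storage
CREATE TABLE trades_queue (
    id String,
    market_id String,
    user_id String,
    amount Float64,
    timestamp DateTime
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'trades',
         kafka_group_name = 'clickhouse-trades',
         kafka_format = 'JSONEachRow';

-- A materialized view moves rows from the queue into MergeTree storage
CREATE MATERIALIZED VIEW trades_queue_mv TO trades
AS SELECT * FROM trades_queue;
```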
## Best Practices
### 1. Partitioning Strategy
* Partition by time (usually month or day)
* Avoid too many partitions (they hurt performance)
* Use a Date-typed column as the partition key
### 2. Sort Key
* Put the most frequently filtered columns first
* Consider cardinality (prefer lower-cardinality columns earlier; they compress and skip better)
* Sort order affects compression
### 3. Data Types
* Use the smallest type that fits (UInt32 vs UInt64)
* Use LowCardinality for repetitive strings
* Use Enum for categorical data (see the sketch below)
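A minimal sketch of these type choices applied to an events-style table (column names are illustrative):
```sql
CREATE TABLE events_typed (
    timestamp DateTime,
    -- LowCardinality: dictionary-encodes repetitive strings
    country LowCardinality(String),
    -- Enum: fixed categorical values, stored as integers
    status Enum8('pending' = 1, 'filled' = 2, 'cancelled' = 3),
    -- Smallest integer type that fits the range
    retry_count UInt8
) ENGINE = MergeTree()
ORDER BY (country, timestamp);  -- lower-cardinality column first
```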
### 4. Avoid
* SELECT \* (specify columns)
* FINAL (merge data ahead of queries instead)
* Excessive JOINs (denormalize for analytics)
* Frequent small inserts (batch instead)
### 5. Monitoring
* Track query performance
* Monitor disk usage
* Watch merge operations (sketch below)
* Review the slow-query log
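To watch merge operations, `system.merges` lists merges currently in flight:
```sql
-- Merges currently running, longest-elapsed first
SELECT
    database,
    table,
    elapsed,
    progress,
    formatReadableSize(total_size_bytes_compressed) AS size
FROM system.merges
ORDER BY elapsed DESC;
```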
**Remember**: ClickHouse excels at analytical workloads. Design tables around your query patterns, insert in batches, and use materialized views for real-time aggregation.