Skip to content

时序数据库 Clickhouse

ClickHouse 是一个开源的列式时序数据库管理系统(DBMS),专为高性能和低延迟的数据分析而设计。它最初由俄罗斯的互联网公司 Yandex 开发,用于处理海量的数据分析工作负载。以下是 ClickHouse 的主要特点和介绍:

  1. 列式存储:ClickHouse 采用列式存储,这意味着它将数据按列存储在磁盘上,而不是按行存储。这种存储方式对于数据分析非常高效,因为它允许查询只读取所需的列,而不必读取整个行。这导致了更快的查询性能和更小的存储空间占用。

  2. 分布式架构:ClickHouse 具有分布式架构,可以轻松扩展以处理大规模数据集。它支持数据分片、分布式复制和负载均衡,以确保高可用性和容错性。

  3. 支持 SQL 查询:ClickHouse 支持标准的 SQL 查询语言,使用户可以使用熟悉的查询语法执行数据分析操作。它还支持复杂的查询,如聚合、窗口函数和子查询。

  4. 高性能:ClickHouse 以查询性能和吞吐量为重点进行了优化。它专为快速的数据分析查询而设计,可以在毫秒级别内处理数十亿行的数据。

  5. 实时数据注入:ClickHouse 支持实时数据注入,允许将新数据迅速插入到表中,并能够在不停机的情况下进行数据更新。

  6. 支持多种数据格式:ClickHouse 可以处理多种数据格式,包括 JSON、CSV、Parquet 等,使其能够与各种数据源无缝集成。

  7. 可扩展性:ClickHouse 具有可扩展性,可以与其他工具和框架(如 Apache Kafka、Spark、Presto)集成,以满足各种数据处理需求。

  8. 开源和活跃的社区:ClickHouse 是一个开源项目,拥有活跃的社区支持。这意味着你可以免费获取并使用它,并且有一个庞大的开发者社区,提供了大量的文档和资源。

ClickHouse 在大数据分析、日志处理、事件追踪、时序数据分析等场景中得到了广泛的应用。它的高性能、可扩展性和强大的查询功能使其成为处理大规模数据的理想选择。如果你需要处理大量时序数据并进行快速数据分析,那么 ClickHouse 可能是一个非常有价值的数据库管理系统。

24.1 安装和配置ClickHouse数据库

24.1.1 安装

在Ubuntu上安装ClickHouse

  1. 打开终端并更新包列表:

    Terminal window
    sudo apt update
  2. 安装ClickHouse的APT存储库:

    Terminal window
    sudo apt install apt-transport-https ca-certificates dirmngr
    sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4
    echo "deb https://repo.clickhouse.tech/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list
  3. 再次更新包列表以获取ClickHouse包:

    Terminal window
    sudo apt update
  4. 安装ClickHouse Server:

    Terminal window
    sudo apt install clickhouse-server
  5. 启动ClickHouse服务:

    Terminal window
    sudo service clickhouse-server start
  6. 我们可以使用以下命令检查ClickHouse服务器的状态:

    Terminal window
    sudo service clickhouse-server status

在Manjaro / Arch Linux上安装ClickHouse

  1. 打开终端并使用以下命令安装ClickHouse:

    Terminal window
    sudo pacman -S clickhouse
  2. 启动ClickHouse服务:

    Terminal window
    sudo systemctl start clickhouse-server
  3. 我们可以使用以下命令检查ClickHouse服务器的状态:

    Terminal window
    sudo systemctl status clickhouse-server

这样ClickHouse就已经安装在你的Ubuntu或Arch Linux系统上了,并且服务已启动。

此时如果我们如果访问本地host上的这个网址:http://localhost:8123 ,会看到服务器返回了一个’Ok’给我们。

24.1.2 配置clickhouse的密码

还是不要忘记,生产环境中安全是至关重要的,在ClickHouse中配置密码需要完成以下步骤:

  1. 创建用户和设置密码: 首先,我们需要登录到ClickHouse服务器上,并使用管理员权限创建用户并设置密码。我们可以使用ClickHouse客户端或者通过在配置文件中执行SQL来完成这一步骤。

    使用ClickHouse客户端:

    CREATE USER 'your_username' IDENTIFIED BY 'your_password';

    请将 'your_username' 替换为我们要创建的用户名,将 'your_password' 替换为用户的密码。

  2. 分配权限: 创建用户后,需要分配相应的权限。通常,我们可以使用GRANT语句来为用户分配权限。以下是一个示例,将允许用户对特定表执行SELECT操作:

    GRANT SELECT ON database_name.table_name TO 'your_username';

    这将授予 'your_username' 用户对 'database_name.table_name' 表的SELECT权限。我们可以根据需要为用户分配不同的权限。

  3. 配置ClickHouse服务: 接下来,我们需要配置ClickHouse服务器以启用身份验证。在ClickHouse的配置文件中,找到并编辑users.xml文件。通常,该文件的位置是/etc/clickhouse-server/users.xml。在该文件中,我们可以为刚刚创建的用户添加相应的配置。

    <yandex>
    <profiles>
    <!-- 添加用户配置 -->
    <your_username>
    <password>your_password</password>
    <networks>
    <ip>::/0</ip> <!-- 允许所有IP连接 -->
    </networks>
    </your_username>
    </profiles>
    </yandex>

    请注意,这只是一个示例配置,我们需要将 'your_username''your_password' 替换为实际的用户名和密码。此外,上述配置允许来自所有IP地址的连接,这可能不是最安全的配置。我们可以根据需要限制连接的IP地址范围。

  4. 重启ClickHouse服务: 最后,重新启动ClickHouse服务器以使配置更改生效:

    Terminal window
    sudo systemctl restart clickhouse-server

    这会重新加载配置文件并应用新的用户和权限设置。

完成上述步骤后,我们的ClickHouse服务器将配置了用户名和密码的身份验证机制,并且只有具有正确凭据的用户才能访问相应的数据库和表。请确保密码强度足够,以增强安全性。

24.2 ClickHouse for Rust: clickhouse.rs库

clickhouse.rs 是一个网友 Paul Loyd 开发的比较成熟的第三方 Rust 库,旨在与 ClickHouse 数据库进行交互,提供了便捷的查询执行、数据处理和连接管理功能。以下是该库的一些主要特点:

主要特点

  1. 异步支持clickhouse.rs 利用了 Rust 的异步编程能力,非常适合需要非阻塞数据库操作的高性能应用程序。
  2. 类型化接口:该库提供了强类型接口,使数据库交互更加安全和可预测,减少运行时错误并提高代码的健壮性。
  3. 支持 ClickHouse 特性:库支持多种 ClickHouse 特性,包括批量插入、复杂查询和不同的数据类型。
  4. 连接池clickhouse.rs 包含连接池功能,在高负载场景下实现高效的数据库连接管理。
  5. 易用性:该库设计简洁明了的 API,使各级 Rust 开发者都能轻松上手。

安装

要使用 clickhouse.rs,需要在 Cargo.toml 中添加依赖项:

[dependencies]
clickhouse = "0.11" # 请确保使用最新版本

基本用法

以下是使用 clickhouse.rs 连接 ClickHouse 数据库并执行查询的简单示例:

use clickhouse::{Client, Row};
use futures::stream::StreamExt;
use tokio;
#[tokio::main]
async fn main() {
// 初始化客户端
let client = Client::default()
.with_url("http://localhost:8123")
.with_database("default");
// 执行查询示例
let mut cursor = client.query("SELECT number FROM system.numbers LIMIT 10").fetch().unwrap();
while let Some(row) = cursor.next().await {
let number: u64 = row.unwrap().get("number").unwrap();
println!("{}", number);
}
}

错误处理

该库使用标准的 Rust 错误处理机制,使得管理潜在问题变得简单。以下是处理查询执行错误的示例:

use clickhouse::{Client, Error};
use tokio;
#[tokio::main]
async fn main() -> Result<(), Error> {
let client = Client::default().with_url("http://localhost:8123");
let result = client.query("SELECT number FROM system.numbers LIMIT 10")
.fetch()
.await;
match result {
Ok(mut cursor) => {
while let Some(row) = cursor.next().await {
let number: u64 = row.unwrap().get("number").unwrap();
println!("{}", number);
}
}
Err(e) => eprintln!("查询执行错误: {:?}", e),
}
Ok(())
}

高级用法

对于批量插入或处理特定数据类型等复杂场景,请参阅库的文档和示例。该库支持多种 ClickHouse 特性,可以适应各种使用场景。

24.3 备份 ClickHouse 数据库的教程

备份 ClickHouse 数据库对于数据安全和业务连续性至关重要。通过定期备份,可以在数据丢失、硬件故障或人为错误时快速恢复,确保数据完整性和可用性。此外,备份有助于在系统升级或迁移过程中保护数据,避免意外损失。备份还支持增量备份和压缩,优化存储空间和备份速度,为企业提供灵活、高效的数据管理解决方案。通过良好的备份策略,可以大大降低数据丢失风险,保障业务稳定运行。

配置备份目的地

首先,在 /etc/clickhouse-server/config.d/backup_disk.xml 中添加备份目的地配置:

<clickhouse>
<storage_configuration>
<disks>
<backups>
<type>local</type>
<path>/backups/</path>
</backups>
</disks>
</storage_configuration>
<backups>
<allowed_disk>backups</allowed_disk>
<allowed_path>/backups/</allowed_path>
</backups>
</clickhouse>

备份整个数据库

执行以下命令将整个数据库备份到指定磁盘:

BACKUP DATABASE my_database TO Disk('backups', 'database_backup.zip');

恢复整个数据库

从备份文件中恢复整个数据库:

RESTORE DATABASE my_database FROM Disk('backups', 'database_backup.zip');

备份表

执行以下命令将表备份到指定磁盘:

BACKUP TABLE my_database.my_table TO Disk('backups', 'table_backup.zip');

恢复表

从备份文件中恢复表:

RESTORE TABLE my_database.my_table FROM Disk('backups', 'table_backup.zip');

增量备份

指定基础备份进行增量备份:

BACKUP DATABASE my_database TO Disk('backups', 'incremental_backup.zip') SETTINGS base_backup = Disk('backups', 'database_backup.zip');

使用密码保护备份

备份文件使用密码保护:

BACKUP DATABASE my_database TO Disk('backups', 'protected_backup.zip') SETTINGS password='yourpassword';

压缩设置

指定压缩方法和级别:

BACKUP DATABASE my_database TO Disk('backups', 'compressed_backup.zip') SETTINGS compression_method='lzma', compression_level=3;

恢复特定分区

从备份中恢复特定分区:

RESTORE TABLE my_database.my_table PARTITIONS 'partition_id' FROM Disk('backups', 'table_backup.zip');

更多详细信息请参考 ClickHouse 文档

24.4 关于Clickhouse的优化

在量化金融领域,处理大量实时数据至关重要。ClickHouse作为一款高性能列式数据库,提供了高效的查询和存储方案。然而,为了充分发挥其性能,必须对其进行优化。

硬件优化

  1. 存储设备:选择高性能 SSD,可以显著提高数据读取和写入速度。
  2. 内存:增加内存容量,有助于更快地处理大量数据。
  3. 网络:优化网络带宽和延迟,确保分布式集群间的数据传输效率。

配置优化

  1. 设置合适的分区策略:根据时间或其他关键维度分区,提高查询性能。
  2. 合并设置:配置合适的 merge_tree 设置,优化数据合并过程,减少碎片。
  3. 缓存和内存设置:调整 mark_cache_sizemax_memory_usage 等参数,提升缓存命中率和内存使用效率。

查询优化

  1. 索引:利用主键和二级索引,加速查询。
  2. 并行查询:启用 max_threads 参数,充分利用多核 CPU 并行处理查询。
  3. 物化视图:预计算常用查询结果,减少实时计算开销。

数据模型优化

  1. 列存储设计:尽量将频繁查询的列存储在一起,减少 I/O 开销。
  2. 压缩算法:选择合适的压缩算法,如 LZ4ZSTD,在压缩率和性能之间取得平衡。

实践案例

以量化金融中的市场数据分析为例,优化 ClickHouse 的关键步骤如下:

  1. 分区策略:按日期分区,使查询特定时间段数据时更高效。
  2. 物化视图:预计算每日交易量、价格波动等关键指标,减少实时计算负担。
  3. 并行查询:调整 max_threads,确保查询时充分利用服务器多核资源。

监控和维护

  1. 监控工具:使用 ClickHouse 提供的 system 表监控系统性能和查询效率。
  2. 定期维护:定期检查并优化分区和索引,防止性能下降。

通过合理的硬件配置、优化查询和数据模型设计,以及持续的监控和维护,ClickHouse 可以在量化金融领域中提供卓越的性能和可靠性,支持高频交易、实时数据分析等应用场景。以下我将介绍一些优化实例:

24.4.1. 硬件配置
<clickhouse>
<storage_configuration>
<disks>
<default>
<path>/var/lib/clickhouse/</path>
</default>
<ssd>
<type>local</type>
<path>/mnt/ssd/</path>
</ssd>
</disks>
</storage_configuration>
</clickhouse>
24.4.2. 创建分区表
CREATE TABLE market_data (
date Date,
symbol String,
price Float64,
volume UInt64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (symbol, date);
24.4.3. 使用物化视图
CREATE MATERIALIZED VIEW market_summary
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (symbol, date)
AS
SELECT
symbol,
toYYYYMM(date) as month,
avg(price) as avg_price,
sum(volume) as total_volume
FROM market_data
GROUP BY symbol, month;
24.4.4. 并行查询
SET max_threads = 8;
SELECT
symbol,
avg(price) as avg_price
FROM market_data
WHERE date >= '2023-01-01' AND date <= '2023-12-31'
GROUP BY symbol;
24.4.5. 压缩算法
CREATE TABLE compressed_data (
date Date,
symbol String,
price Float64,
volume UInt64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (symbol, date)
SETTINGS index_granularity = 8192,
compress_on_write = 1,
compression = 'lz4';

通过合理的硬件配置、优化查询和数据模型设计,以及持续的监控和维护,ClickHouse 可以在量化金融领域中提供卓越的性能和可靠性,支持高频交易、实时数据分析等应用场景。

案例1 通过Rust脚本在Clickhouse数据库中建表、删表、查询

在量化金融领域中,使用 Rust 脚本管理 ClickHouse 数据库可以实现高效的数据处理和管理。以下是一个基本案例 。

准备工作

首先,确保在你的 Cargo.toml 中添加 clickhouse 依赖:

[dependencies]
clickhouse = { default-features = false, version = "0.11.6" }

创建 ClickHouse 客户端

定义并初始化一个 ClickHouse 客户端:

use clickhouse::{Client, Row};
use lazy_static::lazy_static;
lazy_static! {
pub static ref CLICKHOUSE_CLIENT: ClickHouseClient = ClickHouseClient::new();
}
pub struct ClickHouseClient {
pub client: Client,
}
impl ClickHouseClient {
pub fn new() -> Self {
let client = Client::default().with_url("http://localhost:8123").with_database("default");
ClickHouseClient { client }
}
}

创建表

创建一个新的表 market_data

impl ClickHouseClient {
pub async fn create_table(&self) -> Result<(), clickhouse::error::Error> {
let query = r#"
CREATE TABLE market_data (
date Date,
symbol String,
price Float64,
volume UInt64
) ENGINE = MergeTree()
PARTITION BY date
ORDER BY (symbol, date)
"#;
self.client.query(query).execute().await
}
}

删除表

删除一个已存在的表:

impl ClickHouseClient {
pub async fn drop_table(&self, table_name: &str) -> Result<(), clickhouse::error::Error> {
let query = format!("DROP TABLE IF EXISTS {}", table_name);
self.client.query(&query).execute().await
}
}

查询数据

从表中查询数据:

#[derive(Debug, Serialize, Deserialize, Row)]
pub struct MarketData {
pub date: String,
pub symbol: String,
pub price: f64,
pub volume: u64,
}
impl ClickHouseClient {
pub async fn query_data(&self) -> Result<Vec<MarketData>, clickhouse::error::Error> {
let query = "SELECT * FROM market_data LIMIT 10";
let result = self.client.query(query).fetch_all::<MarketData>().await?;
Ok(result)
}
}

示例用法

完整的示例代码展示了如何使用这些功能:

#[tokio::main]
async fn main() -> Result<(), clickhouse::error::Error> {
let client = ClickHouseClient::new();
// 创建表
client.create_table().await?;
println!("Table created successfully.");
// 查询数据
let data = client.query_data().await?;
for row in data {
println!("{:?}", row);
}
// 删除表
client.drop_table("market_data").await?;
println!("Table dropped successfully.");
Ok(())
}

通过这个基本教程,你可以在 Rust 脚本中实现对 ClickHouse 数据库的基本管理操作。这些示例代码可以根据具体需求进行扩展和优化,以满足量化金融领域的复杂数据处理需求。

案例2 创建布林带表的 SQL 脚本示例

本案例展示如何利用 Rust 脚本与 ClickHouse 交互,计算布林带 (Bollinger Bands) 和其他技术指标,帮助金融分析师和量化交易员优化他们的交易策略。

-- 创建名为 AG2305_TEST 的表,使用 MergeTree 引擎
CREATE TABLE AG2305_TEST
ENGINE = MergeTree()
ORDER BY (minute, mean_lastprice) -- 按 minute 和 mean_lastprice 排序
AS
-- 从子查询中选择字段
SELECT outer_query.minute,
CAST(outer_query.mean_lastprice AS Float32) AS mean_lastprice, -- 将 mean_lastprice 转换为 Float32 类型
CAST(sma AS Float32) AS sma, -- 将 sma 转换为 Float32 类型
CAST(stddev AS Float32) AS stddev, -- 将 stddev 转换为 Float32 类型
CAST(upper AS Float32) AS upper, -- 将 upper 转换为 Float32 类型
CAST(lower AS Float32) AS lower, -- 将 lower 转换为 Float32 类型
CAST(super_upper AS Float32) AS super_upper, -- 将 super_upper 转换为 Float32 类型
CAST(super_lower AS Float32) AS super_lower, -- 将 super_lower 转换为 Float32 类型
CAST(ssuper_upper AS Float32) AS ssuper_upper, -- 将 ssuper_upper 转换为 Float32 类型
CAST(ssuper_lower AS Float32) AS ssuper_lower, -- 将 ssuper_lower 转换为 Float32 类型
CASE
-- 根据价格突破的不同情况赋值 bollinger_band_status
WHEN super_upper > mean_lastprice AND mean_lastprice > upper THEN 1 -- 向上突破2个标准差
WHEN super_lower < mean_lastprice AND mean_lastprice < lower THEN 2 -- 向下突破2个标准差
WHEN ssuper_upper > mean_lastprice AND mean_lastprice > super_upper THEN 3 -- 向上突破4个标准差
WHEN ssuper_lower < mean_lastprice AND mean_lastprice < super_lower THEN 4 -- 向下突破4个标准差
WHEN mean_lastprice > ssuper_upper THEN 5 -- 向上突破5个标准差
WHEN mean_lastprice < ssuper_lower AND upper != lower THEN 6 -- 向下突破5个标准差
ELSE 0 -- 在布林带内及其他情况
END AS bollinger_band_status
FROM (
-- 从内层查询中选择字段
SELECT subquery.minute,
subquery.mean_lastprice,
subquery.start_time,
-- 计算简单移动平均线 (SMA)
avg(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS sma,
-- 计算标准差
stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS stddev,
-- 计算布林带上下轨
sma + 1.5 * stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS upper,
sma - 1.5 * stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS lower,
-- 计算更高和更低的布林带轨道
sma + 3 * stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS super_upper,
sma - 3 * stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS super_lower,
sma + 4 * stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS ssuper_upper,
sma - 4 * stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS ssuper_lower
FROM (
-- 最内层查询,按分钟聚合数据并计算均价
SELECT toStartOfMinute(datetime) AS minute,
AVG(lastprice) AS mean_lastprice,
-- 计算开始时间,根据 datetime 决定是当前日期的 21 点还是前一天的 21 点
CASE
WHEN toHour(datetime) < 21 THEN toDate(datetime) - INTERVAL 1 DAY + INTERVAL 21 HOUR
ELSE toDate(datetime) + INTERVAL 21 HOUR
END AS start_time
FROM futures.AG2305
GROUP BY minute, start_time
) subquery
) outer_query
ORDER BY outer_query.minute;

脚本解析

  1. 创建表:使用 MergeTree 引擎创建表 AG2305_TEST,并按 minutemean_lastprice 排序。
  2. 选择字段:从内部查询中选择字段并进行类型转换。
  3. 布林带状态:根据价格与布林带上下轨的关系,确定 bollinger_band_status 的值。
  4. 内部查询
    • 子查询:聚合每分钟的平均价格 mean_lastprice,并计算开始时间 start_time
    • 布林带计算:计算简单移动平均线(SMA)和不同标准差倍数的上下轨。

通过这个逐行注释的 SQL 脚本,可以更好地理解如何在 ClickHouse 中创建复杂的计算表,并应用于量化金融领域的数据处理。