DuckDB 保姆级使用入门指南
一、DuckDB 简介
1.1 什么是 DuckDB?
DuckDB 是一个嵌入式分析型数据库管理系统(OLAP),专为数据分析而设计。它具有以下特点:
- 嵌入式:无需单独服务器,直接作为库集成到应用中
- 列式存储:优化分析查询性能
- SQL 兼容:支持标准 SQL 和丰富扩展
- 高性能:针对分析查询优化
- 零依赖:单个可执行文件/库
1.2 适用场景
- 数据探索与分析
- 本地数据处理
- ETL 流程
- 嵌入式分析应用
- 替代 pandas 处理大数据集
二、安装与配置
2.1 Python 安装
# 使用 pip 安装
pip install duckdb
# 注意:DuckDB 没有 pip extras 写法(如 duckdb[postgres] 会安装失败)
# 扩展(httpfs、postgres 等)是在运行时通过 SQL 安装的:
# INSTALL httpfs; LOAD httpfs;
2.2 其他语言安装
# R 语言
install.packages("duckdb")
# Node.js
npm install duckdb
# CLI 工具
# 从官网下载对应平台的可执行文件
三、基础使用
3.1 连接数据库
import duckdb
# 1. 内存数据库(默认)
conn = duckdb.connect()
# 2. 持久化数据库
conn = duckdb.connect('my_database.db')
# 3. 只读模式
conn = duckdb.connect('my_database.db', read_only=True)
# 4. 配置参数
conn = duckdb.connect(config={
'memory_limit': '1GB',
'threads': 4
})
3.2 基本操作
import duckdb
import pandas as pd
# 创建连接
conn = duckdb.connect()
# 创建表
conn.execute("""
CREATE TABLE users (
id INTEGER,
name VARCHAR,
age INTEGER,
city VARCHAR
)
""")
# 插入数据
conn.execute("""
INSERT INTO users VALUES
(1, 'Alice', 25, 'New York'),
(2, 'Bob', 30, 'London'),
(3, 'Charlie', 35, 'Tokyo')
""")
# 查询数据
result = conn.execute("SELECT * FROM users").fetchall()
print(result)
# 使用 fetchdf() 获取 pandas DataFrame
df = conn.execute("SELECT * FROM users").fetchdf()
print(df)
# 关闭连接
conn.close()
四、数据导入与导出
4.1 从各种源导入数据
import duckdb
import pandas as pd
conn = duckdb.connect()
# 1. 从 CSV 导入
conn.execute("""
CREATE TABLE sales AS
SELECT * FROM read_csv('sales.csv', auto_detect=true)
""")
# 2. 从 Parquet 导入
conn.execute("""
CREATE TABLE logs AS
SELECT * FROM read_parquet('logs/*.parquet')
""")
# 3. 从 JSON 导入
conn.execute("""
CREATE TABLE events AS
SELECT * FROM read_json('events.json', format='auto')
""")
# 4. 从 pandas DataFrame 导入
df = pd.DataFrame({
'date': pd.date_range('2023-01-01', periods=100),
'value': range(100)
})
conn.execute("CREATE TABLE df_table AS SELECT * FROM df")
# 5. 从远程数据源(需安装 httpfs 扩展)
conn.execute("INSTALL httpfs")
conn.execute("LOAD httpfs")
conn.execute("""
CREATE TABLE remote_data AS
SELECT * FROM read_parquet('s3://bucket/data.parquet')
""")
4.2 数据导出
# 1. 导出到 CSV
conn.execute("""
COPY (SELECT * FROM users)
TO 'users_export.csv' (HEADER, DELIMITER ',')
""")
# 2. 导出到 Parquet
conn.execute("""
COPY (SELECT * FROM users)
TO 'users_export.parquet' (FORMAT PARQUET)
""")
# 3. 导出到 pandas
df = conn.execute("SELECT * FROM users").fetchdf()
# 4. 导出到其他数据库(需安装对应扩展)
conn.execute("INSTALL postgres")
conn.execute("LOAD postgres")
conn.execute("""
ATTACH 'dbname=postgres user=postgres' AS pg_db
""")
conn.execute("""
CREATE TABLE pg_db.users AS
SELECT * FROM users
""")
五、SQL 查询与分析
5.1 基本查询
# 条件查询
result = conn.execute("""
SELECT name, age
FROM users
WHERE age > 25 AND city = 'London'
""").fetchdf()
# 聚合查询
result = conn.execute("""
SELECT
city,
COUNT(*) as user_count,
AVG(age) as avg_age,
MAX(age) as max_age
FROM users
GROUP BY city
HAVING COUNT(*) > 1
ORDER BY avg_age DESC
""").fetchdf()
# 窗口函数
result = conn.execute("""
SELECT
id,
name,
age,
RANK() OVER (ORDER BY age DESC) as age_rank,
AVG(age) OVER (PARTITION BY city) as city_avg_age
FROM users
""").fetchdf()
5.2 复杂查询示例
# CTE(公用表表达式)
result = conn.execute("""
WITH user_stats AS (
SELECT
city,
COUNT(*) as count,
AVG(age) as avg_age
FROM users
GROUP BY city
),
active_users AS (
SELECT DISTINCT user_id
FROM activities
WHERE activity_date > CURRENT_DATE - INTERVAL '30 days'
)
SELECT
u.*,
us.avg_age as city_avg_age
FROM users u
LEFT JOIN user_stats us ON u.city = us.city
WHERE u.id IN (SELECT user_id FROM active_users)
""").fetchdf()
# PIVOT 操作(DuckDB 内置支持,无需安装任何扩展)
result = conn.execute("""
PIVOT users
ON city
USING COUNT(*)
""").fetchdf()
六、性能优化技巧
6.1 索引使用
# 创建索引加速查询
conn.execute("CREATE INDEX idx_users_city ON users(city)")
conn.execute("CREATE INDEX idx_users_age ON users(age)")
# 查看查询计划
plan = conn.execute("EXPLAIN SELECT * FROM users WHERE age > 30").fetchall()
for line in plan:
print(line[0])
6.2 分区与排序
# 注意:DuckDB 不支持声明式表分区,CREATE TABLE ... PARTITION BY RANGE 不是合法语法
conn.execute("""
CREATE TABLE sales_partitioned (
sale_date DATE,
product_id INTEGER,
amount DECIMAL(10,2)
)
""")
# 插入数据(查询时 DuckDB 会利用各数据块的 min/max 统计自动跳过无关数据)
conn.execute("""
INSERT INTO sales_partitioned
VALUES
('2023-01-01', 1, 100.00),
('2023-01-02', 2, 200.00)
""")
# 如确实需要物理分区,可在导出时使用 Hive 分区写出
conn.execute("""
COPY sales_partitioned TO 'sales_out'
(FORMAT PARQUET, PARTITION_BY (sale_date))
""")
6.3 配置优化
# 设置内存限制和线程数
conn.execute("SET memory_limit='4GB'")
conn.execute("SET threads=8")
# 显示长查询的进度条
conn.execute("SET enable_progress_bar=true")
# 设置临时文件目录(内存不足时中间结果溢出到磁盘)
conn.execute("SET temp_directory='/tmp/duckdb_cache'")
七、高级功能
7.1 用户定义函数(UDF)
# 标量函数(未使用类型注解的函数必须显式声明参数类型和返回类型)
conn.create_function('add_tax', lambda price: price * 1.1,
[duckdb.typing.DOUBLE], duckdb.typing.DOUBLE)
result = conn.execute("""
SELECT price, add_tax(price) as price_with_tax
FROM products
""").fetchdf()
# 注意:DuckDB 的 Python API 目前只支持标量 UDF(create_function),
# 没有 sqlite3 风格的 create_aggregate 方法。
# 自定义聚合需求通常可以用内置聚合函数替代,例如字符串拼接:
result = conn.execute("""
SELECT city, string_agg(name, ', ') as names
FROM users
GROUP BY city
""").fetchdf()
7.2 时间序列处理
# time_bucket 等时间序列函数是 DuckDB 内置的,无需安装扩展
# 时间序列函数
result = conn.execute("""
SELECT
time_bucket(INTERVAL '1 day', timestamp) as day,
COUNT(*) as events,
SUM(value) as total_value
FROM events
WHERE timestamp > CURRENT_TIMESTAMP - INTERVAL '7 days'
GROUP BY day
ORDER BY day
""").fetchdf()
7.3 空间数据处理
# 空间扩展
conn.execute("INSTALL spatial")
conn.execute("LOAD spatial")
# 空间查询
result = conn.execute("""
SELECT
name,
ST_Area(geometry) as area,
ST_Distance(geometry, ST_Point(0, 0)) as distance
FROM locations
WHERE ST_Within(geometry, ST_GeomFromText('POLYGON(...)'))
""").fetchdf()
八、与 Python 生态集成
8.1 替代 pandas 处理大数据
import duckdb
import pandas as pd
# 大文件处理 - DuckDB 更高效
# 传统 pandas 方式(内存不足)
# df = pd.read_csv('large_file.csv') # 可能内存不足
# DuckDB 方式
conn = duckdb.connect()
# 直接查询 CSV,无需完全加载到内存
result = conn.execute("""
SELECT
column1,
AVG(column2)
FROM read_csv('large_file.csv')
GROUP BY column1
""").fetchdf()
# 流式处理超大文件(fetch_record_batch 返回可迭代的 Arrow RecordBatchReader)
reader = conn.execute("""
SELECT * FROM read_csv('huge_file.csv')
""").fetch_record_batch(100_000)
for chunk in reader:
# 处理每个 chunk
process_chunk(chunk.to_pandas())
8.2 与机器学习库集成
import duckdb
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
# 直接从数据库加载数据
conn = duckdb.connect('data.db')
# 准备特征数据
features = conn.execute("""
SELECT
feature1,
feature2,
feature3,
target
FROM training_data
WHERE split = 'train'
""").fetchdf()
# 分割数据集
X = features[['feature1', 'feature2', 'feature3']]
y = features['target']
X_train, X_test, y_train, y_test = train_test_split(X, y)
# 训练模型
model = RandomForestRegressor()
model.fit(X_train, y_train)
# 将预测结果存回数据库
predictions = pd.DataFrame({
'id': X_test.index,
'prediction': model.predict(X_test)
})
conn.execute("CREATE TABLE predictions AS SELECT * FROM predictions")
8.3 在 Jupyter 中使用
# 在 Jupyter Notebook 中,可以安装 jupysql 后使用 magic 命令
# %load_ext sql
# %sql duckdb://
# %%sql
# SELECT * FROM users LIMIT 10;
# 或者使用 duckdb 的显示优化
conn.execute("PRAGMA enable_progress_bar")
九、实战案例
9.1 电商数据分析
import duckdb
import matplotlib.pyplot as plt
# 创建连接
conn = duckdb.connect('ecommerce.db')
# 1. 加载数据
conn.execute("""
CREATE TABLE orders AS
SELECT * FROM read_parquet('orders/*.parquet')
""")
conn.execute("""
CREATE TABLE products AS
SELECT * FROM read_csv('products.csv')
""")
conn.execute("""
CREATE TABLE customers AS
SELECT * FROM read_json('customers.json')
""")
# 2. 销售分析
sales_report = conn.execute("""
SELECT
DATE_TRUNC('month', order_date) as month,
p.category,
COUNT(DISTINCT o.order_id) as order_count,
SUM(o.quantity * p.price) as revenue,
COUNT(DISTINCT o.customer_id) as unique_customers
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE order_date >= '2023-01-01'
GROUP BY month, p.category
ORDER BY month, revenue DESC
""").fetchdf()
# 3. 客户分析
customer_analysis = conn.execute("""
WITH customer_metrics AS (
SELECT
customer_id,
COUNT(DISTINCT order_id) as order_count,
SUM(quantity * price) as total_spent,
MIN(order_date) as first_order,
MAX(order_date) as last_order
FROM orders o
JOIN products p ON o.product_id = p.id
GROUP BY customer_id
)
SELECT
CASE
WHEN total_spent > 1000 THEN 'VIP'
WHEN total_spent > 500 THEN 'Regular'
ELSE 'Casual'
END as customer_segment,
COUNT(*) as customer_count,
AVG(order_count) as avg_orders,
AVG(total_spent) as avg_spent
FROM customer_metrics
GROUP BY customer_segment
ORDER BY avg_spent DESC
""").fetchdf()
# 4. 保存分析结果
# 注意:表名不能与同名的本地 DataFrame 冲突,
# 否则 CREATE TABLE sales_report AS SELECT * FROM sales_report 会产生循环引用
conn.execute("""
CREATE TABLE sales_report_tbl AS
SELECT * FROM sales_report
""")
conn.execute("""
COPY (SELECT * FROM customer_analysis)
TO 'customer_segments.csv' (HEADER)
""")
9.2 日志分析
import duckdb
from datetime import datetime, timedelta
conn = duckdb.connect('logs.db')
# 分析日志数据
conn.execute("""
CREATE TABLE logs AS
SELECT * FROM read_parquet('server_logs/*.parquet')
""")
# 错误分析
errors = conn.execute("""
WITH error_logs AS (
SELECT
DATE_TRUNC('hour', timestamp) as hour,
server_id,
COUNT(*) as error_count,
COUNT(DISTINCT user_id) as affected_users
FROM logs
WHERE level = 'ERROR'
AND timestamp > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY hour, server_id
)
SELECT
hour,
SUM(error_count) as total_errors,
AVG(error_count) as avg_errors_per_server,
MAX(error_count) as max_errors_on_server
FROM error_logs
GROUP BY hour
ORDER BY hour
""").fetchdf()
# 性能监控
performance = conn.execute("""
SELECT
endpoint,
method,
COUNT(*) as request_count,
AVG(response_time_ms) as avg_response_time,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time_ms) as p95_response_time,
SUM(CASE WHEN response_time_ms > 1000 THEN 1 ELSE 0 END) as slow_requests
FROM logs
WHERE timestamp > CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY endpoint, method
HAVING COUNT(*) > 10
ORDER BY avg_response_time DESC
""").fetchdf()
十、最佳实践
10.1 性能最佳实践
# 1. 批量插入数据时包在一个事务里(注意:table 是保留字,不能直接作表名;
# 插入值应使用参数占位符而不是字符串拼接)
conn.execute("BEGIN TRANSACTION")
for i in range(1000):
conn.execute("INSERT INTO my_table VALUES (?)", [i])
conn.execute("COMMIT")
# 2. 重复查询使用参数化查询
# (DuckDB 的 Python API 没有 conn.prepare 方法,直接向 execute 传参即可)
sql = "SELECT * FROM users WHERE age > ? AND city = ?"
result1 = conn.execute(sql, (25, 'London')).fetchdf()
result2 = conn.execute(sql, (30, 'New York')).fetchdf()
# 3. 合理使用索引
# 只为经常过滤的列创建索引
# 4. 分区大表
# 按时间或类别分区提高查询性能
# 5. 定期清理临时文件
conn.execute("PRAGMA temp_directory='/tmp/duckdb'")
10.2 内存管理
# 监控内存与存储使用(DuckDB 没有 memory_usage 这个 pragma,用 database_size)
memory_info = conn.execute("PRAGMA database_size").fetchdf()
print(memory_info)
# 设置合理的内存限制
conn.execute("SET memory_limit='2GB'")
# 使用磁盘溢出
conn.execute("SET temp_directory='/tmp/duckdb_temp'")
# 注意:DuckDB 没有 PRAGMA clear_cache;
# 关闭连接(conn.close())即可释放缓冲区占用的内存
10.3 错误处理
import duckdb
from contextlib import closing
try:
with closing(duckdb.connect('database.db')) as conn:
try:
# 显式开启事务(DuckDB 默认每条语句自动提交)
conn.execute("BEGIN TRANSACTION")
# 执行多个操作
conn.execute("INSERT INTO table1 VALUES (1)")
conn.execute("INSERT INTO table2 VALUES (2)")
# 提交事务
conn.execute("COMMIT")
except duckdb.Error:
# 回滚必须在连接关闭之前执行,然后再向外抛出
conn.execute("ROLLBACK")
raise
except duckdb.Error as e:
print(f"DuckDB error: {e}")
except Exception as e:
print(f"General error: {e}")
十一、常见问题解答
Q1: DuckDB vs SQLite 有什么区别?
DuckDB: 列式存储,OLAP 优化,分析查询快
SQLite: 行式存储,OLTP 优化,事务处理快
Q2: 如何处理内存不足?
# 1. 增加内存限制
conn.execute("SET memory_limit='4GB'")
# 2. 启用磁盘溢出
conn.execute("SET temp_directory='/tmp/duckdb'")
# 3. 使用流式处理(按批读取 Arrow RecordBatch)
for chunk in conn.execute("QUERY").fetch_record_batch(100_000):
process_chunk(chunk)
Q3: 如何备份数据库?
# 方法1: 复制数据库文件
import shutil
shutil.copy2('database.db', 'database_backup.db')
# 方法2: 使用 EXPORT 命令
conn.execute("EXPORT DATABASE 'backup_directory'")
# 方法3: 使用 COPY 命令备份表
conn.execute("COPY table TO 'table_backup.parquet' (FORMAT PARQUET)")
总结
DuckDB 是一个功能强大的嵌入式分析数据库,特别适合:
数据科学家:替代 pandas 处理大数据
开发者:在应用中嵌入分析功能
分析师:快速数据探索和原型开发
通过本指南,你应该能够:
- 安装和配置 DuckDB
- 导入导出各种格式数据
- 执行复杂的 SQL 分析查询
- 优化查询性能
- 集成到 Python 数据科学工作流
继续深入学习:
- 官方文档:https://duckdb.org/docs/
- GitHub 仓库:https://github.com/duckdb/duckdb
- 社区支持:https://duckdb.org/community/
开始你的 DuckDB 数据分析之旅吧!