
A Beginner-Friendly Guide to DuckDB for Python Data Analysis

1. Introduction to DuckDB

1.1 What is DuckDB?

DuckDB is an embedded analytical (OLAP) database management system designed specifically for data analysis. Its key features include:

- In-process and embedded: it runs inside your application, with no separate server to install or manage
- Columnar storage with a vectorized execution engine, which makes analytical queries fast
- Full SQL support, including window functions, CTEs, and complex joins
- A single-package install with no external dependencies
- Tight integration with pandas, Apache Arrow, Parquet, and CSV
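A minimal sketch of what "embedded" means in practice: one import, no server to start, and SQL results immediately:

import duckdb

# Run SQL against the default in-memory database; nothing to configure or start
duckdb.sql("SELECT 42 AS answer").show()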

1.2 Typical Use Cases

DuckDB is a good fit when you want to:

- Analyze datasets that are too large for pandas to hold comfortably in memory
- Query CSV, Parquet, or JSON files directly with SQL, without a separate loading step
- Embed analytical capabilities inside an application without running a database server
- Explore data quickly and prototype analyses
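For instance, quick exploration needs no schema definition or loading step. A small sketch ('events.parquet' is a hypothetical file name):

import duckdb

# Peek at a Parquet file directly; DuckDB infers the schema from the file
duckdb.sql("SELECT * FROM 'events.parquet' LIMIT 5").show()

# One-line summary statistics for every column
duckdb.sql("SUMMARIZE SELECT * FROM 'events.parquet'").show()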

2. Installation and Configuration

2.1 Installing for Python

# Install with pip
pip install duckdb

# Note: optional features such as httpfs or the Postgres connector are not
# pip extras; they are DuckDB extensions installed at runtime with
# INSTALL/LOAD (examples below). Parquet support is built in.

2.2 Installing for Other Languages

# R
install.packages("duckdb")

# Node.js
npm install duckdb

# CLI tool
# Download the executable for your platform from the official website

3. Basic Usage

3.1 Connecting to a Database

import duckdb

# 1. In-memory database (the default)
conn = duckdb.connect()

# 2. Persistent database file (created if it does not exist)
conn = duckdb.connect('my_database.db')

# 3. Read-only mode
conn = duckdb.connect('my_database.db', read_only=True)

# 4. Configuration options
conn = duckdb.connect(config={
    'memory_limit': '1GB',
    'threads': 4
})

3.2 Basic Operations

import duckdb
import pandas as pd

# Create a connection
conn = duckdb.connect()

# Create a table
conn.execute("""
    CREATE TABLE users (
        id INTEGER,
        name VARCHAR,
        age INTEGER,
        city VARCHAR
    )
""")

# Insert rows
conn.execute("""
    INSERT INTO users VALUES 
    (1, 'Alice', 25, 'New York'),
    (2, 'Bob', 30, 'London'),
    (3, 'Charlie', 35, 'Tokyo')
""")

# Query rows
result = conn.execute("SELECT * FROM users").fetchall()
print(result)

# fetchdf() returns the result as a pandas DataFrame
df = conn.execute("SELECT * FROM users").fetchdf()
print(df)

# Close the connection
conn.close()

4. Importing and Exporting Data

4.1 Importing from Various Sources

import duckdb
import pandas as pd

conn = duckdb.connect()

# 1. From CSV
conn.execute("""
    CREATE TABLE sales AS 
    SELECT * FROM read_csv('sales.csv', auto_detect=true)
""")

# 2. From Parquet (glob patterns are supported)
conn.execute("""
    CREATE TABLE logs AS 
    SELECT * FROM read_parquet('logs/*.parquet')
""")

# 3. From JSON
conn.execute("""
    CREATE TABLE events AS 
    SELECT * FROM read_json('events.json', format='auto')
""")

# 4. From a pandas DataFrame (DuckDB can query the Python variable directly)
df = pd.DataFrame({
    'date': pd.date_range('2023-01-01', periods=100),
    'value': range(100)
})
conn.execute("CREATE TABLE df_table AS SELECT * FROM df")

# 5. From a remote source (requires the httpfs extension; private S3 buckets
#    additionally need credentials to be configured)
conn.execute("INSTALL httpfs")
conn.execute("LOAD httpfs")
conn.execute("""
    CREATE TABLE remote_data AS 
    SELECT * FROM read_parquet('s3://bucket/data.parquet')
""")

4.2 Exporting Data

# 1. Export to CSV
conn.execute("""
    COPY (SELECT * FROM users) 
    TO 'users_export.csv' (HEADER, DELIMITER ',')
""")

# 2. Export to Parquet
conn.execute("""
    COPY (SELECT * FROM users) 
    TO 'users_export.parquet' (FORMAT PARQUET)
""")

# 3. Export to a pandas DataFrame
df = conn.execute("SELECT * FROM users").fetchdf()

# 4. Export to another database (requires the matching extension; note the
#    TYPE clause, which tells ATTACH this is a Postgres connection string)
conn.execute("INSTALL postgres")
conn.execute("LOAD postgres")
conn.execute("""
    ATTACH 'dbname=postgres user=postgres' AS pg_db (TYPE postgres)
""")
conn.execute("""
    CREATE TABLE pg_db.users AS 
    SELECT * FROM users
""")

5. SQL Queries and Analysis

5.1 Basic Queries

# Filtering
result = conn.execute("""
    SELECT name, age 
    FROM users 
    WHERE age > 25 AND city = 'London'
""").fetchdf()

# Aggregation
result = conn.execute("""
    SELECT 
        city,
        COUNT(*) as user_count,
        AVG(age) as avg_age,
        MAX(age) as max_age
    FROM users
    GROUP BY city
    HAVING COUNT(*) > 1
    ORDER BY avg_age DESC
""").fetchdf()

# Window functions
result = conn.execute("""
    SELECT 
        id,
        name,
        age,
        RANK() OVER (ORDER BY age DESC) as age_rank,
        AVG(age) OVER (PARTITION BY city) as city_avg_age
    FROM users
""").fetchdf()

5.2 More Complex Queries

# CTEs (common table expressions); assumes an activities(user_id, activity_date) table
result = conn.execute("""
    WITH user_stats AS (
        SELECT 
            city,
            COUNT(*) as count,
            AVG(age) as avg_age
        FROM users
        GROUP BY city
    ),
    active_users AS (
        SELECT DISTINCT user_id
        FROM activities
        WHERE activity_date > CURRENT_DATE - INTERVAL '30 days'
    )
    SELECT 
        u.*,
        us.avg_age as city_avg_age
    FROM users u
    LEFT JOIN user_stats us ON u.city = us.city
    WHERE u.id IN (SELECT user_id FROM active_users)
""").fetchdf()

# PIVOT is built into DuckDB's SQL dialect; no extension is required
result = conn.execute("""
    PIVOT users 
    ON city 
    USING COUNT(*)
""").fetchdf()

6. Performance Tuning

6.1 Indexes

# Create indexes (in DuckDB these mainly speed up highly selective point
# lookups; full-table analytical scans rarely benefit)
conn.execute("CREATE INDEX idx_users_city ON users(city)")
conn.execute("CREATE INDEX idx_users_age ON users(age)")

# Inspect the query plan
plan = conn.execute("EXPLAIN SELECT * FROM users WHERE age > 30").fetchall()
for key, plan_text in plan:
    print(plan_text)

6.2 Partitioning

# DuckDB tables have no declarative partitioning (PARTITION BY RANGE is not
# valid DDL here). Create a plain table, then write Hive-partitioned Parquet:
conn.execute("""
    CREATE TABLE sales_data (
        sale_date DATE,
        product_id INTEGER,
        amount DECIMAL(10,2)
    )
""")

# Insert some sample rows
conn.execute("""
    INSERT INTO sales_data
    VALUES
        ('2023-01-01', 1, 100.00),
        ('2023-01-02', 2, 200.00)
""")

# Export partitioned by date; each sale_date value becomes its own directory,
# so queries that filter on sale_date only read the matching files
conn.execute("""
    COPY sales_data TO 'sales_partitioned'
    (FORMAT PARQUET, PARTITION_BY (sale_date))
""")

6.3 Configuration Tuning

# Set the memory limit and thread count
conn.execute("SET memory_limit='4GB'")
conn.execute("SET threads=8")

# Show a progress bar for long-running queries
conn.execute("SET enable_progress_bar=true")

# Choose where temporary files are spilled when memory runs out
conn.execute("SET temp_directory='/tmp/duckdb_cache'")

7. Advanced Features

7.1 User-Defined Functions (UDFs)

from duckdb.typing import DOUBLE

# Scalar function: a lambda carries no type annotations, so the parameter and
# return types must be passed explicitly (the products table below is assumed)
conn.create_function('add_tax', lambda price: price * 1.1, [DOUBLE], DOUBLE)

result = conn.execute("""
    SELECT price, add_tax(price) as price_with_tax 
    FROM products
""").fetchdf()

# Aggregate functions: unlike sqlite3, DuckDB's Python API has no
# create_aggregate(); Python-defined aggregates are not supported.
# Use DuckDB's built-in aggregates instead, e.g. string_agg:
result = conn.execute("""
    SELECT city, string_agg(name, ', ') as names
    FROM users
    GROUP BY city
""").fetchdf()

7.2 Time-Series Processing

# time_bucket and the other date/time functions are built into DuckDB;
# no extension needs to be installed

# Bucket events by day (assumes an events table with timestamp and value columns)
result = conn.execute("""
    SELECT
        time_bucket(INTERVAL '1 day', timestamp) as day,
        COUNT(*) as events,
        SUM(value) as total_value
    FROM events
    WHERE timestamp > CURRENT_TIMESTAMP - INTERVAL '7 days'
    GROUP BY day
    ORDER BY day
""").fetchdf()

7.3 Spatial Data

# The spatial extension
conn.execute("INSTALL spatial")
conn.execute("LOAD spatial")

# Spatial queries (assumes a locations table with a geometry column; the
# polygon WKT is elided)
result = conn.execute("""
    SELECT 
        name,
        ST_Area(geometry) as area,
        ST_Distance(geometry, ST_Point(0, 0)) as distance
    FROM locations
    WHERE ST_Within(geometry, ST_GeomFromText('POLYGON(...)'))
""").fetchdf()

8. Integration with the Python Ecosystem

8.1 Replacing pandas for Large Datasets

import duckdb
import pandas as pd

# Processing a large file: the traditional pandas approach loads everything
# into memory at once and may fail
# df = pd.read_csv('large_file.csv')  # may exhaust memory

# The DuckDB approach
conn = duckdb.connect()

# Query the CSV directly without fully loading it into memory
result = conn.execute("""
    SELECT 
        column1, 
        AVG(column2) 
    FROM read_csv('large_file.csv') 
    GROUP BY column1
""").fetchdf()

# Stream a very large result in Apache Arrow batches
reader = conn.execute("""
    SELECT * FROM read_csv('huge_file.csv')
""").fetch_record_batch(100_000)
for batch in reader:
    # Handle each batch (process_chunk is a placeholder for your own logic)
    process_chunk(batch.to_pandas())

8.2 Integration with Machine-Learning Libraries

import duckdb
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor

# Load data straight from the database
conn = duckdb.connect('data.db')

# Prepare the feature data
features = conn.execute("""
    SELECT 
        feature1,
        feature2,
        feature3,
        target
    FROM training_data
    WHERE split = 'train'
""").fetchdf()

# Split the dataset
X = features[['feature1', 'feature2', 'feature3']]
y = features['target']
X_train, X_test, y_train, y_test = train_test_split(X, y)

# Train the model
model = RandomForestRegressor()
model.fit(X_train, y_train)

# Write the predictions back to the database
predictions = pd.DataFrame({
    'id': X_test.index,
    'prediction': model.predict(X_test)
})
# Give the table a name that differs from the DataFrame variable, so the new
# table and the scan of the Python variable cannot be confused
conn.execute("CREATE TABLE model_predictions AS SELECT * FROM predictions")

8.3 Using DuckDB in Jupyter

# In Jupyter notebooks you can run SQL cell magics via the jupysql package
# (pip install jupysql duckdb-engine):
# %load_ext sql
# %sql duckdb://
# %%sql
# SELECT * FROM users LIMIT 10;

# You can also enable a progress bar for long-running queries
conn.execute("PRAGMA enable_progress_bar")

9. Worked Examples

9.1 E-commerce Analysis

import duckdb
import matplotlib.pyplot as plt

# Create a connection
conn = duckdb.connect('ecommerce.db')

# 1. Load the data
conn.execute("""
    CREATE TABLE orders AS 
    SELECT * FROM read_parquet('orders/*.parquet')
""")

conn.execute("""
    CREATE TABLE products AS 
    SELECT * FROM read_csv('products.csv')
""")

conn.execute("""
    CREATE TABLE customers AS 
    SELECT * FROM read_json('customers.json')
""")

# 2. Sales analysis
sales_report = conn.execute("""
    SELECT 
        DATE_TRUNC('month', order_date) as month,
        p.category,
        COUNT(DISTINCT o.order_id) as order_count,
        SUM(o.quantity * p.price) as revenue,
        COUNT(DISTINCT o.customer_id) as unique_customers
    FROM orders o
    JOIN products p ON o.product_id = p.id
    WHERE order_date >= '2023-01-01'
    GROUP BY month, p.category
    ORDER BY month, revenue DESC
""").fetchdf()

# 3. Customer analysis
customer_analysis = conn.execute("""
    WITH customer_metrics AS (
        SELECT 
            customer_id,
            COUNT(DISTINCT order_id) as order_count,
            SUM(quantity * price) as total_spent,
            MIN(order_date) as first_order,
            MAX(order_date) as last_order
        FROM orders o
        JOIN products p ON o.product_id = p.id
        GROUP BY customer_id
    )
    SELECT 
        CASE 
            WHEN total_spent > 1000 THEN 'VIP'
            WHEN total_spent > 500 THEN 'Regular'
            ELSE 'Casual'
        END as customer_segment,
        COUNT(*) as customer_count,
        AVG(order_count) as avg_orders,
        AVG(total_spent) as avg_spent
    FROM customer_metrics
    GROUP BY customer_segment
    ORDER BY avg_spent DESC
""").fetchdf()

# 4. Persist the analysis results (using table names that differ from the
#    DataFrame variables to avoid ambiguity, and COPY with a subquery so the
#    DataFrame is picked up by DuckDB's replacement scan)
conn.execute("""
    CREATE TABLE monthly_sales_report AS
    SELECT * FROM sales_report
""")

conn.execute("""
    COPY (SELECT * FROM customer_analysis)
    TO 'customer_segments.csv' (HEADER)
""")

9.2 Log Analysis

import duckdb
from datetime import datetime, timedelta

conn = duckdb.connect('logs.db')

# Load the log data
conn.execute("""
    CREATE TABLE logs AS 
    SELECT * FROM read_parquet('server_logs/*.parquet')
""")

# Error analysis
errors = conn.execute("""
    WITH error_logs AS (
        SELECT 
            DATE_TRUNC('hour', timestamp) as hour,
            server_id,
            COUNT(*) as error_count,
            COUNT(DISTINCT user_id) as affected_users
        FROM logs
        WHERE level = 'ERROR'
            AND timestamp > CURRENT_TIMESTAMP - INTERVAL '24 hours'
        GROUP BY hour, server_id
    )
    SELECT 
        hour,
        SUM(error_count) as total_errors,
        AVG(error_count) as avg_errors_per_server,
        MAX(error_count) as max_errors_on_server
    FROM error_logs
    GROUP BY hour
    ORDER BY hour
""").fetchdf()

# Performance monitoring
performance = conn.execute("""
    SELECT 
        endpoint,
        method,
        COUNT(*) as request_count,
        AVG(response_time_ms) as avg_response_time,
        PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time_ms) as p95_response_time,
        SUM(CASE WHEN response_time_ms > 1000 THEN 1 ELSE 0 END) as slow_requests
    FROM logs
    WHERE timestamp > CURRENT_TIMESTAMP - INTERVAL '1 hour'
    GROUP BY endpoint, method
    HAVING COUNT(*) > 10
    ORDER BY avg_response_time DESC
""").fetchdf()

10. Best Practices

10.1 Performance

# 1. Batch inserts inside a transaction ('table' is a reserved word, so
#    my_table is used as a placeholder name)
conn.execute("BEGIN TRANSACTION")
for i in range(1000):
    conn.execute("INSERT INTO my_table VALUES (?)", (i,))
conn.execute("COMMIT")

# 2. Reuse parameterized queries (the Python API has no prepare() method;
#    passing parameters to execute() prepares the statement internally)
query = "SELECT * FROM users WHERE age > ? AND city = ?"
result1 = conn.execute(query, (25, 'London')).fetchdf()
result2 = conn.execute(query, (30, 'New York')).fetchdf()

# 3. Use indexes judiciously:
#    only index columns that appear in highly selective filters

# 4. Partition large tables:
#    partition by time or category (see section 6.2) to prune data at scan time

# 5. Control where temporary spill files are written
conn.execute("PRAGMA temp_directory='/tmp/duckdb'")

10.2 Memory Management

# Monitor memory use (the duckdb_memory() table function is available in
# recent DuckDB versions)
memory_info = conn.execute("SELECT * FROM duckdb_memory()").fetchdf()
print(memory_info)

# Set a sensible memory limit
conn.execute("SET memory_limit='2GB'")

# Allow spilling to disk
conn.execute("SET temp_directory='/tmp/duckdb_temp'")

# Note: DuckDB manages its buffer pool automatically; there is no explicit
# cache-clearing command

10.3 Error Handling

import duckdb

conn = duckdb.connect('database.db')
try:
    # Start an explicit transaction
    conn.execute("BEGIN TRANSACTION")

    # Run several statements atomically
    conn.execute("INSERT INTO table1 VALUES (1)")
    conn.execute("INSERT INTO table2 VALUES (2)")

    # Commit the transaction
    conn.execute("COMMIT")

except duckdb.Error as e:
    print(f"DuckDB error: {e}")
    # Roll back while the connection is still open
    conn.execute("ROLLBACK")

except Exception as e:
    print(f"General error: {e}")

finally:
    conn.close()

11. Frequently Asked Questions

Q1: How does DuckDB differ from SQLite?

DuckDB: columnar storage, optimized for OLAP, fast on analytical queries
SQLite: row-oriented storage, optimized for OLTP, fast on transactional workloads
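The difference shows up clearly on aggregation-heavy queries. Below is a minimal comparison sketch; the data shape and sizes are arbitrary illustrations, and actual timings depend entirely on your data and hardware:

import time
import duckdb
import sqlite3
import pandas as pd

# Illustrative data: one million rows spread over 100 groups
df = pd.DataFrame({'grp': range(1_000_000), 'val': range(1_000_000)})
df['grp'] = df['grp'] % 100

# DuckDB: columnar engine; the aggregation scans only the two needed columns
duck = duckdb.connect()
t0 = time.time()
duck.execute("SELECT grp, AVG(val) FROM df GROUP BY grp").fetchall()
print(f"DuckDB: {time.time() - t0:.3f}s")

# SQLite: row store; every row is materialized during the scan
lite = sqlite3.connect(':memory:')
df.to_sql('t', lite, index=False)
t0 = time.time()
lite.execute("SELECT grp, AVG(val) FROM t GROUP BY grp").fetchall()
print(f"SQLite: {time.time() - t0:.3f}s")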

Q2: What can I do when memory runs out?

# 1. Raise the memory limit
conn.execute("SET memory_limit='4GB'")

# 2. Allow spilling to disk
conn.execute("SET temp_directory='/tmp/duckdb'")

# 3. Stream results in Arrow batches instead of fetching everything at once
reader = conn.execute("QUERY").fetch_record_batch()
for batch in reader:
    process_chunk(batch)

Q3: How do I back up a database?

# Method 1: copy the database file (close all connections first)
import shutil
shutil.copy2('database.db', 'database_backup.db')

# Method 2: the EXPORT DATABASE command
conn.execute("EXPORT DATABASE 'backup_directory'")

# Method 3: back up individual tables with COPY (my_table is a placeholder)
conn.execute("COPY my_table TO 'table_backup.parquet' (FORMAT PARQUET)")

Summary

DuckDB is a powerful embedded analytical database, particularly well suited for:

- Data scientists: replacing pandas when working with large datasets
- Developers: embedding analytical capabilities inside applications
- Analysts: fast data exploration and prototyping

After working through this guide, you should be able to:

- Install DuckDB and connect to in-memory or persistent databases
- Import and export CSV, Parquet, and JSON data
- Write analytical SQL, from simple filters to window functions and CTEs
- Integrate DuckDB with pandas, Arrow, and machine-learning workflows
- Tune memory, threads, and temporary storage for performance

To go deeper, the official documentation at https://duckdb.org is the best next stop.

Now start your DuckDB data-analysis journey!
