LOGO OA教程 ERP教程 模切知识交流 PMS教程 CRM教程 开发文档 其他文档  
 
网站管理员

Python3 利用 PyMySQL 驱动操作数据库全面学习教程

admin
2025年12月10日 0:45 本文热度 10

大家好,今天我们来聊聊Python操作MySQL数据库这个话题。别看这玩意儿简单,但新手朋友经常在这上面摔跟头。

今天我就手把手教你,从零开始学习使用PyMySQL操作MySQL数据库,让你一次掌握,从此告别"Python连接数据库总出错"的尴尬!

为什么选择PyMySQL?

在开始学习之前,先聊聊为什么我们要选择PyMySQL:

  1. 纯Python实现:完全用Python编写,不需要额外的C库依赖
  2. 简单易用:API设计直观,学习成本低
  3. 兼容性好:支持Python 3.6+和MySQL 5.6+
  4. 社区活跃:GitHub上star众多,文档完善
  5. 功能完整:支持所有MySQL特性,包括SSL连接

安装PyMySQL

环境要求

在安装之前,确保你已经:

  • 安装了Python 3.6或更高版本
  • 安装了MySQL服务器并正常运行

安装步骤

# 使用pip安装PyMySQL
pip install PyMySQL

# 验证安装是否成功
python -c "import pymysql; print(pymysql.__version__)"

如果看到版本号输出,说明安装成功。

建立数据库连接

基本连接方式

import pymysql

# 建立数据库连接
connection = pymysql.connect(
    host='localhost',        # 数据库主机地址
    port=3306,              # 数据库端口
    user='root',            # 用户名
    password='your_password'# 密码
    database='test_db',     # 数据库名
    charset='utf8mb4'       # 字符集
)

# 关闭连接
connection.close()

连接参数详解

import pymysql

# 完整的连接参数配置
connection = pymysql.connect(
    host='localhost',           # 主机地址
    port=3306,                 # 端口号
    user='your_username',      # 用户名
    password='your_password',  # 密码
    database='your_database',  # 数据库名
    charset='utf8mb4',         # 字符集
    autocommit=False,          # 是否自动提交
    connect_timeout=10,        # 连接超时时间
    read_timeout=10,           # 读取超时时间
    write_timeout=10,          # 写入超时时间
    cursorclass=pymysql.cursors.DictCursor  # 游标类型
)

使用上下文管理器

推荐使用with语句管理连接,确保连接正确关闭:

import pymysql

# 使用with语句自动管理连接
with pymysql.connect(
    host='localhost',
    user='root',
    password='your_password',
    database='test_db',
    charset='utf8mb4'
as connection:
    # 在这里执行数据库操作
    pass
# 连接会自动关闭

创建游标对象

游标对象用于执行SQL语句和获取结果:

import pymysql

# 建立连接
connection = pymysql.connect(
    host='localhost',
    user='root',
    password='your_password',
    database='test_db',
    charset='utf8mb4'
)

# 创建游标对象
cursor = connection.cursor()

# 执行SQL语句
cursor.execute("SELECT VERSION()")

# 获取结果
result = cursor.fetchone()
print(f"MySQL版本: {result[0]}")

# 关闭游标和连接
cursor.close()
connection.close()

不同类型的游标

import pymysql.cursors

# 默认游标 - 返回元组
with pymysql.connect(...) as connection:
    cursor = connection.cursor()
    cursor.execute("SELECT * FROM users")
    result = cursor.fetchall()  # ((1, 'Alice', 25), (2, 'Bob', 30))

# 字典游标 - 返回字典
with pymysql.connect(
    cursorclass=pymysql.cursors.DictCursor
as connection:
    cursor = connection.cursor()
    cursor.execute("SELECT * FROM users")
    result = cursor.fetchall()  # [{'id': 1, 'name': 'Alice', 'age': 25}, ...]

# 字典游标的优势是可以通过键名访问字段
print(result[0]['name'])  # 输出: Alice

数据库基本操作

创建数据库和表

import pymysql

# 连接MySQL服务器(不指定数据库)
connection = pymysql.connect(
    host='localhost',
    user='root',
    password='your_password',
    charset='utf8mb4'
)

try:
    with connection.cursor() as cursor:
        # 创建数据库
        cursor.execute("CREATE DATABASE IF NOT EXISTS myapp")
        
        # 选择数据库
        cursor.execute("USE myapp")
        
        # 创建用户表
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS users (
            id INT AUTO_INCREMENT PRIMARY KEY,
            username VARCHAR(50) NOT NULL UNIQUE,
            email VARCHAR(100) NOT NULL UNIQUE,
            age INT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """

        cursor.execute(create_table_sql)
        
    # 提交事务
    connection.commit()
    
finally:
    connection.close()

插入数据

import pymysql
from datetime import datetime

# 插入单条数据
def insert_user(username, email, age):
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='myapp',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            # 使用参数化查询防止SQL注入
            sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
            cursor.execute(sql, (username, email, age))
            
        # 提交事务
        connection.commit()
        print("用户插入成功")
        
    except Exception as e:
        # 回滚事务
        connection.rollback()
        print(f"插入失败: {e}")
        
    finally:
        connection.close()

# 调用函数
insert_user("张三""zhangsan@example.com"25)

批量插入数据

import pymysql

# 批量插入数据
def batch_insert_users(users_data):
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='myapp',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
            # executemany用于批量插入
            cursor.executemany(sql, users_data)
            
        connection.commit()
        print(f"成功插入{len(users_data)}条用户数据")
        
    except Exception as e:
        connection.rollback()
        print(f"批量插入失败: {e}")
        
    finally:
        connection.close()

# 准备批量数据
users = [
    ("李四""lisi@example.com"28),
    ("王五""wangwu@example.com"32),
    ("赵六""zhaoliu@example.com"29)
]

# 执行批量插入
batch_insert_users(users)

查询数据

import pymysql

# 查询单条数据
def get_user_by_id(user_id):
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='myapp',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    
    try:
        with connection.cursor() as cursor:
            sql = "SELECT * FROM users WHERE id = %s"
            cursor.execute(sql, (user_id,))
            result = cursor.fetchone()
            return result
            
    finally:
        connection.close()

# 查询多条数据
def get_users_by_age(min_age):
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='myapp',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    
    try:
        with connection.cursor() as cursor:
            sql = "SELECT * FROM users WHERE age >= %s ORDER BY age"
            cursor.execute(sql, (min_age,))
            results = cursor.fetchall()
            return results
            
    finally:
        connection.close()

# 使用示例
user = get_user_by_id(1)
if user:
    print(f"用户信息: {user}")

users = get_users_by_age(25)
print("年龄大于等于25的用户:")
for user in users:
    print(f"  {user['username']}{user['age']}岁")

更新数据

import pymysql

# 更新用户信息
def update_user(user_id, username=None, email=None, age=None):
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='myapp',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            # 动态构建更新语句
            updates = []
            params = []
            
            if username:
                updates.append("username = %s")
                params.append(username)
            if email:
                updates.append("email = %s")
                params.append(email)
            if age is not None:
                updates.append("age = %s")
                params.append(age)
                
            if not updates:
                print("没有需要更新的字段")
                return
                
            # 添加WHERE条件参数
            params.append(user_id)
            
            sql = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"
            cursor.execute(sql, params)
            
        connection.commit()
        print("用户信息更新成功")
        
    except Exception as e:
        connection.rollback()
        print(f"更新失败: {e}")
        
    finally:
        connection.close()

# 使用示例
update_user(1, age=26, email="newemail@example.com")

删除数据

import pymysql

# 删除用户
def delete_user(user_id):
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='myapp',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            sql = "DELETE FROM users WHERE id = %s"
            cursor.execute(sql, (user_id,))
            
        connection.commit()
        print("用户删除成功")
        
    except Exception as e:
        connection.rollback()
        print(f"删除失败: {e}")
        
    finally:
        connection.close()

# 使用示例
delete_user(1)

事务处理

基本事务操作

import pymysql

# 事务示例:转账操作
def transfer_money(from_user_id, to_user_id, amount):
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='myapp',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            # 开始事务
            connection.begin()
            
            # 检查转出用户余额
            cursor.execute("SELECT balance FROM accounts WHERE user_id = %s", (from_user_id,))
            from_balance = cursor.fetchone()
            if not from_balance or from_balance[0] < amount:
                raise Exception("余额不足")
                
            # 扣除转出用户余额
            cursor.execute("UPDATE accounts SET balance = balance - %s WHERE user_id = %s"
                          (amount, from_user_id))
                          
            # 增加转入用户余额
            cursor.execute("UPDATE accounts SET balance = balance + %s WHERE user_id = %s"
                          (amount, to_user_id))
            
            # 记录转账日志
            cursor.execute("""
                INSERT INTO transfer_logs (from_user_id, to_user_id, amount, transfer_time) 
                VALUES (%s, %s, %s, NOW())
            """
, (from_user_id, to_user_id, amount))
            
            # 提交事务
            connection.commit()
            print("转账成功")
            
    except Exception as e:
        # 回滚事务
        connection.rollback()
        print(f"转账失败: {e}")
        
    finally:
        connection.close()

# 使用示例
transfer_money(12100.00)

高级查询技巧

分页查询

import pymysql

# 分页查询用户列表
def get_users_paginated(page, page_size):
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='myapp',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    
    try:
        with connection.cursor() as cursor:
            offset = (page - 1) * page_size
            sql = "SELECT * FROM users LIMIT %s OFFSET %s"
            cursor.execute(sql, (page_size, offset))
            users = cursor.fetchall()
            
            # 获取总记录数
            cursor.execute("SELECT COUNT(*) as total FROM users")
            total = cursor.fetchone()['total']
            
            return {
                'users': users,
                'total': total,
                'page': page,
                'page_size': page_size,
                'total_pages': (total + page_size - 1) // page_size
            }
            
    finally:
        connection.close()

# 使用示例
result = get_users_paginated(110)
print(f"第{result['page']}页,共{result['total_pages']}页")
for user in result['users']:
    print(f"  {user['username']}")

条件查询

import pymysql

# 复杂条件查询
def search_users(username=None, min_age=None, max_age=None, email_domain=None):
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='myapp',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    
    try:
        with connection.cursor() as cursor:
            # 动态构建查询条件
            conditions = []
            params = []
            
            if username:
                conditions.append("username LIKE %s")
                params.append(f"%{username}%")
                
            if min_age is not None:
                conditions.append("age >= %s")
                params.append(min_age)
                
            if max_age is not None:
                conditions.append("age <= %s")
                params.append(max_age)
                
            if email_domain:
                conditions.append("email LIKE %s")
                params.append(f"%@{email_domain}")
                
            # 构建SQL语句
            if conditions:
                sql = f"SELECT * FROM users WHERE {' AND '.join(conditions)} ORDER BY created_at DESC"
            else:
                sql = "SELECT * FROM users ORDER BY created_at DESC"
                
            cursor.execute(sql, params)
            results = cursor.fetchall()
            return results
            
    finally:
        connection.close()

# 使用示例
users = search_users(username="张", min_age=20, max_age=30)
print("搜索结果:")
for user in users:
    print(f"  {user['username']}{user['age']}岁, {user['email']}")

错误处理和最佳实践

异常处理

import pymysql
from pymysql import IntegrityError, OperationalError

# 完整的错误处理示例
def safe_database_operation():
    connection = None
    try:
        # 建立连接
        connection = pymysql.connect(
            host='localhost',
            user='root',
            password='your_password',
            database='myapp',
            charset='utf8mb4',
            autocommit=False,
            connect_timeout=5
        )
        
        with connection.cursor() as cursor:
            # 执行数据库操作
            sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
            cursor.execute(sql, ("test_user""test@example.com"25))
            
            # 提交事务
            connection.commit()
            print("操作成功")
            
    except IntegrityError as e:
        # 处理数据完整性错误(如唯一约束冲突)
        if connection:
            connection.rollback()
        print(f"数据完整性错误: {e}")
        
    except OperationalError as e:
        # 处理操作错误(如连接失败)
        print(f"数据库操作错误: {e}")
        
    except Exception as e:
        # 处理其他异常
        if connection:
            connection.rollback()
        print(f"未知错误: {e}")
        
    finally:
        # 确保连接关闭
        if connection:
            connection.close()

# 调用函数
safe_database_operation()

连接池使用

import pymysql
from dbutils.pooled_db import PooledDB

# 创建连接池
pool = PooledDB(
    creator=pymysql,
    host='localhost',
    port=3306,
    user='root',
    password='your_password',
    database='myapp',
    charset='utf8mb4',
    maxconnections=20,  # 最大连接数
    mincached=2,        # 最小缓存连接数
    maxcached=5,        # 最大缓存连接数
    maxshared=3,        # 最大共享连接数
    blocking=True,      # 连接池满时是否阻塞等待
    maxusage=None,      # 单个连接最大复用次数
    setsession=[],      # 开始会话前执行的命令列表
    ping=0              # ping MySQL服务端
)

# 使用连接池
def get_user_with_pool(user_id):
    # 从连接池获取连接
    connection = pool.connection()
    
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
            result = cursor.fetchone()
            return result
            
    finally:
        # 关闭连接(归还到连接池)
        connection.close()

# 使用示例
user = get_user_with_pool(1)
if user:
    print(f"用户信息: {user}")

安全最佳实践

防止SQL注入

import pymysql

# 正确的做法:使用参数化查询
def safe_query(username):
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='myapp',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            # 使用参数化查询,防止SQL注入
            sql = "SELECT * FROM users WHERE username = %s"
            cursor.execute(sql, (username,))
            result = cursor.fetchall()
            return result
            
    finally:
        connection.close()

# 错误的做法:字符串拼接(容易被SQL注入)
def unsafe_query(username):
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='myapp',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            # 危险!容易被SQL注入攻击
            sql = f"SELECT * FROM users WHERE username = '{username}'"
            cursor.execute(sql)  # 不要这样做!
            result = cursor.fetchall()
            return result
            
    finally:
        connection.close()

# 安全查询示例
users = safe_query("admin'; DROP TABLE users; --")
print(users)  # 只会查找用户名为 "admin'; DROP TABLE users; --" 的用户

密码安全管理

import pymysql
import os
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

# 从环境变量获取数据库配置
def get_db_config():
    return {
        'host': os.getenv('DB_HOST''localhost'),
        'port': int(os.getenv('DB_PORT'3306)),
        'user': os.getenv('DB_USER''root'),
        'password': os.getenv('DB_PASSWORD'),
        'database': os.getenv('DB_NAME''myapp'),
        'charset''utf8mb4'
    }

# 使用环境变量连接数据库
def connect_with_env():
    config = get_db_config()
    connection = pymysql.connect(**config)
    return connection

# 创建.env文件示例:
"""
DB_HOST=localhost
DB_PORT=3306
DB_USER=myuser
DB_PASSWORD=mypassword
DB_NAME=myapp
"""

性能优化技巧

批量操作优化

import pymysql

# 高效的批量插入
def efficient_batch_insert(data_list):
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='myapp',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            # 使用事务和批量插入提高性能
            connection.begin()
            
            sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
            cursor.executemany(sql, data_list)
            
            connection.commit()
            print(f"批量插入{len(data_list)}条记录成功")
            
    except Exception as e:
        connection.rollback()
        print(f"批量插入失败: {e}")
        
    finally:
        connection.close()

# 准备大量数据进行测试
large_data = [
    (f"user_{i}"f"user_{i}@example.com"20 + (i % 50))
    for i in range(1000)
]

# 执行批量插入
efficient_batch_insert(large_data)

查询优化

import pymysql

# 使用索引优化查询
def optimized_query():
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='myapp',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    
    try:
        with connection.cursor() as cursor:
            # 为经常查询的字段创建索引
            cursor.execute("CREATE INDEX idx_username ON users(username)")
            cursor.execute("CREATE INDEX idx_email ON users(email)")
            cursor.execute("CREATE INDEX idx_age ON users(age)")
            
            # 使用EXPLAIN分析查询性能
            cursor.execute("EXPLAIN SELECT * FROM users WHERE username = 'admin'")
            explain_result = cursor.fetchall()
            print("查询执行计划:")
            for row in explain_result:
                print(row)
                
    finally:
        connection.close()

结语

到这里,PyMySQL操作数据库的全面学习就完成了!从安装配置、基本操作到高级技巧,每一步都详细讲解了。

记住几个关键点:

  1. 安全第一:始终使用参数化查询防止SQL注入
  2. 连接管理:合理使用连接池提高性能
  3. 事务处理:正确使用事务保证数据一致性
  4. 错误处理:完善的异常处理机制
  5. 性能优化:批量操作和索引优化

PyMySQL是Python操作MySQL数据库的强大工具,掌握它对于后端开发人员来说至关重要。


阅读原文:原文链接


该文章在 2025/12/10 18:44:33 编辑过
关键字查询
相关文章
正在查询...
点晴ERP是一款针对中小制造业的专业生产管理软件系统,系统成熟度和易用性得到了国内大量中小企业的青睐。
点晴PMS码头管理系统主要针对港口码头集装箱与散货日常运作、调度、堆场、车队、财务费用、相关报表等业务管理,结合码头的业务特点,围绕调度、堆场作业而开发的。集技术的先进性、管理的有效性于一体,是物流码头及其他港口类企业的高效ERP管理信息系统。
点晴WMS仓储管理系统提供了货物产品管理,销售管理,采购管理,仓储管理,仓库管理,保质期管理,货位管理,库位管理,生产管理,WMS管理系统,标签打印,条形码,二维码管理,批号管理软件。
点晴免费OA是一款软件和通用服务都免费,不限功能、不限时间、不限用户的免费OA协同办公管理系统。
Copyright 2010-2025 ClickSun All Rights Reserved