上个版本的代码

deepseek大模型能连接mysql数据查询吗-CSDN博客

存在一些潜在缺陷,并且没有实现连续对话,本次予以升级

主要缺陷和改进建议:

  1. SQL注入风险

# 改进方法:增加安全校验函数
def is_safe_sql(sql: str) -> bool:
    forbidden_keywords = ['DROP', 'DELETE', 'UPDATE', 'INSERT', 'GRANT', 'TRUNCATE']
    return not any(keyword in sql.upper() for keyword in forbidden_keywords)

# 在执行前添加校验
if not is_safe_sql(generated_sql):
    print("检测到危险操作,已终止执行")
  1. 数据库连接管理

# 使用连接池改进
from mysql.connector import pooling

# 初始化连接池(放在全局区域)
db_pool = pooling.MySQLConnectionPool(
    pool_name="mypool",
    pool_size=5,
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    database=os.getenv("DB_NAME")
)

# 修改execute_sql使用连接池
def execute_sql(sql: str):
    try:
        conn = db_pool.get_connection()
        cursor = conn.cursor()
        cursor.execute(sql)
        return cursor.fetchall()
    finally:
        if 'conn' in locals() and conn.is_connected():
            cursor.close()
            conn.close()
  1. 连续对话实现

def main():
    conversation_history = [
        {"role": "system", "content": "你是一个专业的SQL生成助手,请严格遵守数据安全规范"}
    ]
    
    while True:
        user_input = input("\n请输入问题(输入'退出'结束): ")
        if user_input.lower() in ['退出', 'exit']:
            break
            
        # 添加用户输入到历史
        conversation_history.append({"role": "user", "content": user_input})
        
        # 获取AI回复
        response = client.chat.completions.create(
            model="gpt-4",
            messages=conversation_history,
            temperature=0.3
        )
        
        # 解析回复
        ai_response = response.choices.message.content
        print(f"\nAI回复: {ai_response}")
        
        # 添加AI回复到历史(限制历史长度)
        conversation_history.append({"role": "assistant", "content": ai_response})
        conversation_history = conversation_history[-6:]  # 保留最近3轮对话

        # 执行确认机制
        if "SELECT" in ai_response.upper():
            execute = input("是否执行该SQL?(y/n) ").lower()
            if execute == 'y':
                result = execute_sql(ai_response)
                print("查询结果:", result)
  1. 增强错误处理

def execute_sql(sql: str):
    try:
        # ...原有连接代码...
    except mysql.connector.Error as err:
        logger.error(f"SQL执行失败: {sql} | 错误:{err}")  # 添加日志记录
        return f"执行错误:请检查查询语句有效性(错误代码:{err.errno})"
    except Exception as e:
        logger.error(f"未知错误: {e}")
        return "系统内部错误,请联系管理员"
  1. 环境变量验证

# 在程序启动时添加检查
required_env_vars = ['DB_HOST', 'DB_USER', 'DB_PASSWORD', 'DB_NAME', 'OPENAI_API_KEY']
missing_vars = [var for var in required_env_vars if not os.getenv(var)]
if missing_vars:
    raise EnvironmentError(f"缺少必要环境变量: {', '.join(missing_vars)}")

完整改进后的调用流程示例:

  1. 用户输入:"显示最近一周下单最多的用户"
  2. AI生成:"SELECT user_id, COUNT() FROM orders WHERE date >= CURDATE() - INTERVAL 7 DAY GROUP BY user_id ORDER BY COUNT() DESC LIMIT 1"
  3. 程序提示是否执行
  4. 用户确认后返回查询结果
  5. 用户继续输入:"用中文显示客户姓名而不是ID"
  6. AI生成新SQL(包含JOIN操作):"SELECT u.name, COUNT(*) ..."

其他改进建议:

  • 添加SQL格式化输出(使用prettytable库)
  • 实现查询结果缓存机制
  • 添加查询耗时统计
  • 对敏感字段(如email)添加自动脱敏处理
  • 支持多轮修改(如用户说"把时间范围改成最近一个月")

这些改进可以在保证安全性的同时,提供更友好的交互体验,同时保持对话上下文的相关性。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐