soarli

记录一次迁移MySQL大体积数据表的实践
近期学习实践数据库的笔记,文章内容和代码均系原创(部分知识概述型文本由soarchat 1.4和gpt4提供成文支...
扫描右侧二维码阅读全文
29
2023/08

记录一次迁移MySQL大体积数据表的实践

近期学习实践数据库的笔记,文章内容和代码均系原创(部分知识概述型文本由soarchat 1.4gpt4提供成文支持),转载请注明来源!

前言

近期因某线上场景特殊需要,得对一张6.19GBMySQL数据表进行数据库环境的迁移,首先考虑到的方法当然是通过手头Navicat自带的导入/导出/转储功能来快速实现。但是经过半天的尝试,不管通过什么样的文件结构作为媒介,在导入的过程中总会发生一些错误问题(最好的一次还丢失了300多条数据)。在按照这种思路折腾的过程中,还发生了几次电脑的内存溢出而不得不重启OwO...

于是尝试在网上搜寻关于这种问题的解决方案,谁料网上也是一片哀鸿遍野。个别能解决的,要么是让把文件导入到Linux系统中,通过Linux独有的split命令先分割再进行操作完后再取出,亦或是修改MySQL的类似于max_allowed_packet的各种配置(亲测无效),再或者就是换了其他的数据库管理工具(mysqldump之类的)。

见到此状,想到了一句名言“自己动手,丰衣足食”,说干就干,在充分考虑读取大文件对内存的需求的基础上,对程序进行了严格的内存优化。最终的结果是一次执行通过,零错误零丢失,同时也没有发生内存泄漏!兴奋之余决定把总体的设计思路和程序开源到这篇文章里面,供后续有需求的人进行参考或修改/优化。如有大佬有更好的思路或优化建议,还望不吝赐教!!

思路

导出数据正常走Navicat的转储流程没啥问题,得到一个体积超大的sql文件之后,如何把它执行到一个新的数据库里是接下来讨论的重点问题。

STEP1:拆分

splitsql.py

本程序的主要目标是高效地将大型SQL文件拆分成更小的文件。核心原理在于使用带缓冲区的文件读写操作,通过计数器和指定的行数限制将源文件逐行划分为多个小文件。

关键步骤:

1.逐行读取输入文件

2.将每一行内容添加到一个临时缓存列表中,直到达到预先设定的行数限制

3.新建一个带有编号的输出文件,将缓存的数据写入该文件。重复这个过程,直到遍历完整个输入文件。

4.程序最后会判断是否还有剩余未处理的数据,若有,则将剩余数据写入一个新的输出文件。

意义:

提供了一种简洁高效的方法来处理大型SQL文件,尤其在数据库结构复杂、资源有限的场景下格外实用。此外,通过自定义行数限制和文件名前缀,用户可以根据实际需求对程序进行定制。总之,这是一个有助于简化大型数据库的维护和管理的工具。

input_file_path = "table_testdata.sql"
num_lines_per_file = 500
output_file_prefix = ""

def split_sql_file():
    with open(input_file_path, "r", encoding="utf-8", errors="ignore", buffering=8192) as input_file:
        file_count = 1
        line_counter = 0
        data = []

        for line in input_file:
            if line_counter < num_lines_per_file:
                data.append(line)
                line_counter += 1
            else:
                with open(f"{output_file_prefix}{file_count}.sql", "w", encoding="utf-8", buffering=8192) as output_file:
                    output_file.writelines(data)
                file_count += 1
                line_counter = 1
                data = [line]

        if data:
            with open(f"{output_file_prefix}{file_count}.sql", "w", encoding="utf-8", buffering=8192) as output_file:
                output_file.writelines(data)

if __name__ == '__main__':
    split_sql_file()

STEP2:重组

importsql.py

程序主要功能是自动执行指定目录下的所有SQL文件。

程序的核心原理在于通过连接到MySQL数据库,然后顺序执行目录中按文件名排序的SQL文件。

程序首先通过配置信息创建一个数据库连接,并处理可能出现的错误。之后,它会遍历目录下所有以“.sql”为扩展名的文件,将其内容逐个加载并划分为SQL语句。为了优化批量执行效率,它将SQL语句分组成批次,并在执行过程中定时检查数据库连接是否仍然可用。

该程序实现了以下功能:

通过设置可配置参数及错误处理,确保连接及执行过程更为可靠;

在程序每次执行之间设置延时,从而减轻数据库压力;

以另外提供的log.txt文件形式记录错误和执行情况,方便追踪。

这款程序可以节省数据库管理过程中的时间与手动执行脚本的工作量,提高工作效率,尤其适用于需要定期执行大量SQL脚本的场景

import os
import mysql.connector
import logging
import time
from mysql.connector import errorcode

STATEMENTS_PER_BATCH = 1000
TIME_BETWEEN_BATCHES = 0

logging.basicConfig(filename='log.txt', level=logging.INFO, filemode='w', format='%(asctime)s - %(message)s')

def connect_to_database():
    config = {
        'user': 'test',
        'password': 'test',
        'host': 'localhost',
        'database': 'test',
        'connection_timeout': 86400,
    }

    try:
        cnx = mysql.connector.connect(**config)
        return cnx
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            logging.error("有错误发生:用户名或密码错误")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            logging.error("有错误发生:数据库不存在")
        else:
            logging.error(err)
        return None

def execute_sql_files_in_directory(cnx):
    cursor = cnx.cursor()
    sql_files = [f for f in os.listdir('.') if f.endswith('.sql')]

    # 定义一个函数,用于从文件名中获取数字,并返回一个整数
    def get_file_number(file_name):
        import re
        match = re.search(r'\d+', file_name) # 查找文件名中的数字
        if match:
            return int(match.group()) # 将数字转换为整数并返回
        else:
            return 0 # 如果没有找到数字,返回 0

    # 使用 sorted 函数对 sql_files 列表进行排序,并传入 get_file_number 函数作为关键字参数
    sql_files = sorted(sql_files, key=get_file_number)

    for sql_file in sql_files:
        logging.info(f"正在执行 {sql_file} ...")
        with open(sql_file, 'r', encoding='utf-8') as file:
            sql_script = file.read()

        sql_statements = sql_script.split(';')
        statement_batches = [sql_statements[i:i + STATEMENTS_PER_BATCH] for i in range(0, len(sql_statements), STATEMENTS_PER_BATCH)]

        # 在执行每个 SQL 文件之前检查连接是否存活,如果不存活则重新连接
        try:
            cnx.ping(reconnect=True)
            logging.info(f"连接正常")
        except mysql.connector.Error as err:
            logging.error(f"连接失败:{err}")
            break

        for batch_idx, batch in enumerate(statement_batches):
            for idx, statement in enumerate(batch, start=batch_idx * STATEMENTS_PER_BATCH + 1):
                if statement.strip():
                    try:
                        cursor.execute(statement)
                        cnx.commit()
                        logging.info(f"{sql_file}(语句 {idx})执行成功!")
                    except mysql.connector.Error as err:
                        logging.error(f"执行 {sql_file}(语句 {idx})时出错:{err}")
                        break
                    except Exception as e:
                        logging.error(f"执行 {sql_file}(语句 {idx})时出现未知错误:{str(e)}")
                        break
            time.sleep(TIME_BETWEEN_BATCHES)

    cursor.close()

def main():
    cnx = None
    # 添加一个 while 循环,用于在连接失败时重试连接,直到成功或达到最大重试次数为止
    max_retries = 5
    retries = 0
    while cnx is None and retries < max_retries:
        cnx = connect_to_database()
        if cnx is None:
            logging.info(f"连接失败,正在重试({retries + 1}/{max_retries})...")
            retries += 1
            time.sleep(5)
        else:
            logging.info(f"连接成功")
            break
    if cnx is not None:
        execute_sql_files_in_directory(cnx)
        cnx.close()
    else:
        logging.error(f"无法连接到数据库")

if __name__ == '__main__':
    main()

参考资料:

MySQL mysqldump数据导出详解 - pursuer.chen - 博客园

MySQL快速导入千万级别的大数据量sql文件 - 知乎

MySql导入和抽取大数量级文件数据 - 知乎

最后修改:2023 年 08 月 29 日 02 : 22 AM

发表评论