CN118733673A - A database synchronization method and terminal - Google Patents
A database synchronization method and terminal Download PDFInfo
- Publication number
- CN118733673A CN118733673A CN202410774038.8A CN202410774038A CN118733673A CN 118733673 A CN118733673 A CN 118733673A CN 202410774038 A CN202410774038 A CN 202410774038A CN 118733673 A CN118733673 A CN 118733673A
- Authority
- CN
- China
- Prior art keywords
- data
- database
- incremental
- snapshot
- historical
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Pending
Links
Classifications
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/27—Replication, distribution or synchronisation of data between databases or within a distributed database system; Distributed database system architectures therefor
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/10—File systems; File servers
- G06F16/11—File system administration, e.g. details of archiving or snapshots
- G06F16/128—Details of file system snapshots on the file-level, e.g. snapshot creation, administration, deletion
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/10—File systems; File servers
- G06F16/17—Details of further file system functions
- G06F16/178—Techniques for file synchronisation in file systems
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/10—File systems; File servers
- G06F16/18—File system types
- G06F16/1805—Append-only file systems, e.g. using logs or journals to store data
- G06F16/1815—Journaling file systems
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/23—Updating
- G06F16/2358—Change logging, detection, and notification
-
- Y—GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS
- Y02—TECHNOLOGIES OR APPLICATIONS FOR MITIGATION OR ADAPTATION AGAINST CLIMATE CHANGE
- Y02D—CLIMATE CHANGE MITIGATION TECHNOLOGIES IN INFORMATION AND COMMUNICATION TECHNOLOGIES [ICT], I.E. INFORMATION AND COMMUNICATION TECHNOLOGIES AIMING AT THE REDUCTION OF THEIR OWN ENERGY USE
- Y02D10/00—Energy efficient computing, e.g. low power processors, power management or thermal management
Landscapes
- Engineering & Computer Science (AREA)
- Theoretical Computer Science (AREA)
- Databases & Information Systems (AREA)
- Data Mining & Analysis (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Computing Systems (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
本发明公开了一种数据库同步方法及终端,监听原数据库的日志文件,判断是否需要获取历史数据,若是,则将原数据库中的历史数据提取为历史数据快照,通过历史数据快照同步历史数据至目标数据库后,执行获取增量最新点位信息步骤;否则,获取原数据库中的增量最新点位信息以及存储的历史点位信息,根据历史点位信息以及增量最新点位信息之间的差获取增量日志数据,根据增量日志数据更新目标数据库。本发明根据日志中的增量日志数据对目标数据库进行更新,日志中记录的是操作数据,减小了数据的传输量并且通过监听日志文件的方式判断是否需要进行更新操作,保证目标数据库更新的实时性。
The present invention discloses a database synchronization method and terminal, which monitors the log file of the original database, determines whether it is necessary to obtain historical data, and if so, extracts the historical data in the original database as a historical data snapshot, synchronizes the historical data to the target database through the historical data snapshot, and then executes the step of obtaining the latest incremental point information; otherwise, the latest incremental point information and the stored historical point information in the original database are obtained, and the incremental log data is obtained according to the difference between the historical point information and the latest incremental point information, and the target database is updated according to the incremental log data. The present invention updates the target database according to the incremental log data in the log, and the log records the operation data, which reduces the data transmission amount and determines whether the update operation needs to be performed by monitoring the log file, thereby ensuring the real-time update of the target database.
Description
技术领域Technical Field
本发明涉及数据库管理领域,尤其涉及一种数据库同步方法及终端。The present invention relates to the field of database management, and in particular to a database synchronization method and a terminal.
背景技术Background Art
传统数据库(例如mysql、oracle、sqlserver、postgre、mongodb、oceanBase等)通常采用ACID(原子性atomicity、一致性consistency、隔离性isolation、持久性durability)事务模型,保证事务的完整性和一致性,具有可靠性高的特点,且支持多种数据模型和查询语言,具有高灵活性,因此被广泛应用于各业务系统的后台数据库。但是同样的,传统数据库在处理大规模数据时,性能和扩展性都受到限制,难以满足大数据处理和分析的需求,且存在单点故障。应对如今数字化时代,数据库的数据量越来越大,有的传统数据库查询耗时较长乃至超时,且数据库的并发性和稳定性都下降,影响到生产业务的使用。故在生产业务要求变化的过程中,需要将传统数据库中的数据迁移到新型数据库中,但因二者的特性不同,现有的数据迁移方式无法满足需求。Traditional databases (such as MySQL, Oracle, SQL Server, Postgre, MongoDB, OceanBase, etc.) usually adopt the ACID (atomicity, consistency, isolation, durability) transaction model to ensure the integrity and consistency of transactions. They are highly reliable and support multiple data models and query languages. They are highly flexible and are therefore widely used as backend databases for various business systems. However, when processing large-scale data, traditional databases are limited in performance and scalability, making it difficult to meet the needs of big data processing and analysis, and have single points of failure. In response to the digital age, the amount of data in databases is increasing. Some traditional database queries take a long time or even time out, and the concurrency and stability of the database are reduced, affecting the use of production services. Therefore, in the process of changing production business requirements, it is necessary to migrate data from traditional databases to new databases. However, due to the different characteristics of the two, the existing data migration method cannot meet the needs.
发明内容Summary of the invention
本发明所要解决的技术问题是:提供一种数据库同步方法及终端,实现不同类型数据库之间的数据同步。The technical problem to be solved by the present invention is to provide a database synchronization method and a terminal to achieve data synchronization between different types of databases.
为了解决上述技术问题,本发明采用的一种技术方案为:In order to solve the above technical problems, a technical solution adopted by the present invention is:
一种数据库同步方法,包括步骤:A database synchronization method comprises the steps of:
监听原数据库的日志文件,判断是否需要获取历史数据,若是,则将所述原数据库中的历史数据提取为历史数据快照,通过所述历史数据快照同步所述历史数据至目标数据库后,执行获取增量最新点位信息步骤;Monitor the log file of the original database to determine whether historical data needs to be obtained. If so, extract the historical data in the original database as a historical data snapshot, synchronize the historical data to the target database through the historical data snapshot, and then execute the step of obtaining the latest incremental point information;
否则,获取原数据库中的增量最新点位信息以及存储的历史点位信息,根据所述历史点位信息以及所述增量最新点位信息之间的差获取增量日志数据,根据所述增量日志数据更新目标数据库。Otherwise, the incremental latest point information and the stored historical point information in the original database are obtained, the incremental log data is obtained according to the difference between the historical point information and the incremental latest point information, and the target database is updated according to the incremental log data.
为了解决上述技术问题,本发明采用的另一种技术方案为:In order to solve the above technical problems, another technical solution adopted by the present invention is:
一种数据库同步终端,包括存储器、处理器及存储在所述存储器上并可在所述处理器上运行的计算机程序,所述处理器执行所述计算机程序时实现以下步骤:A database synchronization terminal comprises a memory, a processor, and a computer program stored in the memory and executable on the processor, wherein the processor implements the following steps when executing the computer program:
监听原数据库的日志文件,判断是否需要获取历史数据,若是,则将所述原数据库中的历史数据提取为历史数据快照,通过所述历史数据快照同步所述历史数据至目标数据库后,执行获取增量最新点位信息步骤;Monitor the log file of the original database to determine whether historical data needs to be obtained. If so, extract the historical data in the original database as a historical data snapshot, synchronize the historical data to the target database through the historical data snapshot, and then execute the step of obtaining the latest incremental point information;
否则,获取原数据库中的增量最新点位信息以及存储的历史点位信息,根据所述历史点位信息以及所述增量最新点位信息之间的差获取增量日志数据,根据所述增量日志数据更新目标数据库。Otherwise, the incremental latest point information and the stored historical point information in the original database are obtained, the incremental log data is obtained according to the difference between the historical point information and the incremental latest point information, and the target database is updated according to the incremental log data.
本发明的有益效果在于:在进行数据同步的过程中,若需要获取历史数据,则将历史数据提取为历史数据快照,对快照进行同步,降低了对原数据库的访问压力,并且在同步完历史数据之后,自动切换到同步增量数据,则能够在数据迁移后继续使用原数据库自动同步到新设置的目标数据库,满足读写分离的需求,保证数据的安全性,即在同步完成历史数据的情况下,根据日志中的增量日志数据对目标数据库进行更新,日志中记录的是操作数据,减小了数据的传输量并且通过监听日志文件的方式判断是否需要进行更新操作,保证目标数据库更新的实时性。The beneficial effects of the present invention are as follows: in the process of data synchronization, if historical data needs to be obtained, the historical data is extracted as a historical data snapshot, and the snapshot is synchronized, thereby reducing the access pressure to the original database, and after the historical data is synchronized, it is automatically switched to synchronize incremental data, so that the original database can continue to be used after data migration and automatically synchronized to the newly set target database, meeting the requirements of read-write separation and ensuring data security, that is, when the historical data is synchronized, the target database is updated according to the incremental log data in the log, and the log records the operation data, thereby reducing the amount of data transmission and judging whether an update operation is needed by monitoring the log file, thereby ensuring the real-time update of the target database.
附图说明BRIEF DESCRIPTION OF THE DRAWINGS
图1为本发明实施例的一种数据同步方法的步骤流程图;FIG1 is a flowchart of a data synchronization method according to an embodiment of the present invention;
图2为本发明实施例的一种数据同步方法的另一步骤流程图;FIG2 is a flowchart of another step of a data synchronization method according to an embodiment of the present invention;
图3为本发明实施例的历史数据和增量数据同步示意图;FIG3 is a schematic diagram of synchronization of historical data and incremental data according to an embodiment of the present invention;
图4为本发明实施例的分片流程图;FIG4 is a fragmentation flow chart of an embodiment of the present invention;
图5为本发明实施例序列化后的一条日志示意图;FIG5 is a schematic diagram of a serialized log according to an embodiment of the present invention;
图6为本发明实施例的两阶段确认方式的流程图;FIG6 is a flow chart of a two-stage confirmation method according to an embodiment of the present invention;
图7为本发明实施例的状态存储示意图;FIG7 is a schematic diagram of state storage according to an embodiment of the present invention;
图8为本发明实施例的系统数据流程图;FIG8 is a system data flow diagram of an embodiment of the present invention;
图9为本发明实施例的终端的结构示意图;FIG9 is a schematic diagram of the structure of a terminal according to an embodiment of the present invention;
标号说明:Description of labels:
1、一种数据同步终端;2、处理器;3、存储器。1. A data synchronization terminal; 2. A processor; 3. A memory.
具体实施方式DETAILED DESCRIPTION
为详细说明本发明的技术内容、所实现目的及效果,以下结合实施方式并配合附图予以说明。In order to explain the technical content, achieved objectives and effects of the present invention in detail, the following is an explanation in combination with the implementation modes and the accompanying drawings.
请参照图1,一种数据库同步方法,包括步骤:Please refer to FIG. 1 , a database synchronization method includes the following steps:
监听原数据库的日志文件,判断是否需要获取历史数据,若是,则将所述原数据库中的历史数据提取为历史数据快照,通过所述历史数据快照同步所述历史数据至目标数据库后,执行获取增量最新点位信息步骤;Monitor the log file of the original database to determine whether historical data needs to be obtained. If so, extract the historical data in the original database as a historical data snapshot, synchronize the historical data to the target database through the historical data snapshot, and then execute the step of obtaining the latest incremental point information;
否则,获取原数据库中的增量最新点位信息以及存储的历史点位信息,根据所述历史点位信息以及所述增量最新点位信息之间的差获取增量日志数据,根据所述增量日志数据更新目标数据库。Otherwise, the incremental latest point information and the stored historical point information in the original database are obtained, the incremental log data is obtained according to the difference between the historical point information and the incremental latest point information, and the target database is updated according to the incremental log data.
从上述描述可知,本发明的有益效果在于:在进行数据同步的过程中,若需要获取历史数据,则将历史数据提取为历史数据快照,对快照进行同步,降低了对原数据库的访问压力,并且在同步完历史数据之后,自动切换到同步增量数据,则能够在数据迁移后继续使用原数据库自动同步到新设置的目标数据库,满足读写分离的需求,保证数据的安全性,即在同步完成历史数据的情况下,根据日志中的增量日志数据对目标数据库进行更新,日志中记录的是操作数据,减小了数据的传输量并且通过监听日志文件的方式判断是否需要进行更新操作,保证目标数据库更新的实时性。From the above description, it can be seen that the beneficial effects of the present invention are: in the process of data synchronization, if historical data needs to be obtained, the historical data is extracted as a historical data snapshot, and the snapshot is synchronized, which reduces the access pressure to the original database, and after the historical data is synchronized, it automatically switches to synchronize incremental data, so that the original database can continue to be used after data migration and automatically synchronize to the newly set target database, meeting the requirements of read-write separation and ensuring data security. That is, when the historical data is synchronized, the target database is updated according to the incremental log data in the log. The log records the operation data, which reduces the amount of data transmission and determines whether an update operation is needed by monitoring the log file, thereby ensuring the real-time update of the target database.
进一步地,所述通过所述历史数据快照同步所述历史数据至目标数据库包括:Furthermore, synchronizing the historical data to the target database through the historical data snapshot includes:
对所述历史数据快照进行切片,得到切片快照;Slicing the historical data snapshot to obtain a slice snapshot;
将每一所述切片快照分别发送到不同的任务管理器,由不同的所述任务管理器同时将所述切片快照同步至目标数据库。Each of the slice snapshots is sent to a different task manager, and the different task managers synchronize the slice snapshots to the target database at the same time.
由上述描述可知,将历史数据快照切片后,由多个任务管理器同步执行各个切片的同步任务,则能够充分利用程序的并行度提高任务的同步效率,即作业管理器(JobManager)将快照切分成多个片(split)之后,分发给相应的N个任务管理器(Taskmanager),后续的并行度就由1增加到所制定的N个并行度从而提高了同步效率。From the above description, it can be seen that after slicing the historical data snapshot, multiple task managers synchronously execute the synchronization tasks of each slice, which can make full use of the parallelism of the program to improve the synchronization efficiency of the task. That is, after the job manager (JobManager) splits the snapshot into multiple slices (split), it distributes them to the corresponding N task managers (Taskmanager), and the subsequent parallelism is increased from 1 to the specified N parallelism, thereby improving the synchronization efficiency.
进一步地,所述对所述历史数据快照进行切片,得到切片快照包括:Furthermore, slicing the historical data snapshot to obtain the slice snapshot includes:
获取历史数据快照中的表快照,提取每一所述表快照的表名称;Obtaining table snapshots in the historical data snapshots, and extracting the table name of each of the table snapshots;
根据所述表名称获取表结构,并判断所述表快照是否包括主键;Acquire a table structure according to the table name, and determine whether the table snapshot includes a primary key;
若是,则根据所述主键以及预设切片步长进行切片得到切片快照;If yes, slice the data according to the primary key and the preset slice step to obtain a slice snapshot;
否则,接收切片字段,计算所述表快照中每一所述切片字段对应的哈希值,根据所述哈希值以及预设切片步长进行切片得到切片快照。Otherwise, a slice field is received, a hash value corresponding to each slice field in the table snapshot is calculated, and slices are performed according to the hash value and a preset slice step to obtain a slice snapshot.
由上述描述可知,若表中有主键,直接根据主键以预设步长进行切片得到切片快照,若表中没有主键,则根据选择的字段计算字段对应的哈希值,通过哈希值以预设步长进行切片,预设步长可以根据能够并行执行同步任务的任务管理器的数量确定,将需要同步的历史数据尽量均匀分配到各个任务管理器中进行处理,最大程度保证了数据同步的效率,保证了任务管理器的利用率,并且在无法使用主键进行切片时,提供了一种快照切片的方案,提升了方案的适用度。From the above description, it can be seen that if there is a primary key in the table, the slice snapshot is obtained by directly slicing according to the primary key with a preset step size. If there is no primary key in the table, the hash value corresponding to the field is calculated according to the selected field, and the slice is performed with a preset step size through the hash value. The preset step size can be determined according to the number of task managers that can execute synchronization tasks in parallel, and the historical data that needs to be synchronized is distributed as evenly as possible to each task manager for processing, which maximizes the efficiency of data synchronization and the utilization rate of the task manager. In addition, when the primary key cannot be used for slicing, a snapshot slicing solution is provided to improve the applicability of the solution.
进一步地,所述获取原数据库中的增量最新点位信息以及存储的历史点位信息,根据所述历史点位信息以及所述增量最新点位信息之间的差获取增量日志数据,根据所述增量日志数据更新目标数据库包括:Further, the acquiring of the incremental latest point information and the stored historical point information in the original database, acquiring incremental log data according to the difference between the historical point information and the incremental latest point information, and updating the target database according to the incremental log data comprises:
记录原数据库中的当前数据操作序列号作为增量最新点位信息,判断所述历史点位信息与所述增量最新点位信息之间是否存在差值;Record the current data operation sequence number in the original database as the incremental latest point information, and determine whether there is a difference between the historical point information and the incremental latest point information;
若是,则将所述历史点位信息增加预设值后得到同步操作序列号,根据所述同步操作序列号获取增量日志数据,根据所述增量日志数据更新目标数据库后,将所述同步操作序列号作为历史点位信息,返回执行所述记录原数据库中的当前数据操作序列号作为增量最新点位信息步骤。If so, the historical point information is increased by a preset value to obtain a synchronization operation sequence number, and the incremental log data is obtained according to the synchronization operation sequence number. After the target database is updated according to the incremental log data, the synchronization operation sequence number is used as the historical point information, and the process returns to execute the step of recording the current data operation sequence number in the original database as the incremental latest point information.
由上述描述可知,若历史点位信息与增量最新点位信息之间存在差值,则说明产生了新的日志需要继续进行同步;但是在同步的过程中,因没有停止原数据库的使用,故可能在同步过程中持续产生新的日志,此时根据数据库操作序列号的递增规则增加预设值后得到同步操作序列号之后进行更新,并将同步操作序列号存储为历史点位信息继续判断与增量最新点位信息之间是否存在差值,能够利用数据操作序列号与日志中操作记录的一一对应关系避免同步过程中的遗漏或是重复的同步操作。From the above description, it can be seen that if there is a difference between the historical point information and the incremental latest point information, it means that a new log has been generated and synchronization needs to continue; however, during the synchronization process, because the use of the original database is not stopped, new logs may continue to be generated during the synchronization process. At this time, according to the incremental rule of the database operation sequence number, the preset value is increased to obtain the synchronization operation sequence number, and then it is updated, and the synchronization operation sequence number is stored as the historical point information to continue to judge whether there is a difference with the incremental latest point information. The one-to-one correspondence between the data operation sequence number and the operation record in the log can be used to avoid omissions or repeated synchronization operations during the synchronization process.
进一步地,若需要获取历史数据,则在开始提取历史数据快照时,记录当前数据操作序列号作为历史点位信息。Furthermore, if historical data needs to be obtained, the current data operation sequence number is recorded as historical point information when the historical data snapshot is extracted.
由上述描述可知,在进行历史数据的同步时,记录开启提取历史数据快照时的当前数据操作序列号,即在该操作之前的数据库同步都以历史数据快照的方式同步,在这之后的数据同步继续通过增量即同步日志数据的方式进行同步,能够直接连接历史数据同步和增量数据同步两个步骤,从而无需分别建立处理流程判断增量数据同步的开始位置。From the above description, it can be seen that when synchronizing historical data, the current data operation sequence number when the historical data snapshot is extracted is recorded, that is, the database synchronization before this operation is synchronized in the form of historical data snapshots, and the data synchronization thereafter continues to be synchronized in the form of increments, that is, synchronization of log data. The two steps of historical data synchronization and incremental data synchronization can be directly connected, so there is no need to establish separate processing flows to determine the starting position of incremental data synchronization.
进一步地,所述根据所述同步操作序列号获取增量日志数据包括:Further, the acquiring incremental log data according to the synchronization operation sequence number includes:
获取不同日志组的标志位范围;Get the flag range of different log groups;
获取所述同步操作序列号所述的标志位范围所对应的目标日志组;Obtain the target log group corresponding to the flag bit range described in the synchronization operation sequence number;
在所述日志组中检索所述同步操作序列号对应的日志数据,得到增量日志数据。The log data corresponding to the synchronization operation sequence number is retrieved in the log group to obtain incremental log data.
由上述描述可知,在日志文件较多的情况下,会分为日志组存储,降低了查询日志的效率;此时通过获取不同日志组对应的标志位范围,先确定同步操作序列号对应的标志位范围,再在该标志位范围所对应的日志组中搜索同步操作序列号对应的日志数据,先确定一个较小的范围之后再进行日志文件的逐一遍历,提高了处理效率。From the above description, it can be seen that when there are many log files, they will be divided into log groups for storage, which reduces the efficiency of log query; at this time, by obtaining the flag bit ranges corresponding to different log groups, first determine the flag bit range corresponding to the synchronization operation sequence number, and then search for the log data corresponding to the synchronization operation sequence number in the log group corresponding to the flag bit range, first determine a smaller range and then traverse the log files one by one, thereby improving processing efficiency.
进一步地,所述根据所述增量日志数据更新目标数据库包括:Further, updating the target database according to the incremental log data includes:
若所述增量日志数据的操作类型为DDL操作,则将所述DDL操作转换为与所述目标数据库对应的数据库语言格式,根据转换后的所述DDL操作更新目标数据库。If the operation type of the incremental log data is a DDL operation, the DDL operation is converted into a database language format corresponding to the target database, and the target database is updated according to the converted DDL operation.
由上述描述可知,因DDL操作通不涉及数据相关的操作,例如对表结构的修改,包括增加字段、索引等,若日志数据的操作类型是DDL操作,可以直接解析并进行格式转换就能够进行操作同步。From the above description, we can see that since DDL operations generally do not involve data-related operations, such as modifications to the table structure, including adding fields and indexes, if the operation type of the log data is a DDL operation, it can be directly parsed and format converted to synchronize the operation.
进一步地,所述根据所述增量日志数据更新目标数据库包括:Further, updating the target database according to the incremental log data includes:
若所述增量日志数据的操作类型为DML操作,则解析所述DML操作对应的操作数据,并将所述操作数据转换为适配目标数据库的格式;If the operation type of the incremental log data is a DML operation, parsing the operation data corresponding to the DML operation and converting the operation data into a format adapted to the target database;
将所述DML操作转换为与所述目标数据库对应的数据库语言格式,根据转换后的DML操作以及操作数据更新目标数据库。The DML operation is converted into a database language format corresponding to the target database, and the target database is updated according to the converted DML operation and operation data.
由上述描述可知,DML操作通常伴随数据的修改,例如数据的插入、更新或删除,则在同步到目标数据库的过程中,不仅需要解析操作本身,还需要解析操作所对应的对象即原有操作的数据,之后再转换为目标数据库所对应的格式。As can be seen from the above description, DML operations are usually accompanied by data modifications, such as inserting, updating, or deleting data. In the process of synchronizing to the target database, it is necessary not only to parse the operation itself, but also to parse the object corresponding to the operation, that is, the data of the original operation, and then convert it into the format corresponding to the target database.
进一步地,所述目标数据库为大规模并行处理数据库。Furthermore, the target database is a large-scale parallel processing database.
由上述描述可知,目标数据库设置为高性能的大规模并行处理数据库,则能够利用数据库自身的并行功能实现对分片后快照数据的并行读取,节省软件构建的成本,提升数据库的查询和并发能力,原数据库若为传统数据库,将其中的历史数据和增量的数据实时同步到大规模并行处理数据库中,则能够实现写数据继续写入原数据库,但通过大规模并行处理数据库进行读数据,实现数据库的读写分离,并且读场景通常比写场景的要求更加复杂,在读写分离提高安全性的同时利用大规模并行处理数据库的特点提升了系统的整体性能。From the above description, it can be seen that if the target database is set as a high-performance large-scale parallel processing database, the parallel function of the database itself can be used to realize parallel reading of snapshot data after sharding, saving the cost of software construction and improving the query and concurrency capabilities of the database. If the original database is a traditional database, the historical data and incremental data therein are synchronized to the large-scale parallel processing database in real time, so that the write data can continue to be written to the original database, but the data is read through the large-scale parallel processing database to realize the read-write separation of the database. The read scenario is usually more complex than the write scenario. While the read-write separation improves security, the characteristics of the large-scale parallel processing database are used to improve the overall performance of the system.
请参照图8,一种数据库同步终端,包括存储器、处理器及存储在所述存储器上并可在所述处理器上运行的计算机程序,所述处理器执行所述计算机程序时实现上述的一种数据库同步方法中的各个步骤。Please refer to FIG. 8 , a database synchronization terminal includes a memory, a processor, and a computer program stored in the memory and executable on the processor. When the processor executes the computer program, each step in the above-mentioned database synchronization method is implemented.
本发明上述一种数据库同步方法及终端能够适用于对数据库进行同步的过程,尤其是原数据库和目标数据库的类型不同时的数据库同步过程;以下通过具体实施方式进行说明。The above-mentioned database synchronization method and terminal of the present invention can be applied to the process of synchronizing a database, especially the database synchronization process when the types of the original database and the target database are different; the following is an explanation through a specific implementation method.
在现有使用传统数据库构建系统的场景中,因传统数据库存在单点故障后影响系统提供服务、无法满足海量数据处理、无法满足即时查询的问题。即使在传统数据库的基础上进行分库、分表、加索引等优化方式,也无法从根本上解决问题。故本申请通过构建历史数据和增量数据实时同步的同步方法,将原数据库中的数据同步到目标数据库中,确保两个数据库中数据的时效性,从而能够实现读写分离,提高整体性能。In the existing scenario of using traditional databases to build systems, the traditional database has a single point of failure that affects the system's service provision, cannot meet the needs of massive data processing, and cannot meet the needs of instant queries. Even if optimization methods such as sub-library, sub-table, and indexing are performed on the basis of traditional databases, the problem cannot be fundamentally solved. Therefore, this application constructs a synchronization method for real-time synchronization of historical data and incremental data, synchronizes the data in the original database to the target database, ensures the timeliness of the data in the two databases, and thus can achieve read-write separation and improve overall performance.
请参照图1-7,本发明的实施例一为:Please refer to Figures 1-7, embodiment 1 of the present invention is:
一种数据库同步方法,包括步骤:A database synchronization method comprises the steps of:
S1、监听原数据库的日志文件,判断是否需要获取历史数据,若是,则执行S2,否则执行S3;S1. Monitor the log files of the original database to determine whether historical data needs to be obtained. If so, execute S2; otherwise, execute S3.
其中,日志文件保存了对数据库中数据的操作;可以通过原数据库开启日志归档,生成归档日志和元数据控制信息,将数据库的增、删、改操作(DML)以及schema_change(表结构或数据库结构的改变,属于DDL操作)都记录到日志文件中;The log file stores the operations on the data in the database. You can enable log archiving through the original database to generate archive logs and metadata control information, and record the database add, delete, modify operations (DML) and schema_change (changes in table structure or database structure, which belong to DDL operations) in the log file.
在传统的同步方式中,若需要实时同步数据,通常是通过对表增加时间字段,进行微批量的JDBC(Java DataBase Connectivity,JAVA数据库连接)轮询操作,但这种方式无法获取到DDL操作并且存在较大的延时,难以满足实时同步的要求;而通过监听原数据库的方式获取日志文件的变化,可以实现毫秒级的响应,并且因为读取的是日志文件而不是数据库表本身,对原数据库的查询功能没有影响,不会增加原数据库的查询压力;例如可以通过kafka connect(一种数据搬运集成工具)中的polling方式实现实时监听;In the traditional synchronization method, if real-time data synchronization is required, a time field is usually added to the table to perform micro-batch JDBC (Java DataBase Connectivity) polling operations. However, this method cannot obtain DDL operations and has a large delay, making it difficult to meet the requirements of real-time synchronization. By monitoring the original database to obtain changes in log files, a millisecond-level response can be achieved. Moreover, because the log files are read instead of the database tables themselves, the query function of the original database is not affected and the query pressure of the original database is not increased. For example, real-time monitoring can be achieved through the polling method in Kafka Connect (a data handling integration tool).
其中,polling方式是指连接器(connectors)定期从源系统或目标系统中获取数据或将数据写入的过程。连接器通过持续地轮询来检查源系统是否有新的数据可用或目标系统是否可以接收新的数据,从而实现数据的传输和同步。此过程中,将kafka内置到我们的同步工具当中,无需额外再部署kafka集群;则Polling过程如下:Polling refers to the process in which connectors periodically obtain data from or write data to the source system or target system. Connectors continuously poll to check whether new data is available in the source system or whether the target system can receive new data, thereby achieving data transmission and synchronization. In this process, Kafka is built into our synchronization tool, and there is no need to deploy a Kafka cluster separately; the Polling process is as follows:
(1)配置连接器:配置Kafka Connect连接器,包括指定源系统(原数据库)和目标系统(目标数据库)的信息、数据转换规则等;(1) Configure the connector: Configure the Kafka Connect connector, including specifying the information of the source system (original database) and the target system (target database), data conversion rules, etc.
(2)启动连接器:启动Kafka Connect连接器,连接器会根据配置从源系统获取数据;(2) Start the connector: Start the Kafka Connect connector, which will obtain data from the source system according to the configuration;
(3)定期轮训:连接器会定期进行轮询,检查源系统或目标系统是否有新的数据可用。支持配置轮训频率,间隔可以控制到毫秒级,提高数据同步的实时性;(3) Regular polling: The connector will poll regularly to check whether new data is available in the source or target system. The polling frequency can be configured, and the interval can be controlled to milliseconds to improve the real-time performance of data synchronization;
(4)数据获取:当连接器发现有新的数据可用时,它会获取数据并进行相应的处理(比如进行转换、筛选等),然后将数据写入Kafka集群的指定主题;(4) Data acquisition: When the connector finds that new data is available, it acquires the data and processes it accordingly (such as conversion, filtering, etc.), and then writes the data to the specified topic of the Kafka cluster;
(5)异常处理:在数据传输的过程中,可能会出现各种错误和异常,连接器会针对这些情况进行处理,比如进行重试、错误记录、告警等;(5) Exception handling: During data transmission, various errors and exceptions may occur. The connector will handle these situations, such as retrying, error logging, and alarming.
(6)数据一致性:Kafka Connect会使用事务保证数据写入的原子性,在出现故障时能够正确地恢复数据处理进度,保证数据的一致性;(6) Data consistency: Kafka Connect uses transactions to ensure the atomicity of data writing. In the event of a failure, it can correctly restore the data processing progress and ensure data consistency.
请参照图3,S2、将所述原数据库中的历史数据提取为历史数据快照,通过所述历史数据快照同步所述历史数据至目标数据库后,执行S3;Referring to FIG. 3 , S2, extracting the historical data in the original database as a historical data snapshot, synchronizing the historical data to the target database through the historical data snapshot, and then executing S3;
其中,通过所述历史数据快照同步所述历史数据至目标数据库包括:Wherein, synchronizing the historical data to the target database through the historical data snapshot includes:
请参照图4,S21、对所述历史数据快照进行切片,得到切片快照,包括:Referring to FIG. 4 , S21, slicing the historical data snapshot to obtain a slice snapshot includes:
S211、获取历史数据快照中的表快照,提取每一所述表快照的表名称;S211, obtaining a table snapshot in the historical data snapshot, and extracting a table name of each of the table snapshots;
S212、根据所述表名称获取表结构,并判断所述表快照是否包括主键;若是,则执行S213;否则执行S214;S212, obtaining a table structure according to the table name, and determining whether the table snapshot includes a primary key; if so, executing S213; otherwise, executing S214;
S213、根据所述主键以及预设切片步长进行切片得到切片快照;S213, slicing according to the primary key and the preset slicing step to obtain a slicing snapshot;
在一种可选的实施方式中,在S212和S213之间还包括判断主键是否为int类型字段,若是,则执行S213,否则,执行S214或计算逐渐对应的哈希值,根据该哈希值以及预设切片步长进行切片得到切片快照;In an optional implementation, between S212 and S213, it is also included to determine whether the primary key is an int type field, if so, execute S213, otherwise, execute S214 or calculate the gradually corresponding hash value, and slice according to the hash value and the preset slice step to obtain a slice snapshot;
S214、接收切片字段,计算所述表快照中每一所述切片字段对应的哈希值,根据所述哈希值以及预设切片步长进行切片得到切片快照;S214, receiving a slice field, calculating a hash value corresponding to each slice field in the table snapshot, and performing slicing according to the hash value and a preset slicing step to obtain a slice snapshot;
因通过主键进行切片的限制较大,需要配置有主键并且主键是int(整型)字段,故本申请引入计算哈希值的方式,可以由客户自定义或者预设的规则获取表格中一个已有的字段作为切片字段,计算切片字段中每个值对应的哈希值,根据哈希值进行切片,则无主键表或者主键类型不符合要求的表也能够完成切片;又因切片后是通过不同的任务管理器执行同步任务,故只要保证各个任务管理器之间的切片处理量大致均衡即可,故即使是没有严格去重的非主键也可通过后续任务管理器的分配规则限定确保各个任务管理器的处理任务统一,故非主键可能的重复对后续效果的影响小;Since slicing by primary key has great limitations, it is necessary to configure a primary key and the primary key is an int (integer) field. Therefore, this application introduces a method of calculating hash values. An existing field in the table can be obtained as a slicing field by a custom or preset rule of the customer, and the hash value corresponding to each value in the slicing field is calculated. Slicing is performed according to the hash value, so that tables without primary keys or tables whose primary key types do not meet the requirements can also complete slicing; and because the synchronization tasks are performed by different task managers after slicing, it is sufficient to ensure that the slicing processing volume between each task manager is roughly balanced. Therefore, even non-primary keys without strict deduplication can be limited by the allocation rules of subsequent task managers to ensure that the processing tasks of each task manager are unified, so the possible duplication of non-primary keys has little impact on subsequent effects;
其中,在数据中最大的整型值是2^64,故可以通过chunk.key-column指定字段为chunk分片字段,通过hash算法,将指定的chunk键计算出一个新的hash值(64位整数),再将转换后的hash值进行切片;The largest integer value in the data is 2^64, so you can specify the field as the chunk sharding field through chunk.key-column, calculate a new hash value (64-bit integer) for the specified chunk key through the hash algorithm, and then slice the converted hash value;
S22、将每一所述切片快照分别发送到不同的任务管理器(taskManager),由不同的所述任务管理器同时将所述切片快照同步至目标数据库;S22, sending each of the slice snapshots to different task managers (taskManager) respectively, and having different task managers synchronize the slice snapshots to the target database at the same time;
在一种可选的实施方式中,请参照图4,在任务管理器完成自身的切片快照同步任务后,发送ACK(Acknowledge character,确认字符)给资源管理器(Resource Manager),确认资源管理器收到所有任务管理器的ACK后,执行S3即进入增量实时同步阶段;In an optional implementation, please refer to FIG. 4 , after the task manager completes its own slice snapshot synchronization task, it sends an ACK (Acknowledge character) to the resource manager (Resource Manager), and after confirming that the resource manager has received the ACKs of all task managers, S3 is executed to enter the incremental real-time synchronization stage;
请参照图3,S3、获取原数据库中的增量最新点位信息以及存储的历史点位信息,根据所述历史点位信息以及所述增量最新点位信息之间的差获取增量日志数据,根据所述增量日志数据更新目标数据库,包括:Please refer to FIG. 3 , S3, obtaining the incremental latest point information and the stored historical point information in the original database, obtaining incremental log data according to the difference between the historical point information and the incremental latest point information, and updating the target database according to the incremental log data, including:
S31、记录原数据库中的当前数据操作序列号作为增量最新点位信息;S31, recording the current data operation sequence number in the original database as the latest incremental point information;
若需要获取历史数据,则步骤S31替换为在开始提取历史数据快照时,记录当前数据操作序列号作为历史点位信息;If historical data needs to be obtained, step S31 is replaced by recording the current data operation sequence number as historical point information when starting to extract the historical data snapshot;
在一种可选的实施方式中,通常在传统数据库中,数据库日志都存在对应的数据操作序列号(例如Oracle数据库中为SCN,System Change Number,MySQL、SQLserver以及postgreSQL中为binlog),可以通过CDC(Change Data Capture,数据变更获取)技术获取当前数据操作序列号;In an optional implementation, usually in a traditional database, the database log has a corresponding data operation sequence number (for example, SCN, System Change Number in Oracle database, binlog in MySQL, SQLserver and postgreSQL), and the current data operation sequence number can be obtained through CDC (Change Data Capture) technology;
S32、判断所述历史点位信息与所述增量最新点位信息之间是否存在差值;若是,则执行S33;否则,返回执行S1;S32, determining whether there is a difference between the historical point information and the incremental latest point information; if so, executing S33; otherwise, returning to executing S1;
在一种可选的实施方式中,若不存在差值,可持续执行S31,因若进行根据增量日志数据更新目标数据库的方式,则说明历史数据已经同步完成,若历史数据没有同步完成,根据增量日志数据中存储的操作进行同步的方式会因所操作的数据不同产生误差;故可直接跳过对是否需要进行历史数据同步进行判断的步骤,提高步骤执行的效率;In an optional implementation, if there is no difference, S31 can be continuously executed, because if the target database is updated according to the incremental log data, it means that the historical data has been synchronized. If the historical data is not synchronized, the synchronization method based on the operation stored in the incremental log data will produce errors due to different data operated; therefore, the step of judging whether historical data synchronization is needed can be directly skipped, thereby improving the efficiency of step execution;
S33、将所述历史点位信息增加预设值后得到同步操作序列号,根据所述同步操作序列号获取增量日志数据,根据所述增量日志数据更新目标数据库后,将所述同步操作序列号作为历史点位信息,返回执行S31;S33, after increasing the historical point information by a preset value, obtain a synchronization operation sequence number, obtain incremental log data according to the synchronization operation sequence number, update the target database according to the incremental log data, use the synchronization operation sequence number as the historical point information, and return to execute S31;
其中,根据所述同步操作序列号获取增量日志数据包括:Wherein, obtaining incremental log data according to the synchronization operation sequence number includes:
S331、获取不同日志组的标志位范围;S331, obtaining the flag bit range of different log groups;
S332、获取所述同步操作序列号所述的标志位范围所对应的目标日志组;S332, obtaining the target log group corresponding to the flag bit range described in the synchronization operation sequence number;
S333、在所述日志组中检索所述同步操作序列号对应的日志数据,得到增量日志数据;S333, searching the log data corresponding to the synchronization operation sequence number in the log group to obtain incremental log data;
其中,根据所述增量日志数据更新目标数据库包括:Wherein, updating the target database according to the incremental log data includes:
S334、若所述增量日志数据的操作类型为DDL(data definition language,数据定义语言)操作,则执行S336;S334. If the operation type of the incremental log data is a DDL (data definition language) operation, execute S336;
S335、若所述增量日志数据的操作类型为DML(Data Manipulation Language,数据操纵语言)操作,则解析所述DML操作对应的操作数据,并将所述操作数据转换为适配目标数据库的格式之后执行S336;S335, if the operation type of the incremental log data is a DML (Data Manipulation Language) operation, parsing the operation data corresponding to the DML operation, and converting the operation data into a format adapted to the target database, and then executing S336;
S336、将所述DDL操作/DML操作转换为与所述目标数据库对应的数据库语言格式;S336, converting the DDL operation/DML operation into a database language format corresponding to the target database;
例如,若目标数据库位Mysql,可以转换成insert into TEST(PUID,ALARMTIME)values(’1000000’,’20240224134950’)该SQL语句仅作为示例,开发者可根据自己的目标数据库,转换成自己想要的SQL格式,完成SQL的转换;For example, if the target database is MySQL, it can be converted into insert into TEST(PUID,ALARMTIME)values('1000000','20240224134950'). This SQL statement is only used as an example. Developers can convert it into the desired SQL format according to their own target database to complete the SQL conversion.
S337、根据转换后的所述DML操作以及操作数据/DDL操作更新目标数据库;S337, updating the target database according to the converted DML operation and operation data/DDL operation;
并且,在更新目标数据库的过程中,由S336能够获取到写入目标数据库的代码执行块;但是有些情况下,特别是针对数据量大的情况下,原数据库若为传统数据库,会对传统数据库按照时间字段进行一些分库分表的操作,以提升部分业务查询的性能;但是若目标数据库采用MPP数据库,分库分表通常会被摒弃,即在MPP数据库中通常会采用一张大表,然后通过数据分区的方式将大表数据划分,提高查询性能。则此时在进行数据同步的过程中,需要将原数据库中的多表写入到目标数据库中同一张表的不同分区。如步骤S336示例当中,数据属于TEST202402这张分表,将会写入到MPP的数据库TEST,分区partition=202402当中。所有的传统MySQL数据库的分表(分表的表结构相同),在MPP数据库当中仅作为单个表的不同分区。本实施例中不仅提供单表->单表的原格式同步,也提供多表->单表的同步。以上述表为例,通过配置多表->单表的映射关系,实现多表的分区合并;本实施例中提供配置API,参照下表1,为一种API的结构示例:Moreover, in the process of updating the target database, S336 can obtain the code execution block written to the target database; however, in some cases, especially for large amounts of data, if the original database is a traditional database, some sub-library and sub-table operations will be performed on the traditional database according to the time field to improve the performance of some business queries; however, if the target database adopts an MPP database, sub-library and sub-table operations will usually be abandoned, that is, a large table is usually used in the MPP database, and then the large table data is divided by data partitioning to improve query performance. At this time, in the process of data synchronization, multiple tables in the original database need to be written to different partitions of the same table in the target database. For example, in the example of step S336, the data belongs to the sub-table TEST202402, which will be written to the MPP database TEST, partition partition = 202402. All sub-tables of traditional MySQL databases (the table structure of the sub-tables is the same) are only different partitions of a single table in the MPP database. In this embodiment, not only the original format synchronization of single table -> single table is provided, but also the synchronization of multiple tables -> single table is provided. Taking the above table as an example, by configuring the mapping relationship of multiple tables -> single table, the partition merging of multiple tables is realized; in this embodiment, a configuration API is provided, and referring to the following Table 1, it is an example of the structure of an API:
表1Table 1
则一种代码实现示例为:Then a code implementation example is:
{"sourceTableName":"TEST*",{"sourceTableName":"TEST*",
"sinkTableName":"TEST","sinkTableName":"TEST",
"partitionKey":"ALARMTIME","partitionKey":"ALARMTIME",
"interval":"month"},含义为将原数据库所有TEST开头的表,按照ALARMTIME字段值,按月分区写入MPP数据库的TEST表;原数据库TEST*表数据格式:{"PUID":"1000000","ALARMTIME":"20240224134950"};"interval":"month"}, which means that all tables starting with TEST in the original database are written into the TEST table of the MPP database by monthly partition according to the ALARMTIME field value; the data format of the TEST* table in the original database is: {"PUID":"1000000","ALARMTIME":"20240224134950"};
同时,请参照图6,引入两阶段确认方式(2PC)写入,确保数据不会丢失,保证事务的一致性,并且引入Flink中的checkpoint+savepoint机制,所有任务在checkpoint/savepoint的阶段里,都会将自身的执行中间状态值(State)写入到后端存储RocksDb,不仅避免了中间状态值太大而导致内存溢出问题,而且还能通过state值进行任务恢复。参照图7,通过checkpoint机制,在不需要人工干预的情况下,任务可以通过获取最新的state状态值,完成任务的自恢复,实现数据的断点续传。通常情况下,用户在修改代码逻辑后,任务需要重新启动,此发明通过savepoint机制,可以让用户修改同步逻辑之后,比如加表减表,修改映射关系,修改转换逻辑以及计算逻辑等,仍然可以从指定的位置断点续传;At the same time, please refer to Figure 6, introduce the two-stage confirmation method (2PC) writing to ensure that data will not be lost, ensure the consistency of transactions, and introduce the checkpoint+savepoint mechanism in Flink. All tasks will write their own execution intermediate state values (State) to the backend storage RocksDb in the checkpoint/savepoint stage. This not only avoids the problem of memory overflow caused by the intermediate state value being too large, but also enables task recovery through the state value. Referring to Figure 7, through the checkpoint mechanism, without the need for manual intervention, the task can complete the self-recovery of the task by obtaining the latest state value and realize breakpoint resumption of data. Normally, after the user modifies the code logic, the task needs to be restarted. This invention uses the savepoint mechanism to allow the user to modify the synchronization logic, such as adding or subtracting tables, modifying mapping relationships, modifying conversion logic and calculation logic, etc., and still be able to resume transmission from the specified location;
在一种可选的实施方式中,根据所述增量日志数据更新目标数据库还包括S300、将增量日志数据序列化,解析相应的SQL和数据内容,以确定增量日志数据时DDL数据还是DML数据,并获取DML数据对应的操作数据;表2提供一种对日志文件进行解析的格式,开发者可根据原数据库以及目标数据库的特征以及同步过程中的要求自定义;In an optional implementation, updating the target database according to the incremental log data further includes S300, serializing the incremental log data, parsing the corresponding SQL and data content to determine whether the incremental log data is DDL data or DML data, and obtaining operation data corresponding to the DML data; Table 2 provides a format for parsing a log file, which can be customized by the developer according to the characteristics of the original database and the target database and the requirements during the synchronization process;
参照表2,所有的增删该数据操作,都可以由after和before两个字段表示,例如:Referring to Table 2, all operations of adding or deleting data can be represented by the two fields after and before, for example:
插入:before=null,after=afterValue;Insert: before = null, after = afterValue;
更新:before=beforeValue,after=afterValue;Update: before=beforeValue, after=afterValue;
删除:before=beforeValue.after=null;Delete: before=beforeValue.after=null;
表2Table 2
请参照图5,为一种序列化后的一条日志示例,可以知道是在1708753859000(毫秒级时间戳,转换为北京时间是2024-02-24 13:50:59)时刻,往Oracle数据库的C##TEST.TEST202402表当中插入了一条字段值分别是{"PUID":"1000000","ALARMTIME":"20240224134950"}的数据;Please refer to Figure 5, which is an example of a serialized log. It can be seen that at 1708753859000 (millisecond timestamp, converted to 2024-02-24 13:50:59 Beijing time), a data with field values of {"PUID":"1000000","ALARMTIME":"20240224134950"} was inserted into the C##TEST.TEST202402 table of the Oracle database;
在一种可选的实施方式中,目标数据库为大规模并行处理数据库(MPP,MassivelyParallel Processing);In an optional implementation, the target database is a massively parallel processing database (MPP, Massively Parallel Processing);
在一种可选的实施方式中,根据增量日志数据更新目标数据库包括:通过Flink(一种针对流数据的计算框架)技术自定义与目标数据库适配的修改数据格式以及SQL语句格式,以通过Flink将解析后的DML操作以及DDL操作同步到目标数据库中;In an optional implementation, updating the target database according to the incremental log data includes: customizing a modified data format and an SQL statement format adapted to the target database through Flink (a computing framework for stream data) technology, so as to synchronize the parsed DML operation and DDL operation to the target database through Flink;
以Oracle为例,Oracle数据库当中,会有一个全局的标志符SCN,用于标志数据库当中每个数据块的变化;而SCN是一个单调递增的整数,以秒为单位,可以控制并发事务,记录数据变化以及通过SCN进行数据块的恢复等;因此实时同步程序在启动时,首先要获取到最新的SCN值,表示为currentScn(增量最新点位信息),对于SCN<currentScn的数据,标记为历史数据进行同步,如S2中所描述,当快照同步阶段结束后,我们就从currentScn值开始,获取当前的点位信息,点位信息包括有全局标志号SCN、日志事务序列号LSN、数据文件标志号DAYAFILE NUMBER以及重做序列号RSLN,此处我们用到的是SCN,通过SCN从重做日志中寻找实际的操作日志,解析数据操作;每处理完一条数据操作,将用currentScn+1来更新当前的SCN(currentScn=currentScn+1),并确认是否与当前点位信息相同,若否则继续通过更新后的SCN寻找实际的操作日志(增量日志数据)这样能保证历史的全量同步以及增量同步的无缝衔接,而且能保证获取的数据不会重复;Taking Oracle as an example, there is a global identifier SCN in the Oracle database, which is used to mark the changes of each data block in the database. SCN is a monotonically increasing integer in seconds, which can control concurrent transactions, record data changes, and restore data blocks through SCN. Therefore, when the real-time synchronization program is started, it must first obtain the latest SCN value, which is expressed as currentScn (incremental latest point information). For data with SCN < currentScn, it is marked as historical data for synchronization, as described in S2. When the snapshot synchronization phase is over, we start from the currentScn value to obtain the current point information. The point information includes the global identifier SCN, the log transaction sequence number LSN, and the data file identifier DAYAFILE. NUMBER and redo sequence number RSLN. Here we use SCN. We use SCN to find the actual operation log from the redo log and parse the data operation. After each data operation is processed, the current SCN will be updated with currentScn+1 (currentScn=currentScn+1), and it will be confirmed whether it is the same as the current point information. If not, continue to use the updated SCN to find the actual operation log (incremental log data). This ensures seamless connection between historical full synchronization and incremental synchronization, and ensures that the acquired data will not be repeated.
还包括:在历史数据同步完成之后,在业务系统中将查询数据配置对应的数据库修改为目标数据库,则在进行查询(selectdb)的过程中,会从目标数据库中进行查询,实现读写分离;It also includes: after the historical data synchronization is completed, the database corresponding to the query data configuration is modified to the target database in the business system, and during the query (selectdb) process, the query will be performed from the target database to achieve read-write separation;
以Doris为例,Doris为一种MPP数据库,其支持jdbc协议。对于mysql到doris的切换,分为两种情况:若为业务数据写入则原有的数据库连接信息不变,由CDC通过日志同步数据到doris,用户只需启动1个CDC任务即可;若为业务数据读取则将读取的数据源修改为doris链接信息;通过配置数据写入和数据读取两个数据源连接,可以很方便地将原有的业务迁移到新的MPP数据库查询,完成数据的读写分离。Take Doris as an example. Doris is an MPP database that supports the JDBC protocol. There are two cases for switching from MySQL to Doris: if it is business data writing, the original database connection information remains unchanged, and CDC synchronizes data to Doris through logs. Users only need to start one CDC task; if it is business data reading, the data source to be read is modified to the Doris link information; by configuring two data source connections for data writing and data reading, the original business can be easily migrated to the new MPP database query to complete the data read and write separation.
请参照图8,本发明的实施例二为:Please refer to FIG. 8 , the second embodiment of the present invention is:
一种数据库同步装置,包括日志获取模块(数据接入模块)、SQL解析模块、数据封装模块以及SQL转换模块;A database synchronization device includes a log acquisition module (data access module), an SQL parsing module, a data encapsulation module and an SQL conversion module;
数据接入模块,用于通过监听获取原数据库的操作日志,在同步历史数据时实现切片过程;并将历史数据以及格式转换后的DDL以及DML数据写入目标数据库;The data access module is used to obtain the operation log of the original database through monitoring, implement the slicing process when synchronizing historical data, and write the historical data and the DDL and DML data after format conversion into the target database;
SQL解析模块,用于在同步增量数据的过程中实现解析SCN以及通过SCN获取操作日志;SQL parsing module, used to parse SCN and obtain operation logs through SCN during the synchronization of incremental data;
数据封装模块,用于获取日志中的SQL以及数据内容;Data encapsulation module, used to obtain SQL and data content in the log;
SQL转换模块,用于将获取到的DDL以及DML转变为适配目标数据库的格式;SQL conversion module, used to convert the acquired DDL and DML into a format that is suitable for the target database;
可针对每个模块构建接口供开发者调用,则开发者能够适配自身的原数据库和目标数据库,提高灵活性。An interface can be built for each module for developers to call, so that developers can adapt their own original database and target database to improve flexibility.
请参照图9,本发明的实施例三为:Please refer to FIG. 9 , the third embodiment of the present invention is:
一种数据库同步终端1,包括处理器2、存储器3及存储在存储器3上并可在所述处理器2上运行的计算机程序,所述处理器2执行所述计算机程序时实现实施例一中的各个步骤。A database synchronization terminal 1 includes a processor 2, a memory 3, and a computer program stored in the memory 3 and executable on the processor 2. When the processor 2 executes the computer program, each step in the first embodiment is implemented.
综上所述,本发明提供了一种数据库同步方法及终端,采用对于增量数据和历史数据分别进行同步的方式,对于历史数据通过快照的方式进行同步;即历史数据采用快照+并行处理的方式读取,增量数据采用单线读取,当历史数据同步完成之后(快照结束),可自动切换至增量模式。如果需要获取历史数据,则先对历史数据进行快照,并对快照进行切片处理,通过并行获取切片的数据,提高数据获取效率,对历史数据进行快照,避免同步过程中对数据库操作的影响。并且在进行快照时可选是数据库级别还是表级别的快照。由于整库同步中,可能存在无需同步的表,对整库快照会增加一些无用数据的解析,增加传输与计算延时,而选择表级别的快照则能够根据同步的需要更加灵活选择需同步的数据,即只选择需要同步的表进行快照处理。并且在同步过程中还引入了双确认机制等进一步确保数据同步后的完整性。In summary, the present invention provides a database synchronization method and terminal, which adopts a method of synchronizing incremental data and historical data separately, and synchronizes historical data by snapshot; that is, historical data is read by snapshot + parallel processing, and incremental data is read by single line. When the historical data synchronization is completed (snapshot ends), it can automatically switch to incremental mode. If it is necessary to obtain historical data, first take a snapshot of the historical data, and slice the snapshot. By obtaining the sliced data in parallel, the data acquisition efficiency is improved, and the historical data is snapshotted to avoid the impact on the database operation during the synchronization process. And when taking a snapshot, it can be selected whether it is a database level or a table level snapshot. Since there may be tables that do not need to be synchronized in the synchronization of the entire database, the analysis of some useless data will be added to the snapshot of the entire database, and the transmission and calculation delays will be increased. Selecting a table-level snapshot can more flexibly select the data to be synchronized according to the needs of synchronization, that is, only select the tables that need to be synchronized for snapshot processing. In addition, a double confirmation mechanism is introduced in the synchronization process to further ensure the integrity of the data after synchronization.
读写分离降低原数据库的访问与计算压力的同时,能够提高原数据库和目标数据库所构成的查询系统的查询性能和并发量,从而提高系统整体的稳定性以及可用性。Read-write separation reduces the access and computing pressure of the original database while improving the query performance and concurrency of the query system composed of the original database and the target database, thereby improving the overall stability and availability of the system.
以上所述仅为本发明的实施例,并非因此限制本发明的专利范围,凡是利用本发明说明书及附图内容所作的等同变换,或直接或间接运用在相关的技术领域,均同理包括在本发明的专利保护范围内。The above descriptions are merely embodiments of the present invention and are not intended to limit the patent scope of the present invention. Any equivalent transformations made using the contents of the present invention's specification and drawings, or directly or indirectly applied in related technical fields, are also included in the patent protection scope of the present invention.
Claims (10)
Priority Applications (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202410774038.8A CN118733673A (en) | 2024-06-17 | 2024-06-17 | A database synchronization method and terminal |
Applications Claiming Priority (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202410774038.8A CN118733673A (en) | 2024-06-17 | 2024-06-17 | A database synchronization method and terminal |
Publications (1)
| Publication Number | Publication Date |
|---|---|
| CN118733673A true CN118733673A (en) | 2024-10-01 |
Family
ID=92866425
Family Applications (1)
| Application Number | Title | Priority Date | Filing Date |
|---|---|---|---|
| CN202410774038.8A Pending CN118733673A (en) | 2024-06-17 | 2024-06-17 | A database synchronization method and terminal |
Country Status (1)
| Country | Link |
|---|---|
| CN (1) | CN118733673A (en) |
Cited By (2)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN119883619A (en) * | 2024-12-27 | 2025-04-25 | 上海思朗科技有限公司 | Data processing method, device, equipment, medium and product |
| CN120873088A (en) * | 2025-09-24 | 2025-10-31 | 杭州协能科技股份有限公司 | Database synchronization method, device, equipment and storage medium |
-
2024
- 2024-06-17 CN CN202410774038.8A patent/CN118733673A/en active Pending
Cited By (3)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN119883619A (en) * | 2024-12-27 | 2025-04-25 | 上海思朗科技有限公司 | Data processing method, device, equipment, medium and product |
| CN119883619B (en) * | 2024-12-27 | 2025-11-07 | 上海思朗科技股份有限公司 | Data processing method, device, equipment, medium and product |
| CN120873088A (en) * | 2025-09-24 | 2025-10-31 | 杭州协能科技股份有限公司 | Database synchronization method, device, equipment and storage medium |
Similar Documents
| Publication | Publication Date | Title |
|---|---|---|
| EP3602341B1 (en) | Data replication system | |
| US11829360B2 (en) | Database workload capture and replay | |
| US20230267131A1 (en) | Managing database failover based on transaction request time | |
| US11068501B2 (en) | Single phase transaction commits for distributed database transactions | |
| CN113535656B (en) | Data access method, device, equipment and storage medium | |
| US9589041B2 (en) | Client and server integration for replicating data | |
| CN108920698A (en) | A kind of method of data synchronization, device, system, medium and electronic equipment | |
| CN118733673A (en) | A database synchronization method and terminal | |
| CN114691704A (en) | A metadata synchronization method based on MySQL binlog | |
| US12277140B2 (en) | Consensus protocol for asynchronous database transaction replication with fast, automatic failover, zero data loss, strong consistency, full SQL support and horizontal scalability | |
| CN111352766A (en) | A method and device for implementing active-active database | |
| US20250200070A1 (en) | Consensus Protocol For Asynchronous Database Transaction Replication With Fast, Automatic Failover, Zero Data Loss, Strong Consistency, Full SQL Support And Horizontal Scalability | |
| JP2024026143A (en) | Switching to eventually consistent database replication | |
| US11789971B1 (en) | Adding replicas to a multi-leader replica group for a data set | |
| CN119597773A (en) | Metadata acquisition system based on CDC (content data center) file | |
| CN116881371B (en) | Data synchronization method, device, equipment and storage medium | |
| CN112800060A (en) | Data processing method and device, computer readable storage medium and electronic equipment | |
| CN118861155A (en) | A method and system for implementing non-intrusive data synchronization service | |
| Donselaar | Low latency asynchronous database synchronization and data transformation using the replication log. | |
| CN117453460A (en) | Method and device for realizing redis second-level disaster recovery | |
| CN115391463A (en) | Data synchronization method, device and server cluster | |
| CN121188069A (en) | Heterogeneous data query method, device, system and storage medium | |
| CN118861028A (en) | Data database synchronization method, device, electronic device and storage medium | |
| CN119938729A (en) | Data extraction method, device, storage medium, equipment and program product | |
| CN118093688A (en) | A method and system for automatically extracting medical data |
Legal Events
| Date | Code | Title | Description |
|---|---|---|---|
| PB01 | Publication | ||
| PB01 | Publication | ||
| SE01 | Entry into force of request for substantive examination | ||
| SE01 | Entry into force of request for substantive examination |