HK40030421B - Managing a computing cluster using time interval counters - Google Patents
Managing a computing cluster using time interval counters Download PDFInfo
- Publication number
- HK40030421B HK40030421B HK62020020494.4A HK62020020494A HK40030421B HK 40030421 B HK40030421 B HK 40030421B HK 62020020494 A HK62020020494 A HK 62020020494A HK 40030421 B HK40030421 B HK 40030421B
- Authority
- HK
- Hong Kong
- Prior art keywords
- request
- value
- time interval
- counter
- time
- Prior art date
Links
Description
相关申请的交叉引用Cross-reference to related applications
本申请要求于2017年10月31日提交的美国申请序列号62/579,225的优先权,该美国申请通过引用并入本文。This application claims priority to U.S. Application Serial No. 62/579,225, filed October 31, 2017, which is incorporated herein by reference.
技术领域Technical Field
本说明书涉及管理计算集群。This manual pertains to the management of computing clusters.
背景技术Background Technology
数据流计算的一种方法利用了基于图的表示,其中,与图的节点(顶点)相对应的计算组件通过与该图(称为“数据流图”)的链路(有向边)相对应的数据流耦接。通过数据流链路连接到上游组件的下游组件接收有序的输入数据元素流,并按接收到的顺序处理输入数据元素,以可选地生成一个或多个相应的输出数据元素流。在名称为“EXECUTINGCOMPUTATIONS EXPRESSED AS GRAPHS(执行表示为图的计算)”的在先美国专利5,966,072中描述了一种用于执行此类基于图的计算的系统,该专利通过引用并入本文。在与该在先专利中描述的方法有关的实施方式中,每个组件被实施为托管在多个典型计算机服务器之一上的进程。每个计算机服务器可以在任何一个时间激活多个此类组件进程,并且操作系统(例如,Unix)调度程序在该服务器上托管的组件之间共享资源(例如,处理器时间和/或处理器核)。在这种实施方式中,可以使用操作系统和连接服务器的数据网络的数据通信服务(例如,命名管道、TCP/IP会话等)来实施组件之间的数据流。组件的子集通常用作整个计算(例如,去往和/或来自数据文件、数据库表和外部数据流)的数据源和/或数据接收器。在例如通过协作进程建立了组件进程和数据流之后,数据便流经整个计算系统,该计算系统实施表示为图的计算,该计算通常受每个组件处输入数据的可用性和每个组件的计算资源的调度的支配。因此,至少可以通过使不同的组件能够由不同的进程(托管在相同或不同的服务器计算机或处理器核上)并行执行来实现并行性,其中,在本文中通过数据流图在不同路径上并行执行不同组件称为组件并行性,并且在本文中通过数据流图在同一路径的不同部分上并行执行不同组件称为流水线并行性。One method of dataflow computation utilizes a graph-based representation, in which computational components corresponding to nodes (vertices) of a graph are coupled via data flows corresponding to links (directed edges) of the graph (referred to as a “dataflow graph”). Downstream components connected to upstream components via data flow links receive ordered streams of input data elements and process them in the received order to optionally generate one or more corresponding streams of output data elements. A system for performing such graph-based computation is described in prior U.S. Patent 5,966,072, entitled “EXECUTING COMPUTATIONS EXPRESSED AS GRAPHS,” which is incorporated herein by reference. In embodiments relating to the method described in that prior patent, each component is implemented as a process hosted on one of a plurality of typical computer servers. Each computer server may have multiple such component processes active at any given time, and the operating system (e.g., Unix) scheduler shares resources (e.g., processor time and/or processor cores) among the components hosted on that server. In this implementation, data flow between components can be implemented using data communication services (e.g., named pipes, TCP/IP sessions, etc.) of the operating system and the data network connecting the server. A subset of components typically serves as a data source and/or data receiver for the entire computation (e.g., to and/or from data files, database tables, and external data streams). After component processes and data flows are established, for example, through a collaborative process, data flows through the entire computing system, which performs computations represented as graphs, typically governed by the availability of input data at each component and the scheduling of computational resources for each component. Therefore, parallelism can be achieved at least by enabling different components to be executed in parallel by different processes (hosted on the same or different server computers or processor cores). Herein, parallel execution of different components on different paths via a data flow graph is referred to as component parallelism, and parallel execution of different components on different parts of the same path via a data flow graph is referred to as pipeline parallelism.
这种方法还支持其他形式的并行性。例如,可以例如根据数据集的记录中字段的值的分区来划分输入数据集,其中,每个部分被发送到组件中处理该数据集的记录的单独副本。组件的此类单独副本(或“实例”)可以在单独服务器计算机或服务器计算机的单独处理器核上执行,从而实现本文所称的数据并行性。单独组件的结果可以合并以再次形成单个数据流或数据集。用于执行组件的实例的计算机或处理器核的数量将在开发数据流图时由开发者指定。This approach also supports other forms of parallelism. For example, the input dataset can be partitioned, for instance, based on the values of fields in the records of the dataset, with each part being sent to a component to process a separate copy of the records in that dataset. Such separate copies (or "instances") of the components can execute on a single server computer or a single processor core of a server computer, thus achieving what is referred to herein as data parallelism. The results of the individual components can be merged to form a single data stream or dataset again. The number of computers or processor cores used to execute the instances of the components will be specified by the developer when developing the data flow graph.
可以使用各种方法来提高这种方法的效率。例如,组件的每个实例不一定必须托管在其自己的操作系统进程中,例如,使用一个操作系统进程来实施多个组件(例如,形成较大图的连接子图的组件)。Various methods can be used to improve the efficiency of this approach. For example, each instance of a component does not necessarily have to be hosted in its own operating system process; for instance, multiple components can be implemented using a single operating system process (e.g., components that form connected subgraphs of a larger graph).
上述方法的至少一些实施方式受到与在基础计算机服务器上执行所产生进程的效率有关的限制。例如,这些限制可能与重新配置图的运行实例以更改数据并行性程度、更改托管各种组件的服务器和/或平衡不同计算资源上的负荷的困难有关。现有的基于图的计算系统还具有启动时间慢的问题,这通常是因为不必要地启动了太多的进程而浪费了大量的内存。一般而言,进程从图执行的启动开始,并且到图执行完成时结束。At least some implementations of the above methods are limited by the efficiency of the processes generated during execution on the underlying computer server. For example, these limitations may be related to the difficulty of reconfiguring running instances of the graph to change the degree of data parallelism, changing the servers hosting various components, and/or balancing the load across different computing resources. Existing graph-based computing systems also suffer from slow startup times, often due to the unnecessary startup of too many processes, wasting significant amounts of memory. Generally, processes begin at the start of graph execution and end when graph execution is complete.
已使用了用于分布计算的其他系统,其中将整个计算分为较小的部分,并且将这些部分从一个主计算机服务器分布到各个其他(例如,“从”)计算机服务器,这些计算机服务器各自独立地执行计算并且将其结果返回到主服务器。此类方法中的一些称为“网格计算”。然而,此类方法通常依赖于每个计算的独立性,除了经由主计算机服务器调用计算部分之外,不提供用于在那些部分之间传递数据、或者对这些部分的执行进行调度和/或排序的机制。因此,此类方法不能为托管涉及多个组件之间的交互的计算提供直接且高效的解决方案。Other systems for distributed computing have been used, in which the entire computation is divided into smaller parts and these parts are distributed from a master computer server to various other (e.g., "slave") computer servers, each of which performs the computation independently and returns its results to the master server. Some of these approaches are called "grid computing." However, such approaches typically rely on the independence of each computation and do not provide mechanisms for transferring data between those parts, or for scheduling and/or ordering the execution of these parts, except for invoking the computation parts via the master computer server. Therefore, such approaches do not provide a direct and efficient solution for hosting computations involving interactions between multiple components.
对大型数据集进行分布式计算的另一种方法利用MapReduce框架(例如,如Apache系统中所实施的)。一般而言,Hadoop具有分布式文件系统,每个命名文件的部分都分布于其中。用户根据两个函数指定计算:在命名输入的所有部分上以分布式方式执行的映射函数和在映射函数执行的输出部分上执行的归约函数。映射函数执行的输出被划分并再次存储在分布式文件系统中的中间部分中。然后以分布式方式执行归约函数以处理中间部分,从而得出整个计算的结果。尽管可以高效地执行可以在MapReduce框架中表示的、并且其输入和输出可修改以存储在MapReduce框架的文件系统内的计算,但是许多计算与此框架不匹配和/或不容易适配用于使其所有输入和输出都包含在分布式文件系统内。Another approach to distributed computation on large datasets utilizes the MapReduce framework (e.g., as implemented in Apache systems). Generally, Hadoop has a distributed file system where portions of each named file are distributed. Users specify computations based on two functions: a mapping function executed distributedly on all portions of the named input and a reduction function executed on the output portion of the mapping function. The output of the mapping function is partitioned and stored again in intermediate portions within the distributed file system. The reduction function is then executed distributedly to process these intermediate portions, yielding the result of the entire computation. While computations that can be represented within the MapReduce framework, and whose inputs and outputs can be modified to be stored within the MapReduce framework's file system, can be performed efficiently, many computations are incompatible with this framework and/or not easily adapted to have all their inputs and outputs contained within a distributed file system.
发明内容Summary of the Invention
在总体方面,一种用于在具有多个处理节点的分布式数据处理系统中处理状态更新请求的方法包括使用这些处理节点中的两个或更多个处理节点来处理请求集,每个请求集中的每个请求被配置用于在这些处理节点之一处引起状态更新,并与多个时间间隔中的相应时间间隔相关联。状态更新请求集包括与第一时间间隔相关联的第一请求集。In general, a method for processing state update requests in a distributed data processing system with multiple processing nodes includes using two or more of these processing nodes to process a set of requests, each request in each set being configured to cause a state update at one of these processing nodes and associated with a corresponding time interval among a plurality of time intervals. The set of state update requests includes a first set of requests associated with a first time interval.
该方法还包括在这些处理节点之一处维护多个计数器,该多个计数器包括:工作计数器,该工作计数器在该分布式数据处理系统中指示该多个时间间隔中的当前时间间隔以及该当前时间间隔的值;复制计数器,该复制计数器指示该多个时间间隔中的某个时间间隔以及该时间间隔的值,在该时间间隔内,在该多个处理节点中的数个处理节点处复制与该时间间隔相关联的所有请求;以及持久性计数器,该持久性计数器指示该多个时间间隔中的某个时间间隔,在该时间间隔内,与该时间间隔相关联的所有请求均被存储在与该多个处理节点中的至少一个处理节点相关联的持久性存储装置中。The method further includes maintaining a plurality of counters at one of these processing nodes, the plurality of counters including: a working counter that indicates the current time interval and the value of the current time interval in the distributed data processing system; a replication counter that indicates a time interval and the value of the time interval in which all requests associated with the time interval are replicated at several processing nodes among the plurality of processing nodes; and a persistence counter that indicates a time interval in which all requests associated with the time interval are stored in persistent storage associated with at least one of the plurality of processing nodes.
该方法包括在第一时间从该第一处理节点向其他处理节点提供第一消息,该第一消息包括该工作计数器的值、该复制计数器的值、以及该持久性计数器的值。该第一消息中的复制计数器指示:在该第一时间间隔之前,在两个或更多个处理节点处复制该多个状态更新请求集中的与第二时间间隔相关联的第二请求集中的所有请求。在两个或更多个处理节点处复制与该第二时间间隔之前的时间间隔相关联的任何非持久地存储的请求。尚未在这些处理节点中的两个或更多个处理节点处复制与该第一时间间隔相关联的第一请求集中的至少一些请求。The method includes providing a first message from the first processing node to other processing nodes at a first time. The first message includes the value of the work counter, the value of the replication counter, and the value of the persistence counter. The replication counter in the first message indicates that, prior to the first time interval, all requests in the second request set associated with the second time interval from the plurality of state update request sets are replicated at two or more processing nodes. Any non-persistently stored requests associated with the time interval preceding the second time interval are replicated at two or more processing nodes. At least some requests in the first request set associated with the first time interval have not yet been replicated at two or more of these processing nodes.
该方法包括在该第一时间之后的第二时间从该第一处理节点向其他处理节点提供第二消息。该第二消息包括该工作计数器的值、该复制计数器的值、以及该持久性计数器的值。该第二消息中的复制计数器的值指示:在两个或更多个处理节点处复制与该第一时间间隔相关联的第一请求集中的所有请求,并且在两个或更多个处理节点处复制与该第一时间间隔之前的时间间隔相关联的任何非持久地存储的请求。该第二消息使该多个处理节点中的至少一个处理节点完成持久地存储该第一请求集中的一个或多个请求。The method includes providing a second message from the first processing node to other processing nodes at a second time following the first time interval. The second message includes the value of the work counter, the value of the replication counter, and the value of the persistence counter. The replication counter value in the second message indicates that all requests in the first request set associated with the first time interval are replicated at two or more processing nodes, and any non-persistently stored requests associated with time intervals preceding the first time interval are replicated at two or more processing nodes. The second message causes at least one of the plurality of processing nodes to persistently store one or more requests in the first request set.
各方面可以包括以下特征中的一项或多项。Each aspect may include one or more of the following characteristics.
该工作计数器可以自动地增加该当前时间间隔,并且该复制计数器可以响应于在该第一处理节点处从该多个处理节点中的其他处理节点接收到的消息而增加。对于每个时间间隔,其他处理节点中的每个处理节点都可以维护在该处理节点处接收到的第一状态更新请求计数和从该处理节点发送的第二状态更新请求计数。该方法可以包括:在该第一处理节点处,从其他处理节点中的每个处理节点接收该第一时间间隔内的第一状态更新请求计数和该第一时间间隔内的第二状态更新请求计数,对接收到的该第一时间间隔内的第一状态更新请求计数和该第一时间间隔内的第二状态更新请求计数进行汇总,并基于该汇总来确定是否增加该复制计数器的值。The working counter can automatically increment the current time interval, and the replication counter can increment in response to messages received at the first processing node from other processing nodes among the plurality of processing nodes. For each time interval, each of the other processing nodes can maintain a first state update request count received at the processing node and a second state update request count sent from the processing node. The method may include: at the first processing node, receiving the first state update request count and the second state update request count for the first time interval from each of the other processing nodes; summarizing the received first state update request count and the second state update request count for the first time interval; and determining, based on the summarization, whether to increment the value of the replication counter.
该方法可以包括将该复制计数器的值从该第一消息中的该复制计数器的值增加到该第二消息中的该复制计数器的值。该方法可以包括:对接收到的该第一时间间隔内的第一状态更新请求计数和该第一时间间隔内的第二状态更新请求计数进行汇总包括对接收到的该第一时间间隔内的第一状态更新请求计数之和以及该第一时间间隔内的第二状态更新请求计数之和求差值。该方法可以包括:如果接收到的该第一时间间隔内的第一状态更新请求计数之和与该第一时间间隔内的第二状态更新请求计数之和的差为零,则将该复制计数器的值从该第一消息中的该复制计数器的值增加到该第二消息中的该复制计数器的值。The method may include incrementing the value of the replication counter from the value of the replication counter in the first message to the value of the replication counter in the second message. The method may include summing the received first state update request counts and second state update request counts within the first time interval, including calculating the difference between the sum of the received first state update request counts within the first time interval and the sum of the received second state update request counts within the first time interval. The method may include incrementing the value of the replication counter from the value of the replication counter in the first message to the value of the replication counter in the second message if the difference between the sum of the received first state update request counts within the first time interval and the sum of the received second state update request counts within the first time interval is zero.
其他处理节点中的每个处理节点都可以维护该多个时间间隔中的最晚时间间隔的指示符,其中,已使由该处理节点接收到的并与该最晚时间间隔相关联的所有状态更新请求在该处理节点处持久化。该方法可以包括:在该第一处理节点处,从其他处理节点中的每个处理节点接收该最晚时间间隔的指示符,并基于该最晚时间间隔的指示符确定是否增加该持久性计数器。Each of the other processing nodes can maintain an indicator of the latest time interval among the plurality of time intervals, wherein all state update requests received by that processing node and associated with the latest time interval have been persisted at that processing node. The method may include: at the first processing node, receiving the indicator of the latest time interval from each of the other processing nodes, and determining, based on the indicator of the latest time interval, whether to increment the persistence counter.
该方法可以包括将该持久性计数器增加到与该最晚时间间隔的指示符相关联的最早时间间隔。该状态更新请求可以包括数据处理任务、数据处理任务结果和数据记录中的一项或多项。The method may include incrementing the persistence counter to the earliest time interval associated with the indicator of the latest time interval. The status update request may include one or more of the following: a data processing task, the result of the data processing task, and a data record.
在另一总体方面,一种用于在包括多个处理节点的分布式数据处理系统中处理状态更新请求的软件以非暂态形式存储在计算机可读介质上。该软件包括用于使计算系统使用两个或更多个处理节点来处理多个请求集的指令。每个请求集中的每个请求被配置用于在该多个处理节点中的某个处理节点处引起状态更新,并与多个时间间隔中的相应时间间隔相关联。这些请求集包括与该多个时间间隔中的第一时间间隔相关联的第一请求集。In another general aspect, software for processing state update requests in a distributed data processing system comprising multiple processing nodes is stored in a non-transitory form on a computer-readable medium. The software includes instructions for causing the computing system to use two or more processing nodes to process multiple sets of requests. Each request in each set of requests is configured to cause a state update at one of the multiple processing nodes and is associated with a corresponding time interval among a plurality of time intervals. These sets of requests include a first set of requests associated with a first time interval among the plurality of time intervals.
该软件还包括用于使计算系统在第一处理节点处维护多个计数器的指令。这些计数器包括:工作计数器,该工作计数器在该分布式数据处理系统中指示该多个时间间隔中的当前时间间隔;复制计数器,该复制计数器指示该多个时间间隔中的某个时间间隔,在该时间间隔内,在该多个处理节点中的数个处理节点处复制与该时间间隔相关联的所有请求;以及持久性计数器,该持久性计数器指示该多个时间间隔中的某个时间间隔,在该时间间隔内,与该时间间隔相关联的所有请求均被存储在与该多个处理节点中的至少一个处理节点相关联的持久性存储装置中。The software also includes instructions for enabling the computing system to maintain multiple counters at a first processing node. These counters include: a working counter that indicates the current time interval among the multiple time intervals in the distributed data processing system; a replication counter that indicates a time interval among the multiple time intervals in which all requests associated with the time interval are replicated at several processing nodes among the multiple processing nodes; and a persistence counter that indicates a time interval among the multiple time intervals in which all requests associated with the time interval are stored in persistent storage associated with at least one of the multiple processing nodes.
该软件还包括用于使计算系统在第一时间从该第一处理节点向该多个处理节点中的其他处理节点提供第一消息的指令。该第一消息包括该工作计数器的值、该复制计数器的值、以及该持久性计数器的值。该第一消息中的复制计数器指示:在该第一时间间隔之前,在两个或更多个处理节点处复制与第二时间间隔相关联的第二请求集中的所有请求,并且在两个或更多个处理节点处复制与该多个时间间隔中的在该第二时间间隔之前的时间间隔相关联的任何非持久地存储的请求。尚未在该多个处理节点中的两个或更多个处理节点处复制与该第一时间间隔相关联的第一请求集中的至少一些请求。The software also includes instructions for enabling the computing system to provide a first message from the first processing node to the other processing nodes among the plurality of processing nodes at a first time. The first message includes the value of the work counter, the value of the replication counter, and the value of the persistence counter. The replication counter in the first message indicates that, prior to the first time interval, all requests in the second request set associated with the second time interval are replicated at two or more processing nodes, and any non-persistently stored requests associated with the time interval preceding the second time interval are replicated at two or more processing nodes. At least some of the requests in the first request set associated with the first time interval have not yet been replicated at two or more processing nodes among the plurality of processing nodes.
该软件还包括用于使该计算系统在该第一时间之后的第二时间从该第一处理节点向该多个处理节点中的其他处理节点提供第二消息的指令。该第二消息包括该工作计数器的值、该复制计数器的值、以及该持久性计数器的值。该第二消息中的复制计数器的值指示:在两个或更多个处理节点处复制与该第一时间间隔相关联的第一请求集中的所有请求,并且在两个或更多个处理节点处复制与该第一时间间隔之前的时间间隔相关联的任何非持久地存储的请求。该第二消息使该多个处理节点中的至少一个处理节点完成持久地存储该第一请求集中的一个或多个请求。The software also includes instructions for causing the computing system to provide a second message from the first processing node to other processing nodes among the plurality of processing nodes at a second time following the first time interval. The second message includes the value of the work counter, the value of the replication counter, and the value of the persistence counter. The value of the replication counter in the second message indicates that all requests in the first request set associated with the first time interval are replicated at two or more processing nodes, and any non-persistently stored requests associated with time intervals preceding the first time interval are replicated at two or more processing nodes. The second message causes at least one of the plurality of processing nodes to persistently store one or more requests in the first request set.
在另一总体方面,一种用于在包括多个处理节点的分布式数据处理系统中处理状态更新请求的装置包括包含多个处理节点的分布式数据处理系统。该装置还包括位于该多个处理节点中的两个或更多个处理节点处的用于处理多个请求集的一个或多个处理器。每个请求集中的每个请求被配置用于在该多个处理节点中的某个处理节点处引起状态更新,并与多个时间间隔中的相应时间间隔相关联,该多个请求集包括与该多个时间间隔中的第一时间间隔相关联的第一请求集。In another general aspect, an apparatus for processing state update requests in a distributed data processing system comprising multiple processing nodes includes the distributed data processing system comprising multiple processing nodes. The apparatus further includes one or more processors located at two or more of the multiple processing nodes for processing multiple sets of requests. Each request in each set of requests is configured to cause a state update at one of the multiple processing nodes and is associated with a corresponding time interval among a plurality of time intervals, the plurality of request sets including a first set of requests associated with a first time interval among the plurality of time intervals.
该装置还包括一个或多个数据存储装置,用于在该多个处理节点中的第一处理节点处维护多个计数器。该多个计数器包括:工作计数器,该工作计数器在该分布式数据处理系统中指示该多个时间间隔中的当前时间间隔;复制计数器,该复制计数器指示该多个时间间隔中的某个时间间隔,在该时间间隔内,在该多个处理节点中的数个处理节点处复制与该时间间隔相关联的所有请求;以及持久性计数器,该持久性计数器指示该多个时间间隔中的某个时间间隔,在该时间间隔内,与该时间间隔相关联的所有请求均被存储在与该多个处理节点中的至少一个处理节点相关联的持久性存储装置中。The apparatus further includes one or more data storage devices for maintaining multiple counters at a first processing node among the plurality of processing nodes. The multiple counters include: a working counter indicating the current time interval among the plurality of time intervals in the distributed data processing system; a replication counter indicating a time interval in which all requests associated with the time interval are replicated at several processing nodes among the plurality of processing nodes; and a persistence counter indicating a time interval in which all requests associated with the time interval are stored in a persistent storage device associated with at least one of the plurality of processing nodes.
该装置还包括第一输出,该第一输出用于在第一时间从该第一处理节点向该多个处理节点中的其他处理节点提供第一消息。该第一消息包括该工作计数器的值、该复制计数器的值、以及该持久性计数器的值。该第一消息中的复制计数器指示:在该第一时间间隔之前,在两个或更多个处理节点处复制与第二时间间隔相关联的第二请求集中的所有请求,并且在两个或更多个处理节点处复制与该多个时间间隔中的在该第二时间间隔之前的时间间隔相关联的任何非持久地存储的请求。尚未在该多个处理节点中的两个或更多个处理节点处复制与该第一时间间隔相关联的第一请求集中的至少一些请求。The apparatus also includes a first output for providing a first message from the first processing node to the other processing nodes of the plurality of processing nodes at a first time. The first message includes the value of the work counter, the value of the replication counter, and the value of the persistence counter. The replication counter in the first message indicates that, prior to the first time interval, all requests in the second request set associated with the second time interval are replicated at two or more processing nodes, and any non-persistently stored requests associated with the time interval preceding the second time interval are replicated at two or more processing nodes. At least some of the requests in the first request set associated with the first time interval have not yet been replicated at two or more processing nodes of the plurality of processing nodes.
该装置还包括第二输出,该第二输出用于在该第一时间之后的第二时间从该第一处理节点向该多个处理节点中的其他处理节点提供第二消息。该第二消息包括该工作计数器的值、该复制计数器的值、以及该持久性计数器的值。该第二消息中的复制计数器的值指示:在两个或更多个处理节点处复制与该第一时间间隔相关联的第一请求集中的所有请求,并且在两个或更多个处理节点处复制与该第一时间间隔之前的时间间隔相关联的任何非持久地存储的请求。该第二消息使该多个处理节点中的至少一个处理节点完成持久地存储该第一请求集中的一个或多个请求。The apparatus further includes a second output for providing a second message from the first processing node to other processing nodes among the plurality of processing nodes at a second time following the first time interval. The second message includes the value of the work counter, the value of the replication counter, and the value of the persistence counter. The replication counter value in the second message indicates that all requests in the first request set associated with the first time interval are replicated at two or more processing nodes, and any non-persistently stored requests associated with time intervals preceding the first time interval are replicated at two or more processing nodes. The second message causes at least one of the plurality of processing nodes to persistently store one or more requests in the first request set.
在另一总体方面,一种用于在分布式数据处理系统中处理状态更新请求的计算系统包括多个处理节点。该计算系统包括用于使用该多个处理节点中的两个或更多个处理节点来处理多个请求集的装置。每个请求集中的每个请求被配置用于在该多个处理节点中的某个处理节点处引起状态更新,并与多个时间间隔中的相应时间间隔相关联。该多个请求集包括与该多个时间间隔中的第一时间间隔相关联的第一请求集。In another general aspect, a computing system for processing state update requests in a distributed data processing system includes a plurality of processing nodes. The computing system includes means for using two or more of the plurality of processing nodes to process a plurality of request sets. Each request in each request set is configured to cause a state update at one of the plurality of processing nodes and is associated with a corresponding time interval among a plurality of time intervals. The plurality of request sets includes a first request set associated with a first time interval among the plurality of time intervals.
该计算系统还包括用于在该多个处理节点中的第一处理节点处维护多个计数器的装置。该多个计数器包括:工作计数器,该工作计数器在该分布式数据处理系统中指示该多个时间间隔中的当前时间间隔;复制计数器,该复制计数器指示该多个时间间隔中的某个时间间隔,在该时间间隔内,在该多个处理节点中的数个处理节点处复制与该时间间隔相关联的所有请求;以及持久性计数器,该持久性计数器指示该多个时间间隔中的某个时间间隔,在该时间间隔内,与该时间间隔相关联的所有请求均被存储在与该多个处理节点中的至少一个处理节点相关联的持久性存储装置中。The computing system also includes means for maintaining a plurality of counters at a first processing node among the plurality of processing nodes. The plurality of counters includes: a working counter indicating the current time interval among the plurality of time intervals in the distributed data processing system; a replication counter indicating a time interval in which all requests associated with the time interval are replicated at several processing nodes among the plurality of processing nodes; and a persistence counter indicating a time interval in which all requests associated with the time interval are stored in persistent storage associated with at least one of the plurality of processing nodes.
该计算系统还包括用于在第一时间从该第一处理节点向该多个处理节点中的其他处理节点提供第一消息的装置。该第一消息包括该工作计数器的值、该复制计数器的值、以及该持久性计数器的值。该第一消息中的复制计数器指示:在该第一时间间隔之前,在两个或更多个处理节点处复制与第二时间间隔相关联的第二请求集中的所有请求,并且在两个或更多个处理节点处复制与该多个时间间隔中的在该第二时间间隔之前的时间间隔相关联的任何非持久地存储的请求。尚未在该多个处理节点中的两个或更多个处理节点处复制与该第一时间间隔相关联的第一请求集中的至少一些请求。The computing system also includes means for providing a first message from the first processing node to the other processing nodes of the plurality of processing nodes at a first time. The first message includes the value of the work counter, the value of the replication counter, and the value of the persistence counter. The replication counter in the first message indicates that, prior to the first time interval, all requests in the second request set associated with the second time interval are replicated at two or more processing nodes, and any non-persistently stored requests associated with the time interval preceding the second time interval are replicated at two or more processing nodes. At least some of the requests in the first request set associated with the first time interval have not yet been replicated at two or more processing nodes of the plurality of processing nodes.
该计算系统还包括用于在该第一时间之后的第二时间从该第一处理节点向该多个处理节点中的其他处理节点提供第二消息的装置。该第二消息包括该工作计数器的值、该复制计数器的值、以及该持久性计数器的值。该第二消息中的复制计数器的值指示:在两个或更多个处理节点处复制与该第一时间间隔相关联的第一请求集中的所有请求,并且在两个或更多个处理节点处复制与该第一时间间隔之前的时间间隔相关联的任何非持久地存储的请求。该第二消息使该多个处理节点中的至少一个处理节点完成持久地存储该第一请求集中的一个或多个请求。The computing system also includes means for providing a second message from the first processing node to other processing nodes among the plurality of processing nodes at a second time following the first time interval. The second message includes the value of the work counter, the value of the replication counter, and the value of the persistence counter. The value of the replication counter in the second message indicates that all requests in the first request set associated with the first time interval are replicated at two or more processing nodes, and that any non-persistently stored requests associated with the time interval preceding the first time interval are replicated at two or more processing nodes. The second message causes at least one of the plurality of processing nodes to persistently store one or more requests in the first request set.
各方面可以具有以下优点中的一个或多个优点。Each aspect may have one or more of the following advantages.
总体上,与其中组件(或组件的并行执行副本)被托管在不同的服务器上的上述方法相比,本文中描述的一些特征使得能够提高计算(尤其是基础规范是基于图的程序规范的计算)的计算效率(例如,包括多个处理节点的分布式数据处理系统能够增加每单位给定计算资源所处理的记录数量)。例如,调用集群组件被布置在基于图的程序规范中,并用于将该基于图的程序规范与分布式数据处理系统接口连接,使得由该基于图的程序规范中的处理节点以分布式方式执行该基于图的程序规范所需的计算。此外,本文中描述的一些特征提供了适配用于变化的计算资源和计算要求的能力。本文提供了一种计算方法,该计算方法允许适配用于在执行一个或多个基于图的计算期间可用的计算资源的变化,和/或适配用于(例如,由于正在处理的数据的特性导致的)计算负荷的变化或此类计算的不同组件的负荷随时间的变化。例如,各方面能够适配用于处理节点从分布式数据处理系统中添加或移除(或发生故障以及重新在线)。分布式数据处理系统提供适配性的一种方式是管理系统中数据的复制和持久性,包括维护由处理节点发送和接收的消息的计数以及维护所有消息在系统中被复制和/或持久化的时间间隔的指示符。Overall, compared to the methods described above where components (or parallel execution copies of components) are hosted on different servers, some features described herein enable improved computational efficiency (especially for computations based on graph-based program specifications). For example, a distributed data processing system comprising multiple processing nodes can increase the number of records processed per unit of given computational resources. For instance, a cluster component is deployed within a graph-based program specification and used to interface the graph-based program specification with the distributed data processing system, allowing the processing nodes within the graph-based program specification to perform the required computations in a distributed manner. Furthermore, some features described herein provide the ability to adapt to changing computational resources and requirements. This paper provides a computational approach that allows adaptation to changes in available computational resources during the execution of one or more graph-based computations, and/or to changes in computational load (e.g., due to the characteristics of the data being processed) or changes in the load of different components of such computations over time. For example, aspects can adapt to the addition or removal (or failure and re-online) of processing nodes from the distributed data processing system. One way to provide adaptability in a distributed data processing system is to manage the replication and persistence of data within the system, including maintaining a count of messages sent and received by the processing nodes and an indicator of the time interval at which all messages are replicated and/or persisted in the system.
还提供了一种计算方法,该计算方法能够高效地利用具有不同特性的计算资源(例如,使用每个服务器具有不同处理器数量、每个处理器具有不同处理器核数等的服务器)并高效地支持同构以及异构环境两者。本文中描述的一些特征还能够使基于图的计算快速启动。如本文中描述的,提供这种效率和适应性的一方面是提供对处理节点集群的适当管理。A computational approach is also provided that efficiently utilizes computing resources with different characteristics (e.g., using servers with different numbers of processors per server, different numbers of processor cores per processor, etc.) and efficiently supports both homogeneous and heterogeneous environments. Some of the features described in this paper also enable rapid startup of graph-based computation. As described in this paper, one aspect of providing this efficiency and adaptability is the provision of appropriate management of the cluster of processing nodes.
各方面还有利地是容错的,因为分布式数据处理系统能够通过及时回滚处理而从发生的任何处理错误中恢复。该系统预见了多个可能的回滚场景,并实施用于在可能的回滚场景中的每一个中执行回滚的算法。Another advantage is its fault tolerance, as the distributed data processing system can recover from any processing errors that occur through timely rollback. The system anticipates multiple possible rollback scenarios and implements algorithms to perform a rollback in each of these scenarios.
附图说明Attached Figure Description
图1是用于处理数据的系统的框图。Figure 1 is a block diagram of a system used for data processing.
图2是包括计算集群的计算系统的框图。Figure 2 is a block diagram of a computing system that includes a computing cluster.
图3是表示各种重复时间间隔的时间的时钟的示意图。Figure 3 is a schematic diagram of a clock representing various repetition time intervals.
图4是操作过程的状态转换图。Figure 4 is a state transition diagram of the operation process.
图5至图12展示了计算系统的正常操作。Figures 5 to 12 illustrate the normal operation of the computing system.
图13至图15展示了第一回滚过程。Figures 13 to 15 illustrate the first rollback process.
图16至图18展示了第二回滚过程。Figures 16 to 18 illustrate the second rollback process.
图19至图21展示了第三回滚过程。Figures 19 to 21 illustrate the third rollback process.
图22至图25展示了第四回滚过程。Figures 22 to 25 illustrate the fourth rollback process.
图26至图29展示了第五回滚过程。Figures 26 to 29 illustrate the fifth rollback process.
图30至图32展示了第六回滚过程。Figures 30 to 32 illustrate the sixth rollback process.
图33至图35展示了第七回滚过程。Figures 33 to 35 illustrate the seventh rollback process.
图36至图37展示了第八回滚过程。Figures 36 and 37 illustrate the eighth rollback process.
具体实施方式Detailed Implementation
图1示出数据处理系统200的示例,在该数据处理系统中可以使用计算集群管理技术。系统200包括数据源202,该数据源可以包括一个或多个数据源,诸如存储设备或到在线数据流的连接,数据源中的每一个可以以各种格式(例如,数据库表、电子表格文件、平面文本文件或主机使用的本机格式)中的任一种存储或提供数据。执行环境204包括预处理模块206和执行模块212。例如,在合适的操作系统(诸如UNIX操作系统版本)的控制下,执行环境204可以被托管在一个或多个通用计算机上。例如,执行环境204可以包括多节点并行计算环境,该多节点并行计算环境包括使用多个处理单元(例如,中央处理单元CPU)或处理器核的计算机系统的配置,这些处理单元或处理器核或是本地的(例如,多处理器系统,诸如对称多处理(SMP)计算机)、或是本地分布式的(例如,作为集群或大规模并行处理(MPP)系统耦接的多个处理器)、或是远程的或远程分布的(例如,经由局域网(LAN)和/或广域网(WAN)耦接的多个处理器)、或其任何组合。Figure 1 illustrates an example of a data processing system 200, in which computing cluster management technology can be used. System 200 includes a data source 202, which may include one or more data sources, such as storage devices or connections to online data streams. Each data source may store or provide data in any of a variety of formats, such as database tables, spreadsheet files, flat text files, or native formats used by the host. Execution environment 204 includes a preprocessing module 206 and an execution module 212. For example, execution environment 204 may be hosted on one or more general-purpose computers under the control of a suitable operating system, such as a version of the UNIX operating system. For example, execution environment 204 may include a multi-node parallel computing environment, which includes a configuration of a computer system using multiple processing units (e.g., central processing unit CPUs) or processor cores, which may be local (e.g., a multiprocessor system, such as a symmetric multiprocessing (SMP) computer), locally distributed (e.g., multiple processors coupled as a cluster or massively parallel processing (MPP) system), remote or remotely distributed (e.g., multiple processors coupled via a local area network (LAN) and/or a wide area network (WAN), or any combination thereof.
预处理模块206能够执行在由执行模块212执行程序规范(例如,以下描述的基于图的程序规范)之前可能需要的任何配置。预处理模块206可以配置程序规范以从可以实施数据源202的各种类型的系统(包括不同形式的数据库系统)接收数据。数据可以被组织为具有相应字段(也称为“属性”、“行”或“列”)的值(包括可能的空值)的记录。当首先配置计算机程序(诸如数据处理应用程序)以从数据源读取数据时,预处理模块206通常以关于此数据源中的记录的一些初始格式信息开始。计算机程序可以以如本文中描述的数据流图的形式表示。在一些情况下,数据源的记录结构最初可能不是已知的,而是可在分析数据源或数据之后确定。关于记录的初始信息可包括例如表示不同值的位数、记录内的字段的顺序、以及由位表示的值的类型(例如,字符串、有符号/无符号整数)。Preprocessing module 206 is capable of performing any configurations that may be required before execution module 212 executes the program specification (e.g., the graph-based program specification described below). Preprocessing module 206 can configure the program specification to receive data from various types of systems (including different forms of database systems) that can implement data source 202. Data can be organized as records with values (including possible null values) for corresponding fields (also called "attributes," "rows," or "columns"). When first configuring a computer program (such as a data processing application) to read data from a data source, preprocessing module 206 typically begins with some initial formatting information about the records in this data source. The computer program can be represented in the form of a data flow diagram as described herein. In some cases, the record structure of the data source may not be initially known but can be determined after analyzing the data source or data. Initial information about the records may include, for example, the number of bits representing different values, the order of fields within the record, and the type of value represented by bits (e.g., string, signed/unsigned integer).
提供数据源202的存储设备可以在执行环境204本地,例如,存储在连接到托管执行环境204的计算机的存储介质(例如,硬盘驱动器208)上,或者可以远离执行环境204,例如,被托管在通过(例如,由云计算基础设施提供的)远程连接与托管执行环境204的计算机通信的远程系统(例如,主机210)上。The storage device providing the data source 202 may be local to the execution environment 204, for example, stored on a storage medium (e.g., hard disk 208) of a computer connected to the execution environment 204, or it may be remote from the execution environment 204, for example, hosted on a remote system (e.g., host 210) that communicates with the computer hosting the execution environment 204 via a remote connection (e.g., provided by cloud computing infrastructure).
执行模块212执行由预处理模块206配置和/或生成的程序规范以读取输入数据和/或生成输出数据。输出数据214可以存储回数据源202中或存储在执行环境204可访问的数据存储系统216中、或以其他方式使用。数据存储系统216也可由开发环境218访问,在该开发环境中开发者220能够开发用于使用执行模块212处理数据的应用程序。Execution module 212 executes the program specifications configured and/or generated by preprocessing module 206 to read input data and/or generate output data. Output data 214 may be stored back in data source 202 or in a data storage system 216 accessible to execution environment 204, or otherwise used. Data storage system 216 may also be accessed by development environment 218, in which developer 220 can develop applications for using execution module 212 to process data.
换言之,数据处理系统200可以包括:In other words, the data processing system 200 may include:
耦接到数据存储系统216的可选开发环境218,其中,开发环境218被配置用于构建与数据流图相关联的实施基于图的计算的数据处理应用程序,该基于图的计算是对从一个或多个输入数据集流过处理图组件的图到达一个或多个输出数据集的数据执行的,其中,该数据流图由数据存储系统216中的数据结构指定,该数据流图具有由数据结构指定并表示由一个或多个链路连接的图组件的多个节点,这些链路由数据结构指定并表示图组件之间的数据流;An optional development environment 218 is coupled to the data storage system 216, wherein the development environment 218 is configured to build a data processing application that implements graph-based computation associated with a data flow graph. The graph-based computation is performed on data flowing from one or more input datasets through a graph processing graph component to one or more output datasets. The data flow graph is specified by a data structure in the data storage system 216, which has multiple nodes specified by the data structure and representing graph components connected by one or more links, which are specified by the data structure and represent data flows between graph components.
耦接到数据存储系统216并托管在一个或多个计算机上的执行环境204,执行环境204包括预处理模块206,该预处理模块被配置用于读取指定数据流图的已存储数据结构并用于分配和配置诸如进程等的计算资源,以用于执行对由预处理模块206分派给数据流图的图组件的计算;An execution environment 204 is coupled to a data storage system 216 and hosted on one or more computers. The execution environment 204 includes a preprocessing module 206 configured to read stored data structures of a specified data flow graph and to allocate and configure computational resources such as processes for performing computations on graph components of the data flow graph assigned by the preprocessing module 206.
其中,执行环境204包括执行模块212,该执行模块用于调度和控制对所分派计算或进程的执行,以使得能够执行基于图的计算。即,执行模块被配置用于从数据源202读取数据并使用以数据流图的形式表示的可执行计算机程序来处理数据。The execution environment 204 includes an execution module 212, which schedules and controls the execution of assigned computations or processes to enable graph-based computations. Specifically, the execution module is configured to read data from the data source 202 and process the data using an executable computer program represented as a data flow graph.
1计算集群1 Computing Cluster
一般而言,用于使用执行模块212处理数据的一些计算机程序(在本文中也称为“应用程序”)包括应用程序用来访问计算集群的调用集群组件。例如,参考图2,在流水线式数据处理的方法中,调用集群组件110与计算机集群120的组件交互以处理在调用集群组件110处从该调用集群组件是其一部分的应用程序(例如,数据流图或其他形式的基于图的程序规范)中的组件接收的记录103,并且将相应的结果105传输到该调用集群组件是其一部分的应用程序中的一个或多个其他组件。对于每个输入记录103,调用集群组件110向集群120发送请求113(例如,用于执行数据处理任务的请求),并且一段时间后,该调用集群组件从集群120中接收对该请求113的响应115。在接收到响应115之后的一段时间,通常是在已知处理请求的结果在集群120中适当地持久化之后,调用集群组件110发送与响应115相对应的结果105。Generally, some computer programs (also referred to herein as "applications") used to process data using execution module 212 include calling cluster components used by the application to access the computing cluster. For example, referring to FIG2, in a pipelined data processing method, calling cluster component 110 interacts with components of computer cluster 120 to process records 103 received at calling cluster component 110 from components in an application (e.g., a data flow graph or other form of graph-based program specification) to which calling cluster component 110 is a part, and transmits corresponding results 105 to one or more other components in the application to which calling cluster component 110 is a part. For each input record 103, calling cluster component 110 sends a request 113 to cluster 120 (e.g., a request to perform a data processing task), and after a period of time, the calling cluster component receives a response 115 from cluster 120 to the request 113. After a period of time following the receipt of response 115, typically after the result of the processing request is known to be properly persisted in cluster 120, calling cluster component 110 sends the result 105 corresponding to response 115.
图2中未示出调用集群组件110是其一部分的基于图的程序规范。在图2中,仅示出了单个调用集群组件110,但是应当认识到,通常可以存在许多可以与同一集群120交互的调用集群组件,例如,每个调用集群组件参与相同或不同的应用程序(诸如数据流图)。基于图的程序规范可以被实施为例如如美国专利号5,966,072、美国专利号7,167,850或美国专利号7,716,630中描述的数据流图,或如美国公开号2016/0062776中描述的数据处理图。这种基于数据流图的程序规范通常包括与图的节点(顶点)相对应的计算组件,这些计算组件通过与该图(称为“数据流图”)的链路(有向边)相对应的数据流耦接。通过数据流链路连接到上游组件的下游组件接收有序的输入数据元素流,并按接收到的顺序处理输入数据元素,以可选地生成一个或多个相应的输出数据元素流。在一些示例中,每个组件被实施为托管在多个典型计算机服务器之一上的进程。每个计算机服务器可以在任何一个时间激活多个此类组件进程,并且操作系统(例如,Unix)调度程序在该服务器上托管的组件之间共享资源(例如,处理器时间和/或处理器核)。在这种实施方式中,可以使用操作系统和连接服务器的数据网络的数据通信服务(例如,命名管道、TCP/IP会话等)来实施组件之间的数据流。组件的子集通常用作整个计算(例如,去往和/或来自数据文件、数据库表和外部数据流)的数据源和/或数据接收器。在例如通过协作进程建立了组件进程和数据流之后,数据便流经整个计算系统,该计算系统实施表示为图的计算,该计算通常受每个组件处输入数据的可用性和每个组件的计算资源的调度的支配。Figure 2 does not show a graph-based programming specification in which the call cluster component 110 is a part. Only a single call cluster component 110 is shown in Figure 2; however, it should be recognized that there can typically be many call cluster components that can interact with the same cluster 120, for example, each participating in the same or different applications (such as data flow graphs). The graph-based programming specification can be implemented as, for example, a data flow graph as described in U.S. Patent Nos. 5,966,072, 7,167,850, or 7,716,630, or a data processing graph as described in U.S. Publication No. 2016/0062776. Such a data flow graph-based programming specification typically includes computational components corresponding to the nodes (vertices) of the graph, coupled via data flows corresponding to links (directed edges) in the graph (referred to as the “data flow graph”). Downstream components connected to upstream components via data flow links receive ordered streams of input data elements and process the input data elements in the received order to optionally generate one or more corresponding streams of output data elements. In some examples, each component is implemented as a process hosted on one of several typical computer servers. Each computer server can have multiple such component processes active at any given time, and the operating system (e.g., Unix) scheduler shares resources (e.g., processor time and/or processor cores) among the components hosted on that server. In this implementation, data flow between components can be implemented using data communication services (e.g., named pipes, TCP/IP sessions, etc.) of the operating system and the data network connecting the servers. A subset of components typically serves as a data source and/or data receiver for the entire computation (e.g., to and/or from data files, database tables, and external data streams). After component processes and data flow are established, for example, through cooperating processes, data flows through the entire computing system, which performs computations represented as graphs, typically governed by the availability of input data at each component and the scheduling of computational resources for each component.
集群120包括由通信网络130(在图2中展示为“云”并且可以具有各种互连拓扑,诸如启动、共享介质、超立方体等)耦接的多个集群组件140、150a至150c。每个集群组件(或简称为“组件”)在集群中都有特定的角色。在一些实施方式中,每个组件都被托管在不同的计算资源(例如,单独计算机服务器、多核服务器的单独核等)上。应当理解,这些组件表示集群内的角色,并且在一些实施例中,多个角色可以被托管在一个计算资源上,并且单个角色可以分布在多个计算资源上。Cluster 120 includes multiple cluster components 140, 150a to 150c coupled by a communication network 130 (shown as a "cloud" in Figure 2 and may have various interconnection topologies, such as bootstrapping, shared media, hypercubes, etc.). Each cluster component (or simply "component") has a specific role within the cluster. In some implementations, each component is hosted on a different computing resource (e.g., a single computer server, a single core of a multi-core server, etc.). It should be understood that these components represent roles within the cluster, and in some implementations, multiple roles may be hosted on a single computing resource, and a single role may be distributed across multiple computing resources.
在图2中,根组件140(称为“根”)执行以下全面描述的某些同步功能,但是不直接参与要处理的数据的流或计算。多个工作器组件150a至150c(以下称为“工作器”)处理来自调用集群组件110的请求113。数据165以冗余方式存储在相应工作器150可访问的存储装置160中,并且每个请求113可能需要访问(用于读取和/或写入)由请求113中的密钥标识的存储在存储装置160中的数据的特定部分,该数据的特定部分分布在由密钥确定的工作器的特定子集之中。在持有特定请求所需的密钥数据的那些工作器中,一个工作器被指定为执行请求113的主要工作器(例如,工作器150a),而其他工作器则被指定为备用工作器,因为这些工作器通常不或者不必执行该请求,但是其数据版本根据主要工作器或以与主要工作器相同的方式进行更新。In Figure 2, root component 140 (referred to as the “root”) performs some of the synchronization functions described in full below, but does not directly participate in the flow or computation of the data to be processed. Multiple worker components 150a to 150c (hereinafter referred to as “workers”) process requests 113 from the calling cluster component 110. Data 165 is stored redundantly in storage device 160 accessible to the respective worker 150, and each request 113 may require access (for reading and/or writing) to a specific portion of the data stored in storage device 160, identified by a key in the request 113, distributed across a specific subset of workers identified by the key. Among those workers holding the key data required for a particular request, one worker is designated as the primary worker (e.g., worker 150a) to execute request 113, while the others are designated as backup workers because these workers typically do not or need not execute the request, but their data versions are updated based on or in the same manner as the primary worker.
在图2中,特定输入记录103(其可以被认为是或包括要处理的数据单元)的路径被展示为进入调用集群组件110,然后由组件110将相应的请求113(与数据单元一起)发送到该请求的主要工作器150a(工作器A),并将来自主要工作器150a的响应115发送回调用集群组件110并发送到该请求的备用工作器150b(工作器B),并且最后从调用集群组件110输出或发送相应的结果105。通常,每个请求可能有多个备用组件;然而,为了便于说明,在以下许多示例中仅展示了单个备用组件。In Figure 2, the path of a specific input record 103 (which can be considered or includes the data unit to be processed) is shown as entering the call cluster component 110, then the component 110 sends the corresponding request 113 (along with the data unit) to the primary worker 150a (worker A) of the request, sends the response 115 from the primary worker 150a back to the call cluster component 110 and to the backup worker 150b (worker B) of the request, and finally outputs or sends the corresponding result 105 from the call cluster component 110. Typically, each request may have multiple backup components; however, for ease of illustration, only a single backup component is shown in many of the following examples.
如下面进一步讨论的,调用集群组件110将请求113缓存在重播缓冲器112中,并且如果必要,可以将请求重新发送到集群120以确保这些请求已被集群120正确地接收和/或处理。组件110还将响应115缓存在代管缓冲器114中,并且在检测到错误状态的情况下可以接收某些响应的冗余副本。通常,组件110“以代管的方式(in escrow)”保存响应,直到集群120通知组件110响应115在集群中适当地持久化(即,存储在具有合适的耐久性等级的数据存储装置中)。As discussed further below, the invoking cluster component 110 caches request 113 in replay buffer 112 and, if necessary, can resend the request to cluster 120 to ensure that these requests have been correctly received and/or processed by cluster 120. Component 110 also caches response 115 in escrow buffer 114 and can receive redundant copies of certain responses in the event of an error state. Typically, component 110 holds responses "in escrow" until cluster 120 notifies component 110 that response 115 has been properly persisted in the cluster (i.e., stored in a data storage device with an appropriate durability level).
根140通过维护时间(间隔)值并将其分配给其他组件以及将某些时间值分配给调用集群组件110来执行同步功能。参考图3,根140的时钟142维护三个时间。时间T1是当前工作时间或时间间隔,例如表示为整数值,并且被重复更新,例如每秒增加一次。Root 140 performs synchronization by maintaining time (interval) values and assigning them to other components, as well as assigning certain time values to calling cluster component 110. Referring to Figure 3, clock 142 of root 140 maintains three times. Time T1 is the current working time or time interval, represented as an integer value, and is repeatedly updated, for example, incrementing once per second.
当集群120从调用集群组件110接收到请求113,并且集群生成(或传输)响应115时,这些请求和响应各自与这些请求和响应分别被接收并生成(或传输)的工作时间(T1)相关联(或等效地与时间间隔相关联,在这些时间间隔期间,时间T1具有相同的值,即在T1的增量之间)。根维护并分配第二时间T2,该时间滞后于时间T1。时间T2表示这样的时间(间隔),其使得在该时间或更早的时间创建的在集群120的组件150a至150c之间发送的所有请求和/或响应已在组件150a至150c中的多处被复制(例如,在易失性存储器中),从而使得在用于处理错误的操作回滚的情况下不必重新发送这些请求和响应,如下面进行了更详细的描述。在一些示例中,复制(例如,在易失性存储器中)被称为以第一耐久性等级存储在数据存储装置中。根维护并分配第三时间(间隔)T3,该时间滞后于时间T1和T2,该时间表示这样的时间,其使得在该时间或更早的时间创建的所有请求和/或响应在存储数据165的工作器150a至150c中的至少一个或甚至所有处已被存储在持久性存储器中并成为永久的,使得在用于处理集群120中的组件故障的操作回滚的情况下不必重新发送或重新计算这些请求和响应。在一些示例中,被存储在持久性存储器(例如,存储到磁盘)中被称为以第二耐久性等级存储在数据存储装置中,其中该第二耐久性等级比第一耐久性等级相对更耐久。要注意的是,数据存储装置可以与多个不同耐久性等级相关联,这些不同耐久性等级比具有第一耐久性等级的数据存储装置和具有第二耐久性等级的数据存储装置相对更耐久或更不耐久。例如,集群外部的异地数据存储装置可以具有第三耐久性等级,其比第一耐久性等级和第二耐久性等级相对更耐久。在一些示例中,时间间隔T1、T2和T3可替代地称为“状态一致性指示符”。When cluster 120 receives request 113 from calling cluster component 110 and the cluster generates (or transmits) response 115, these requests and responses are each associated with the working time (T1) during which they were received and generated (or transmitted) (or equivalently with time intervals during which time T1 has the same value, i.e., between increments of T1). A second time T2 is maintained and allocated, which lags behind time T1. Time T2 represents a time (interval) such that all requests and/or responses created at or earlier times and sent between components 150a to 150c of cluster 120 have been replicated in multiple locations within components 150a to 150c (e.g., in volatile memory), so that these requests and responses do not need to be resent in the event of an operational rollback to handle errors, as described in more detail below. In some examples, replication (e.g., in volatile memory) is referred to as being stored in a data storage device at a first durability level. The root maintains and allocates a third time (interval) T3, which lags behind times T1 and T2. This time represents a period such that all requests and/or responses created at or earlier than this time have been stored in persistent memory and become permanent at at least one or all of the workers 150a to 150c of the data storage 165, so that these requests and responses do not need to be resent or recalculated in the event of an operational rollback for handling component failures in cluster 120. In some examples, being stored in persistent memory (e.g., stored to disk) is referred to as being stored in the data storage device at a second durability level, where the second durability level is relatively more durable than the first durability level. It should be noted that the data storage device may be associated with multiple different durability levels, which are relatively more or less durable than data storage devices with a first durability level and data storage devices with a second durability level. For example, a remote data storage device outside the cluster may have a third durability level, which is relatively more durable than the first and second durability levels. In some examples, time intervals T1, T2, and T3 can be alternatively referred to as "state consistency indicators".
在本说明书中稍后描述根140确定何时增加复制时间(T2)或持久性时间(T3)的机制,以及将时间(T1-T3)的值分配给工作器150a至150c的机制。The mechanism by which root 140 determines when to increase the replication time (T2) or persistence time (T3), and the mechanism by which the value of time (T1-T3) is assigned to workers 150a to 150c, are described later in this specification.
在正常操作中,由集群120接收的请求113在基于该请求的数据单元的密钥被标识为主要工作器的工作器150处被处理,并且通常在也基于所需数据的密钥标识的一个或多个备用工作器150处被处理。参考图4,该处理可以表示为在调用集群组件110以及主要工作器和备用工作器150处该请求的不同状态之间的转换。注意,不同的请求处于不同的状态并且通常根据引用的数据在不同的工作器处进行处理,并且因此,调用集群组件和任何特定工作器可能具有不同状态下的许多请求。In normal operation, the request 113 received by cluster 120 is processed at worker 150, which is identified as the primary worker based on the key of the data unit of the request, and typically at one or more standby workers 150, also identified based on the key of the required data. Referring to Figure 4, this processing can be represented as transitions between different states of the request at the calling cluster component 110 and at the primary and standby workers 150. Note that different requests are in different states and are typically processed at different workers depending on the referenced data, and therefore, there may be many requests calling cluster components and any particular worker in different states.
通常,每个密钥与例如基于该密钥(例如,密钥的确定性函数,该函数为每个密钥值以不可预测的方式分配备用工作器)以伪随机方式选择的工作器150的相应子集相关联。更一般地并且优选地,这些子集与其他子集重叠,而不是根据密钥值形成完整的工作器集的分区。Typically, each key is associated with a corresponding subset of workers 150 selected in a pseudo-random manner, for example, based on that key (e.g., a deterministic function of the key that assigns standby workers to each key value in an unpredictable manner). More generally and preferably, these subsets overlap with other subsets, rather than forming partitions of the complete set of workers based on key values.
当在调用集群组件110处针对每个输入记录103形成具有(或由调用集群组件分派)唯一标识符rid的请求113时,该请求在调用集群组件中进入状态A。在下面的描述中,每个请求113在调用集群组件中处于标记为A-C的三个状态之一,并且在处理该请求的工作器150中的每一个处,处于标记为A-I的九个不同状态之一。在调用集群组件110记录请求113之后,其确定被分派为该请求的主要工作器的工作器150,并将请求113发送到该工作器150(图2中示出为工作器A)。注意,在替代性实施例中,调用集群组件110可能不知道哪个工作器是指定的主要工作器,并且请求113可能在集群120内被内部路由以到达指定的主要工作器150a。请求113在调用集群组件110处保持状态A,直到从集群120接收回对该请求的响应115为止。When a request 113 with a unique identifier rid is formed at the invocation cluster component 110 for each input record 103, the request enters state A in the invocation cluster component. In the following description, each request 113 is in one of three states labeled A-C in the invocation cluster component, and at each of the workers 150 processing the request, it is in one of nine different states labeled A-I. After the invocation cluster component 110 records the request 113, it determines the worker 150 assigned as the primary worker for the request and sends the request 113 to that worker 150 (shown as worker A in Figure 2). Note that in an alternative embodiment, the invocation cluster component 110 may not know which worker is the designated primary worker, and the request 113 may be internally routed within the cluster 120 to reach the designated primary worker 150a. The request 113 remains in state A at the invocation cluster component 110 until a response 115 for the request is received from the cluster 120.
当在主要工作器(图2中标记为工作器A)处接收到请求113时,该请求在主要工作器处进入状态A。主要工作器为请求分派请求时间,该请求时间表示为ta,等于(对其已知是)从根140开始分配的当前工作时间T1(认识到根增加T1与工作器知道该增量之间可能存在时间滞后)。在此状态下,请求113与请求id rid、请求时间(在此示例中被表示为ta)相关联地存储在易失性存储器155中,并且被指定为处于等待在主要工作器处执行的状态。在此状态A下,主要工作器将请求113发送到该请求的一个或多个备用工作器150(即,由密钥确定的)。在主要工作器处,例如,基于根据分派给请求的时间(ta)以及(可选地)请求在主要工作器处的到达顺序而对资源的有序分配,最终为请求分派资源以执行该请求。当请求113开始在主要工作器上执行时,该请求在主要工作器处进入状态B。当处理产生响应115时,在此示例中,假设T1工作时间这时为tb,则主要工作器处请求的状态变为状态C。在状态C下,响应115与时间tb相关联地存储在易失性存储器156中。如下面进一步讨论的,响应115和对工作器处的数据存储装置160的任何更新都与时间(此处为时间tb)相关联地进行存储,其方式允许例如使用版本化数据库或其他形式的版本化数据结构消除根据先前的回滚时间的影响。在此状态C下,响应115被传输到调用集群组件110以及(多个)备用组件150。When request 113 is received at the primary worker (labeled worker A in Figure 2), the request enters state A at the primary worker. The primary worker assigns a request time, denoted as ta, to the request, which is equal to (it is known that) the current working time T1 allocated starting from root 140 (recognizing that there may be a time lag between the root increment T1 and the worker knowing that this increment). In this state, request 113 is stored in volatile memory 155 in association with request id rid, request time (denoted as ta in this example), and is designated as being in a state of waiting to be executed at the primary worker. In state A, the primary worker sends request 113 to one or more standby workers 150 (i.e., determined by a key) for the request. At the primary worker, resources are ultimately assigned to the request for execution, for example, based on the ordered allocation of resources according to the time (ta) assigned to the request and (optionally) the order in which the requests arrive at the primary worker. When request 113 begins execution on the primary worker, the request enters state B at the primary worker. When processing produces response 115, in this example, assuming the working time T1 is tb at this point, the requested state at the primary worker changes to state C. In state C, response 115 is stored in volatile memory 156 associated with time tb. As discussed further below, response 115 and any updates to the data storage device 160 at the worker are stored in association with time (here, time tb), in a manner that allows, for example, the use of a versioned database or other form of versioned data structure to eliminate the effects based on previous rollback times. In state C, response 115 is transmitted to the calling cluster component 110 and the standby components(s) 150.
在调用集群组件110处,当从主要工作器接收到响应115时,该请求进入状态B,在该状态下,该响应与主要工作器产生该响应的时间tb相关联地存储。响应115在调用集群组件处被保留在代管缓冲器114中,直到该调用集群组件从根140接收到等于或大于tb的代管时间为止。根据来自该调用集群组件的请求的持久性要求,根可以提供复制时间T2或持久性时间T3作为调用集群组件的代管时间。当调用集群组件110接收到等于或大于tb的代管时间时,该调用集群组件将结果105从调用集群组件中发送出去,并且相应的请求113进入空状态C,在该状态下,不再需要请求113或其响应115的进一步记录(例如,可以将其完全删除)。At the calling cluster component 110, when a response 115 is received from the primary worker, the request enters state B, in which the response is stored in association with the time tb when the primary worker generated the response. Response 115 is held in the escrow buffer 114 at the calling cluster component until the calling cluster component receives an escrow time equal to or greater than tb from the root 140. Depending on the persistence requirements of the request from the calling cluster component, the root can provide a replication time T2 or a persistence time T3 as the escrow time for the calling cluster component. When the calling cluster component 110 receives an escrow time equal to or greater than tb, the calling cluster component sends out result 105, and the corresponding request 113 enters an empty state C, in which further recording of request 113 or its response 115 is no longer required (e.g., it can be completely deleted).
在(多个)备用工作器150处,当备用工作器接收到来自主要工作器的请求113时,备用工作器进入状态F,在该状态下,该请求与原始请求时间ta相关联(即使当前工作时间T1的增量超过此值也是如此),并且该请求处于等待来自主要工作器的响应的状态。当备用工作器150b接收到来自主要工作器的响应115并因此将响应115复制在该备用工作器的易失性存储器156中时,该请求进入状态G。At standby worker(s) 150, when a standby worker receives a request 113 from the primary worker, the standby worker enters state F, in which the request is associated with the original request time ta (even if the increment of the current working time T1 exceeds this value), and the request is awaiting a response from the primary worker. When standby worker 150b receives a response 115 from the primary worker and thus copies the response 115 into its volatile memory 156, the request enters state G.
主要工作器或备用工作器一具有新生成的响应115,其就可以自由地开始将该响应保存到诸如基于磁盘或基于非易失性存储器的数据库或文件系统等的持久性存储装置160(参见状态D和H)的过程。可以使用基于日志的方法,其中首先在基于易失性存储器的日志中记录对持久性存储器的更新,并且将该日志的部分不时地写入持久性存储装置160。注意,即使当更新日志的一部分被写入持久性存储装置160时,也不会使那些更新成为永久的(即“已提交”),直到关于被认为是永久的更新程度的显式指示符被写入到持久性存储装置为止。Once the primary or standby worker receives the newly generated response 115, it is free to begin the process of saving the response to persistent storage 160, such as a disk-based or non-volatile memory-based database or file system (see states D and H). A log-based approach can be used, where updates to persistent storage are first recorded in a volatile memory-based log, and portions of that log are periodically written to persistent storage 160. Note that even when a portion of the update log is written to persistent storage 160, those updates are not made permanent (i.e., "committed") until an explicit indicator of the degree to which an update is considered permanent is written to persistent storage.
在根140已确定已在所有适当的工作器处复制了与时间tb以及更早时间相关联的所有请求和响应的时间,T2达到或增加至tb。在将时间T2=tb从根140分配给主要工作器和备用工作器150之后,这些工作器使响应永久地保存在持久性存储装置160中。如果经过该时间tb的更新日志尚未写入持久性存储器,则它们在那时被写入。更一般地,到时间T2达到或增加到tb为止,经过时间tb的日志已由工作器写入到持久性存储装置160,并且所有必须在此时完成的是通过记录将在持久日志中经过时间tb的更新视为永久的指示符来完成使更新变为永久的任务。在主要工作器正在将日志永久化的可能短时间期间,它处于状态D。当主要工作器已将对图4中所展示的请求的响应保存在持久性存储装置中时,它进入状态E。类似地,当备用工作器使响应永久化时,它处于状态H,并且当备用工作器使响应永久地保存在持久性存储器中时,它进入状态I。当根确定与时间tb(以及更早的时间)相关联的所有响应都永久地保存在持久性存储器中(即,都处于状态E或I)时,根将持久性时间T3增加到tb。如上所述,对于在调用集群组件处的请求的代管时间是持久性时间T3的情况,根140通知调用集群组件110代管时间已等于或大于tb,并且调用集群组件110将针对该请求113和响应115的相应结果105释放到应用程序(例如,图)内的一个或多个其他组件。Once root 140 has determined that all requests and responses associated with time tb and earlier have been replicated at all appropriate workers, T2 reaches or increases to tb. After allocating time T2 = tb from root 140 to the primary and standby workers 150, these workers permanently store the responses in persistent storage 160. If the update logs elapsed to time tb have not yet been written to persistent storage, they are written at that time. More generally, by the time T2 reaches or increases to tb, the logs elapsed to time tb have been written to persistent storage 160 by the workers, and all that must be done at this time is to make the updates permanent by recording an indicator that the updates elapsed to time tb in the persistent logs will be considered permanent. During the possible short period during which the primary worker is making the logs permanent, it is in state D. When the primary worker has stored the responses to the requests shown in Figure 4 in persistent storage, it enters state E. Similarly, when the standby worker makes the response permanent, it is in state H, and when the standby worker makes the response permanently stored in persistent memory, it enters state I. When the root determines that all responses associated with time tb (and earlier times) are permanently stored in persistent memory (i.e., all are in state E or I), the root increases the persistence time T3 to tb. As described above, for the case where the custody time of a request at the calling cluster component is persistence time T3, the root 140 notifies the calling cluster component 110 that the custody time is equal to or greater than tb, and the calling cluster component 110 releases the corresponding result 105 for the request 113 and response 115 to one or more other components within the application (e.g., the figure).
如上所述,在正常操作中,当在集群中处理来自调用集群组件的连续请求113时,根更新工作时间T1,响应115返回到调用集群组件、并根据代管时间T2或T3的更新从调用集群组件释放到图。通常,对特定请求113的处理可能花费工作时间T1的许多时间“节拍”,例如数以十计或数以百计的节拍,并且因此集群可能有许多进行中的请求、以及与这些请求相关联的许多不同的请求时间。此外,因为数据分布在工作器之间,因此负荷根据那些请求的密钥有效地分布在工作器之间,使得每个工作器可能具有该工作器正在充当主要工作器的多个请求(即,处于状态A-E之一),并且还具有该工作器正在充当备用工作器的多个请求(即,处于状态F-I之一)。As described above, in normal operation, when processing consecutive requests 113 from calling cluster components in the cluster, the root update work time T1, the response 115 is returned to the calling cluster component, and released from the calling cluster component to the graph according to the update of the hosting time T2 or T3. Typically, processing a particular request 113 may take many time "ticks" of work time T1, such as tens or hundreds of ticks, and therefore the cluster may have many requests in progress, and many different request times associated with these requests. Furthermore, because the data is distributed among the workers, the load is effectively distributed among the workers according to the keys of those requests, such that each worker may have multiple requests that it is acting as the primary worker (i.e., in one of states A-E), and also multiple requests that it is acting as a backup worker (i.e., in one of states F-I).
要注意的是,到达集群的用于执行任务的一些请求使用如本文中描述的过程来复制任务并复制执行该任务的相应结果。例如,在任务已被标记并在备用工作器处复制(但不一定使其持久化)之后,该任务将在主要工作器处初始化。如果任务对数据记录进行操作,则初始化可能涉及保留记录的原始版本1。然后,该任务在主要工作器上执行,而在备用工作器上保持休眠。在处理已经完成之后,将存在记录的修改后的版本2。然后,任务的终止可以包括将记录的修改后的版本2从主要工作器发送到备用工作器。然后,主要工作器和备用工作器两者都能够删除记录的原始版本1(以及复制的任务)。这些步骤中的每一个都是合理高效的,但是如果任务持续时间很短,则与这些初始化和终止过程相关联的开销可能会使任务效率降低。It's important to note that some requests arriving at the cluster to execute tasks use procedures as described in this document to replicate the task and the corresponding results of executing it. For example, after a task has been tagged and replicated (but not necessarily persisted) at a standby worker, it is initialized at the primary worker. If the task operates on data records, initialization might involve retaining the original version 1 of the record. The task is then executed on the primary worker while remaining dormant on the standby worker. After processing is complete, a modified version 2 of the record will exist. Termination of the task can then include sending the modified version 2 of the record from the primary worker to the standby worker. Both the primary and standby workers are then able to delete the original version 1 of the record (and the replicated task). Each of these steps is reasonably efficient, but if the task duration is short, the overhead associated with these initialization and termination procedures can degrade task efficiency.
可替代地,对于持续时间相对较短的一些任务(“短任务”),可以使用不同的过程。短任务仍在备用工作器处被标记并复制。但是,初始化不需要保留记录的原始版本1。作为替代,在提交操作指示短任务和短任务的复制品已分别持久地存储在主要工作器和备用工作器上之后,短任务在这两个工作器上执行。在该执行结束时,将在主要工作器和备用工作器两者上都有记录的修改后的版本2的副本,而无需任何通信来传输修改后的记录。这两个工作器都有冗余处理,但是由于任务很短,所以此冗余不会对效率产生很大影响。例如,如果短任务是确定性的,并且不管哪个工作器执行该任务都产生相同的结果,则此替代性过程是有用的。Alternatively, for some tasks with relatively short durations (“short tasks”), a different process can be used. Short tasks are still marked and copied at the standby worker. However, initialization does not require retaining the original version 1 of the record. Instead, after a commit operation indicates that the short task and its copy have been persistently stored on the primary and standby workers respectively, the short task is executed on both workers. At the end of this execution, a modified version 2 copy of the record will be available on both the primary and standby workers without any communication to transfer the modified record. Both workers have redundant processing, but since the task is short, this redundancy will not significantly impact efficiency. This alternative process is useful, for example, if the short task is deterministic and produces the same result regardless of which worker executes it.
2正常操作的示例2. Example of normal operation
参考图5至图12,展示了调用集群组件110和集群120的正常操作的一个示例。在图5中,输入记录103到达调用集群组件110,并且调用集群组件110形成针对输入记录103的请求113。调用集群组件110将请求113与唯一请求标识符rid相关联,并将其存储在调用集群组件110的重播缓冲器112中。Referring to Figures 5 through 12, an example of normal operation of the calling cluster component 110 and cluster 120 is shown. In Figure 5, input record 103 arrives at the calling cluster component 110, and the calling cluster component 110 forms a request 113 for input record 103. The calling cluster component 110 associates request 113 with a unique request identifier rid and stores it in the replay buffer 112 of the calling cluster component 110.
调用集群组件110将请求113传输到集群120,并在时间T1=ta处在集群120中的主要工作器150a(工作器A)处接收到该请求。请求113被存储在主要工作器150a的易失性存储器155中,并且被分派有等于当前工作时间(T1=ta)的请求时间。将请求113的请求时间提供给调用集群组件110,该调用集群组件将请求时间(即,ta)与存储在重播缓冲器112中的请求113相关联。存储在调用集群组件110的重播缓冲器112中的请求113处于状态A(参见图4),以等待来自集群120的响应。存储在主要工作器的易失性存储器155中的请求113处于状态A,以等待分派计算资源以执行请求113。The calling cluster component 110 transmits request 113 to cluster 120, and receives the request at time T1 = ta on the primary worker 150a (worker A) in cluster 120. Request 113 is stored in the volatile memory 155 of the primary worker 150a and is assigned a request time equal to the current working time (T1 = ta). The request time of request 113 is provided to the calling cluster component 110, which associates the request time (i.e., ta) with request 113 stored in replay buffer 112. Request 113 stored in replay buffer 112 of the calling cluster component 110 is in state A (see Figure 4), awaiting a response from cluster 120. Request 113 stored in the volatile memory 155 of the primary worker is in state A, awaiting the allocation of computing resources to execute request 113.
参考图6,主要工作器将请求113发送到备用工作器150b(工作器B),其中,该请求被存储在备用工作器150b的易失性存储器155中。存储在备用工作器150b的易失性存储器155中的请求113处于状态F,以等待接收来自主要工作器的响应。Referring to Figure 6, the primary worker sends request 113 to the standby worker 150b (worker B), whereby the request is stored in the volatile memory 155 of the standby worker 150b. Request 113 stored in the volatile memory 155 of the standby worker 150b is in state F, awaiting a response from the primary worker.
参考图7,一旦主要工作器105将计算资源(例如主要工作器或集群的另一部分的计算资源)分派给请求113,则请求113在主要工作器105处进入状态B并开始执行。Referring to Figure 7, once the primary worker 105 allocates computing resources (e.g., computing resources of the primary worker or another part of the cluster) to request 113, request 113 enters state B at the primary worker 105 and begins execution.
参考图8,在时间T1=tb,主要工作器105完成了请求113的执行。请求113的执行生成响应115,该响应被存储在主要工作器的易失性存储器156中。响应115与请求113的请求标识符(rid)以及生成该响应的时间(tb)相关联。主要工作器将响应115发送到调用集群组件110和备用工作器150b,并且然后请求113处于状态C,以等待持久性时间T3达到tb。Referring to Figure 8, at time T1 = tb, the primary worker 105 completes the execution of request 113. The execution of request 113 generates a response 115, which is stored in the primary worker's volatile memory 156. Response 115 is associated with the request identifier (rid) of request 113 and the time (tb) at which the response was generated. The primary worker sends response 115 to the calling cluster component 110 and the standby worker 150b, and then request 113 is in state C, waiting for persistence time T3 to reach tb.
调用集群组件110接收响应115,并将该响应存储在其代管缓冲器114中。在将响应存储在代管缓冲器114的情况下,结果115在调用集群组件110处于状态B,以等待持久性时间T3(在此示例中为代管时间)达到tb。备用工作器150b接收响应115,并将该响应存储在该备用工作器的易失性存储器156中。请求113在备用工作器150b处进入状态G,以等待持久性时间T3达到tb。The calling cluster component 110 receives response 115 and stores it in its managed buffer 114. With the response stored in managed buffer 114, result 115 occurs when the calling cluster component 110 is in state B, waiting for the persistence time T3 (managed time in this example) to reach tb. The standby worker 150b receives response 115 and stores it in its volatile memory 156. Request 113 then enters state G at standby worker 150b, waiting for the persistence time T3 to reach tb.
尽管在图8中未示出,在将响应115存储(复制)在主要工作器150a和备用工作器150b的易失性存储器156的情况下,复制时间T2被设置为tb。Although not shown in Figure 8, in the case where the response 115 is stored (copied) in the volatile memory 156 of the primary worker 150a and the standby worker 150b, the copying time T2 is set to tb.
参考图9,一旦响应115被存储在主要工作器150a和备用工作器150b中的一个或两个的易失性存储器156中,主要工作器150a和备用工作器150b就开始将响应115存储到各自的持久性存储装置160,同时还保持存储在各自的易失性存储器155、156中。Referring to Figure 9, once the response 115 is stored in the volatile memory 156 of one or both of the primary worker 150a and the standby worker 150b, the primary worker 150a and the standby worker 150b begin to store the response 115 into their respective persistent storage devices 160, while also keeping it stored in their respective volatile memories 155, 156.
参考图10,在将响应115存储在主要工作器处并在备用工作器150b处复制响应之后,将持久性时间T3设置为tb。主要工作器150a和备用工作器150b最终将响应115永久存储在持久性存储装置160中。存储在主要工作器处的请求113处于状态D,并且存储在备用工作器150b处的请求113处于状态H,在这些状态下,请求113和响应115仍分别存储在易失性存储器155、156中。Referring to Figure 10, after storing response 115 at the primary worker and copying the response at the standby worker 150b, the persistence time T3 is set to tb. The primary worker 150a and the standby worker 150b ultimately permanently store response 115 in persistent storage 160. Request 113 stored at the primary worker is in state D, and request 113 stored at the standby worker 150b is in state H. In these states, request 113 and response 115 remain stored in volatile memories 155 and 156, respectively.
参考图11,此示例的代管时间是持久性时间T3,因此在T3更新为tb的情况下,存储在调用集群组件110处的请求113进入状态C,并且从该调用集群组件的代管缓冲器114释放响应115(其与时间tb相关联)。Referring to Figure 11, the escrow time in this example is a persistent time T3. Therefore, when T3 is updated to tb, the request 113 stored at the calling cluster component 110 enters state C, and the response 115 (which is associated with time tb) is released from the escrow buffer 114 of the calling cluster component.
参考图12,在响应115被永久地存储在主要工作器150a的持久性存储装置中的情况下,请求113进入状态E,在该状态下,请求113和响应115均未分别存储在其易失性存储器155、156中。类似地,在响应115被永久地存储在备用工作器150b的持久性存储装置中的情况下,请求113进入状态I,在该状态下,请求113和响应115均未存储在其易失性存储器155、156中。Referring to Figure 12, when response 115 is permanently stored in the persistent storage of the primary worker 150a, request 113 enters state E, in which neither request 113 nor response 115 is stored in their respective volatile memories 155 and 156. Similarly, when response 115 is permanently stored in the persistent storage of the standby worker 150b, request 113 enters state I, in which neither request 113 nor response 115 is stored in their respective volatile memories 155 and 156.
3回滚场景3 Rollback Scenario
尽管图4中的状态转换图表示正常操作,但有可能(但很少)未成功接收到工作器之间的消息。此外,工作器有可能在丢失其易失性存储器之后必须重新启动,或者工作器可能完全故障,使得工作器不进一步处理请求(即,担任主要角色或备用角色)。要注意的是,本文中描述的数据处理系统的一些实施例实施了本节中描述的所有回滚场景。还要注意的是,数据处理系统的其他实施例可以实施本节中描述的一个或多个但并非所有回滚场景。Although the state transition diagram in Figure 4 represents normal operation, it is possible (but rare) that messages between workers may not be successfully received. Furthermore, a worker may need to restart after losing its volatile memory, or the worker may fail completely, preventing it from processing further requests (i.e., assuming a primary or backup role). It should be noted that some embodiments of the data processing system described herein implement all the rollback scenarios described in this section. It should also be noted that other embodiments of the data processing system may implement one or more, but not all, of the rollback scenarios described in this section.
3.1场景1:tr<ta 3.1 Scenario 1: tr < ta
首先考虑集群确定存在一些未成功接收到的工作器间消息并且该消息与时间ta相关联的情况。一般而言,根通知所有工作器必须将时间“回滚”到ta之前的时间tr(即,tr<ta),例如,回滚到tr=ta-1。即使通过这种回滚,也可以将调用集群组件110提供的结果提供给应用程序或图,就像没有发生回滚一样,并且对分布在工作器之间的数据的更新保持与由调用集群组件提供的结果一致。具体地,直到结果被存储(例如,复制或持久化)在多个节点(例如,工作器)处之后,该结果才从调用集群组件110释放到应用程序或图,从而确保该结果将永远不会被重新调用或变为失效。换言之,发生的任何回滚必然发生在由调用集群组件110将结果提供给应用程序或图之前。First, consider the case where the cluster determines that some inter-worker messages were not successfully received and that these messages are associated with time ta. Generally, the root notifies all workers that they must "roll back" the time to a time tr prior to ta (i.e., tr < ta), for example, rolling back to tr = ta - 1. Even with this rollback, the results provided by the calling cluster component 110 can be provided to the application or graph as if no rollback had occurred, and updates to the data distributed among the workers remain consistent with the results provided by the calling cluster component. Specifically, the results are not released from the calling cluster component 110 to the application or graph until they are stored (e.g., replicated or persisted) across multiple nodes (e.g., workers), thus ensuring that the results will never be recalled or become invalid. In other words, any rollback that occurs must happen before the results are provided to the application or graph by the calling cluster component 110.
当根140确定因为未成功接收到一些工作器间消息而必须执行回滚时,根将回滚时间tr通知给调用集群组件110。当前时间T1增加,并且通常,从时间tr+1直到且包括T1-1的所有活动都被视为这些活动未发生。在调用集群组件110处的影响是,存储在重播缓冲器112中的处于状态B(即,代管时间尚未达到响应时间)的所有请求都返回到状态A,并且代管缓冲器114中的任何相应的响应115被丢弃。然后,处于状态A的请求113(因为这些已经处于状态A或已从状态B返回到状态A)被重新发送到集群120。When root 140 determines that a rollback must be performed due to the failure to successfully receive some inter-worker messages, the root notifies the calling cluster component 110 of the rollback time tr. The current time T1 is incremented, and typically, all activity from time tr+1 up to and including T1-1 is considered as having not occurred. The effect at the calling cluster component 110 is that all requests in state B (i.e., the escrow time has not yet reached the response time) stored in replay buffer 112 are returned to state A, and any corresponding responses 115 in escrow buffer 114 are discarded. Requests 113 in state A (because these are already in state A or have returned from state B to state A) are then resent to cluster 120.
针对请求的请求时间ta大于回滚时间tr(即,tr<ta)的情况,首先考虑集群中(即,在工作器150处)对尚未开始执行但已在主要工作器和备用工作器之间复制的请求(即,主要工作器处于状态A并且备用工作器处于状态F)的影响。在此展示中,当前工作时间表示为tc。因为ta大于tr,所以调用集群组件无法假定请求已正确复制,并且因此移除了存储在主要工作器和备用工作器的易失性存储器155中的请求的版本。在集群120处从调用集群组件110接收具有相同请求id rid的请求113,并将该请求与新的请求时间tc相关联。当主要工作器接收到请求113时,该主要工作器在状态A下将请求113存储在其易失性存储器155中。主要工作器将请求113发送到(多个)备用工作器150,该备用工作器在状态F下将请求113存储在其易失性存储器155中。然后在主要工作器和备用工作器上以图4所展示的方式进行进一步的处理。For the case where the request time *ta* is greater than the rollback time *tr* (i.e., *tr* < *ta*), we first consider the impact on requests that have not yet started execution but have been replicated between the primary and standby workers (i.e., the primary worker is in state A and the standby worker is in state F) within the cluster (i.e., at worker 150). In this illustration, the current working time is denoted as *tc*. Because *ta* is greater than *tr*, the calling cluster component cannot assume that the request has been correctly replicated and therefore removes the version of the request stored in the volatile memory 155 of the primary and standby workers. At cluster 120, a request 113 with the same request id *rid* is received from the calling cluster component 110 and associated with the new request time *tc*. When the primary worker receives request 113, it stores request 113 in its volatile memory 155 in state A. The primary worker sends request 113 to standby worker(s) 150, which stores request 113 in its volatile memory 155 in state F. Then, further processing is performed on the main and backup processors as shown in Figure 4.
注意,如果备用工作器在从主要工作器接收到具有时间tc的已更新请求之前不知道该请求,则备用工作器也将以与现在已正确复制了该请求相同的方式进行。Note that if the standby worker is unaware of the request before receiving the updated request with time tc from the primary worker, the standby worker will also proceed in the same manner as if the request had now been correctly replicated.
参考图13至图15,示出了第一回滚场景的一个示例。在图13中,在时间ta发布的请求113被存储在调用集群组件110处的重播缓冲器112中并且处于状态A。请求113被存储在主要工作器的易失性存储器155中并且处于状态A,因为该请求尚未开始执行。请求113还存储在备用工作器150b中并且处于状态F。Referring to Figures 13 through 15, an example of a first rollback scenario is shown. In Figure 13, request 113, published at time ta, is stored in the replay buffer 112 at the calling cluster component 110 and is in state A. Request 113 is stored in the volatile memory 155 of the primary worker and is in state A because the request has not yet started execution. Request 113 is also stored in the standby worker 150b and is in state F.
接收到回滚请求以将系统回滚到时间tr<ta。在图14中,在接收到回滚请求之后,从主要工作器150a的易失性存储器155和备用工作器150b的易失性存储器155中移除请求113。由调用集群组件110将同与原始请求113相同的请求标识符(rid)相关联的新请求113’发布到集群120。在时间tc,新请求113’由集群120接收并且与请求时间tc相关联。集群120向调用集群组件110通知与新请求113’相关联的请求时间tc。重播缓冲器112中的新请求113’处于状态A。A rollback request is received to roll back the system to time tr<ta. In Figure 14, after receiving the rollback request, request 113 is removed from the volatile memory 155 of the primary worker 150a and the volatile memory 155 of the standby worker 150b. The calling cluster component 110 publishes a new request 113' associated with the same request identifier (rid) as the original request 113 to cluster 120. At time tc, the new request 113' is received by cluster 120 and associated with the request time tc. Cluster 120 notifies the calling cluster component 110 of the request time tc associated with the new request 113'. The new request 113' in the replay buffer 112 is in state A.
在集群中,将新请求113’发送到主要工作器。主要工作器150a将新请求113’与请求时间tc一起存储在其易失性存储器155中。存储在主要工作器150a的易失性存储器155中的新请求113’处于状态A。In the cluster, a new request 113' is sent to the primary worker. The primary worker 150a stores the new request 113' along with the request time tc in its volatile memory 155. The new request 113' stored in the volatile memory 155 of the primary worker 150a is in state A.
参考图15,主要工作器将新请求113’发送到备用工作器150b。备用工作器150b将新请求113’存储在其易失性存储器155中,并将其与请求时间tc相关联。存储在备用工作器的易失性存储器155中的已更新请求113’处于状态F。Referring to Figure 15, the primary worker sends a new request 113' to the standby worker 150b. The standby worker 150b stores the new request 113' in its volatile memory 155 and associates it with the request time tc. The updated request 113' stored in the standby worker's volatile memory 155 is in state F.
然后,集群根据其正常操作进行(如图5至图12所阐述的)。Then, the cluster proceeds according to its normal operation (as illustrated in Figures 5 to 12).
3.2场景2:tr<ta,已开始执行 3.2 Scenario 2: tr < ta, execution has started.
在第二种情况下,先前请求的请求时间ta大于回滚时间tr(即,tr<ta),但是该请求已开始执行并且尚未在主要工作器上完成执行(即,请求在主要工作器处于状态B,其中可能计算了部分响应115,而在备用工作器处请求处于状态F)。在这种情况下,在主要工作器和备用工作器处终止执行并丢弃部分响应115(或允许执行完成,并且丢弃响应),并且调用集群组件110将请求113重新发送到集群120。存储在主要工作器和备用工作器处的请求分别返回到状态A和F。主要工作器以与尚未在主要工作器处开始执行请求相同的方式将请求通知给备用工作器。In the second scenario, the request time *ta* of the previous request is greater than the rollback time *tr* (i.e., *tr* < *ta*), but the request has already begun execution and has not yet completed on the primary worker (i.e., the request is in state B on the primary worker, where a partial response 115 may have been computed, while the request is in state F on the standby worker). In this case, execution is terminated on both the primary and standby workers, and the partial response 115 is discarded (or execution is allowed to complete, and the response is discarded), and cluster component 110 is invoked to resend request 113 to cluster 120. The requests stored on the primary and standby workers return to states A and F, respectively. The primary worker notifies the standby worker of the request in the same manner as if the request had not yet begun execution on the primary worker.
参考图16至图18,示出了第二回滚场景的一个示例。在图16中,在时间ta发布的请求113被存储在调用集群组件110处的重播缓冲器112中并且处于状态A。请求113被存储在主要工作器150a处的易失性存储器155中并且处于状态B,因为该请求已开始执行。该请求还存储在备用工作器150b中并且处于状态F。Referring to Figures 16 through 18, an example of a second rollback scenario is shown. In Figure 16, request 113, published at time ta, is stored in the replay buffer 112 at the calling cluster component 110 and is in state A. Request 113 is stored in volatile memory 155 at the primary worker 150a and is in state B because the request has begun execution. The request is also stored on the standby worker 150b and is in state F.
接收到回滚请求以将系统回滚到时间tr<ta。在图17中,在接收到回滚请求之后,从主要工作器150a的易失性存储器155和备用工作器150b的易失性存储器155中移除请求113。由调用集群组件110将同与原始请求113相同的请求标识符(rid)相关联的新请求113’发布到集群120。在时间tc,新请求113’由集群120接收并且与请求时间tc相关联。集群120向调用集群组件110通知与新请求113’相关联的请求时间tc。重播缓冲器112中的新请求113’处于状态A。A rollback request is received to roll back the system to time tr<ta. In Figure 17, after receiving the rollback request, request 113 is removed from the volatile memory 155 of the primary worker 150a and the volatile memory 155 of the standby worker 150b. The calling cluster component 110 publishes a new request 113' associated with the same request identifier (rid) as the original request 113 to cluster 120. At time tc, the new request 113' is received by cluster 120 and associated with the request time tc. Cluster 120 notifies the calling cluster component 110 of the request time tc associated with the new request 113'. The new request 113' in the replay buffer 112 is in state A.
在集群中,将新请求113’发送到主要工作器。主要工作器150a将新请求113’与请求时间tc一起存储在其易失性存储器155中。存储在主要工作器150a的易失性存储器155中的新请求113’处于状态A。In the cluster, a new request 113' is sent to the primary worker. The primary worker 150a stores the new request 113' along with the request time tc in its volatile memory 155. The new request 113' stored in the volatile memory 155 of the primary worker 150a is in state A.
参考图18,主要工作器150a将新请求113’发送到备用工作器150b。备用工作器150b将新请求113’存储在其易失性存储器155中,并将其与请求时间tc相关联。存储在备用工作器的易失性存储器155中的已更新请求113’处于状态F。Referring to Figure 18, the primary worker 150a sends a new request 113' to the standby worker 150b. The standby worker 150b stores the new request 113' in its volatile memory 155 and associates it with the request time tc. The updated request 113' stored in the standby worker's volatile memory 155 is in state F.
然后,集群根据其正常操作进行(如图5至图12所阐述的)。Then, the cluster proceeds according to its normal operation (as illustrated in Figures 5 to 12).
3.3场景3:tr<ta<tb,执行已完成 3.3 Scenario 3: tr < ta < tb, execution completed
在第三种情况下,先前请求的请求时间ta再次大于回滚时间tr。然而,在这种情况下,我们假设在时间tb完成执行(即,tr<ta≤tb),并且响应已在备用工作器处复制并在调用集群组件110处接收。即,请求113在调用集群组件110处于状态B,该请求在主要工作器150a处于状态C,并且请求113在备用工作器150b处于状态G。不是仅仅像第二种情况那样必须终止执行正在进行的执行,而是移除已存储在主要工作器和备用工作器处的响应115。如上参考图4所述,在时间tb生成的响应以使得可以将特定时间以及更晚时间的所有更新从数据结构中移除的方式与时间tb相关联地存储在版本化数据结构中。在本情况下,通过移除在晚于时间tr的时间更新的所有数据版本,必须移除在时间tb进行的对所展示请求的更新,并且具有请求时间tc的请求在主要工作器处返回到状态A以等待执行,并且在备用工作器中返回到状态F以等待来自主要工作器的响应。在调用集群组件处,响应被丢弃,并且请求返回到状态A。In the third case, the request time *ta* of the previous request is again greater than the rollback time *tr*. However, in this case, we assume that execution is completed at time *tb* (i.e., *tr* < *ta* ≤ *tb*), and the response has been replicated at the standby worker and received at the calling cluster component 110. That is, request 113 is in state B at the calling cluster component 110, in state C at the primary worker 150a, and in state G at the standby worker 150b. Instead of simply terminating the ongoing execution as in the second case, the response 115 stored at the primary and standby workers is removed. As described above with reference to Figure 4, the response generated at time *tb* is stored in a versioned data structure in association with time *tb* in a way that allows all updates at a specific time and later to be removed from the data structure. In this scenario, by removing all data versions updated at times later than time tr, the update to the requested data at time tb must also be removed. The request with time tc returns to state A at the primary worker, awaiting execution, and to state F at the standby worker, awaiting a response from the primary worker. At the point of invocation of the cluster component, the response is discarded, and the request returns to state A.
参考图19至图21,示出了第三回滚场景的一个简单示例。在图19中,在时间ta发布的请求113被存储在调用集群组件110处的重播缓冲器112中。在时间tb生成的对请求的响应115被存储在代管缓冲器114中。因此,请求113在调用集群组件处于状态B。Referring to Figures 19 through 21, a simplified example of the third rollback scenario is shown. In Figure 19, request 113, published at time ta, is stored in replay buffer 112 at the calling cluster component 110. The response 115 generated at time tb is stored in managed buffer 114. Therefore, request 113 is in state B at the calling cluster component.
在集群中,请求113和响应115被存储在主要工作器150a处的易失性存储器155、156中。因此,请求113在主要工作器150a处于状态C。请求113和响应115还被存储在备用工作器处的易失性存储器155、156中。因此,请求在备用工作器150b处于状态G。In the cluster, request 113 and response 115 are stored in volatile memories 155 and 156 at the primary worker 150a. Therefore, request 113 is in state C at the primary worker 150a. Request 113 and response 115 are also stored in volatile memories 155 and 156 at the standby worker. Therefore, the request is in state G at the standby worker 150b.
接收到回滚请求以将系统回滚到时间tr<ta<tb。在图20中,在接收到回滚请求之后,将响应115从调用集群组件110的代管缓冲器114中移除。在集群120中,从主要工作器150a的易失性存储器155和备用工作器150b的易失性存储器155中移除请求113和响应115两者。A rollback request is received to roll back the system to time tr < ta < tb. In Figure 20, after receiving the rollback request, response 115 is removed from the managed buffer 114 of the calling cluster component 110. In cluster 120, both request 113 and response 115 are removed from the volatile memory 155 of the primary worker 150a and the volatile memory 155 of the standby worker 150b.
由调用集群组件110将同与原始请求113相同的请求标识符(rid)相关联的新请求113’发布到集群120。在时间tc,新请求113’由集群120接收并且与请求时间tc相关联。集群120向调用集群组件110通知与新请求113’相关联的请求时间tc。重播缓冲器112中的新请求113’处于状态A。The calling cluster component 110 publishes a new request 113' associated with the same request identifier (rid) as the original request 113 to cluster 120. At time tc, the new request 113' is received by cluster 120 and associated with the request time tc. Cluster 120 notifies the calling cluster component 110 of the request time tc associated with the new request 113'. The new request 113' in replay buffer 112 is in state A.
在集群中,新请求113’被发送到主要工作器150a。主要工作器150a将新请求113’与请求时间tc一起存储在其易失性存储器155中。存储在主要工作器150a的易失性存储器155中的新请求113’处于状态A。In the cluster, a new request 113' is sent to the primary worker 150a. The primary worker 150a stores the new request 113' along with the request time tc in its volatile memory 155. The new request 113' stored in the volatile memory 155 of the primary worker 150a is in state A.
参考图21,主要工作器150a将新请求113’发送到备用工作器150b。备用工作器150b将新请求113’存储在其易失性存储器155中,并将其与请求时间tc相关联。存储在备用工作器的易失性存储器155中的已更新请求113’处于状态F。Referring to Figure 21, the primary worker 150a sends a new request 113' to the standby worker 150b. The standby worker 150b stores the new request 113' in its volatile memory 155 and associates it with the request time tc. The updated request 113' stored in the standby worker's volatile memory 155 is in state F.
然后,集群根据其正常操作进行(如图5至图12所阐述的)。Then, the cluster proceeds according to its normal operation (as illustrated in Figures 5 to 12).
3.4场景4:ta<tr,执行尚未开始 3.4 Scenario 4: ta < tr, execution has not yet started.
在第四种情况下,回滚时间tr是在原始请求时间ta处或其之后(即,ta≤tr),并且原始请求尚未开始执行。该请求被重传到集群120,并且在主要工作器和备用工作器处将该请求排在原始请求(即,{rid,ta})之后执行。主要工作器执行原始请求并生成响应(即,{rid,tb})。然后,主要工作器进行到开始执行重传的请求(即,{rid,tc}),但检测到已经存在与重传的请求的rid相关联的响应,并放弃执行重传的请求。In the fourth scenario, the rollback time *tr* is at or after the original request time *ta* (i.e., *ta* ≤ *tr*), and the original request has not yet begun execution. The request is retransmitted to cluster 120, and is scheduled for execution after the original request (i.e., *{rid, ta}*) on both the primary and standby workers. The primary worker executes the original request and generates a response (i.e., *{rid, tb}*). Then, the primary worker proceeds to execute the retransmitted request (i.e., *{rid, tc}*), but detects that a response already exists associated with the *rid* of the retransmitted request and abandons the execution of the retransmitted request.
参考图22至图25,示出了第四回滚场景的一个示例。在图22中,在时间ta发布的原始请求113被存储在调用集群组件110处的重播缓冲器112中并且处于状态A。原始请求113被存储在主要工作器150a的易失性存储器155中并且处于状态A,因为该请求尚未开始执行。原始请求113还存储在备用工作器150b中并且处于状态F。Referring to Figures 22 through 25, an example of a fourth rollback scenario is shown. In Figure 22, the original request 113, published at time ta, is stored in the replay buffer 112 at the calling cluster component 110 and is in state A. The original request 113 is also stored in the volatile memory 155 of the primary worker 150a and is in state A because the request has not yet started execution. The original request 113 is also stored in the standby worker 150b and is in state F.
接收到回滚请求以将系统回滚到时间ta<tr。在图23中,由调用集群组件110将同与原始请求113相同的请求标识符(rid)相关联的新请求113’发布到集群120。在时间tc,新请求113’由集群120接收并且与请求时间tc相关联。集群120向调用集群组件110通知与新请求113’相关联的请求时间tc。重播缓冲器112中的请求113保持处于状态A。A rollback request is received to roll back the system to time ta<tr. In Figure 23, the calling cluster component 110 publishes a new request 113' associated with the same request identifier (rid) as the original request 113 to cluster 120. At time tc, the new request 113' is received by cluster 120 and associated with the request time tc. Cluster 120 notifies the calling cluster component 110 of the request time tc associated with the new request 113'. Request 113 in replay buffer 112 remains in state A.
在集群中,新请求113’被发送到主要工作器150a。主要工作器150a接收新请求113’,并将新请求113’排在原始请求113之后执行。存储在主要工作器150a的易失性存储器155中的原始请求113和新请求113’两者处于状态A。In the cluster, a new request 113' is sent to the primary worker 150a. The primary worker 150a receives the new request 113' and schedules its execution after the original request 113. Both the original request 113 and the new request 113' are in state A in the volatile memory 155 of the primary worker 150a.
参考图24,主要工作器150a将新请求113’发送到备用工作器150b。备用工作器150b接收新请求113’,并将新请求113’排在原始请求113之后执行。存储在备用工作器150b的易失性存储器155中的原始请求113和新请求113’两者都处于状态F。Referring to Figure 24, the primary worker 150a sends a new request 113' to the standby worker 150b. The standby worker 150b receives the new request 113' and schedules it for execution after the original request 113. Both the original request 113 and the new request 113' stored in the volatile memory 155 of the standby worker 150b are in state F.
参考图25,主要工作器150a已经执行了原始请求113以生成响应115,并且响应115在该主要工作器的持久性存储装置160中被持久化。结果,原始请求113在主要工作器150a处于状态D。新请求113’尚未在主要工作器150a处开始执行,并且因此处于状态A。Referring to Figure 25, the primary worker 150a has executed the original request 113 to generate a response 115, and the response 115 is persisted in the persistent storage 160 of the primary worker. As a result, the original request 113 is in state D on the primary worker 150a. The new request 113' has not yet begun execution on the primary worker 150a and is therefore in state A.
还已将响应115提供给备用工作器150b和调用集群组件110。备用工作器150b已将响应115存储在其易失性存储器156中,并已将响应持久化到其持久性存储装置160。因此,原始请求113在备用工作器处于状态H。调用集群组件110已将响应115存储在其代管缓冲器114中,并且调用集群组件的重播缓冲器112中的请求113’处于状态B。Response 115 has also been provided to standby worker 150b and calling cluster component 110. Standby worker 150b has stored response 115 in its volatile memory 156 and has persisted the response to its persistent storage device 160. Therefore, original request 113 is in state H on the standby worker. Calling cluster component 110 has stored response 115 in its managed buffer 114, and request 113' in the calling cluster component's replay buffer 112 is in state B.
当新请求113’在主要工作器150a处开始执行时,主要工作器150a识别出新请求113’同与响应115相同的请求标识符rid相关联,并且因此不执行新请求113’,因为它是复制品。在一些示例中,可以将响应115重传到调用集群组件,该调用集群组件将响应115作为复制品而忽略。When new request 113' begins execution at primary worker 150a, primary worker 150a recognizes that new request 113' is associated with the same request identifier rid as response 115, and therefore does not execute new request 113' because it is a copy. In some examples, response 115 can be retransmitted to the calling cluster component, which will ignore response 115 as a copy.
然后,集群根据其正常操作进行(如图5至图12所阐述的)。Then, the cluster proceeds according to its normal operation (as illustrated in Figures 5 to 12).
3.5场景5:ta<tr,已开始执行 3.5 Scenario 5: ta < tr, execution has started.
在第五种情况下,回滚时间tr是在原始请求时间ta处或其之后(即,ta≤tr),并且原始请求已经开始执行,但在主要工作器处尚未完成执行(即,请求在主要工作器处于状态B,而请求在备用工作器处于状态F)。在这种情况下,在主要工作器和备用工作器处终止执行(或允许完成,并且丢弃响应)(即,存储在主要工作器和备用工作器处的请求分别返回到状态A和F)。In the fifth scenario, the rollback time tr is at or after the original request time ta (i.e., ta ≤ tr), and the original request has begun execution but has not yet completed at the primary worker (i.e., the request is in state B at the primary worker and in state F at the standby worker). In this case, execution is terminated (or allowed to complete and the response is discarded) at both the primary and standby workers (i.e., the requests stored at the primary and standby workers are returned to states A and F, respectively).
调用集群组件110将该请求重传到集群120,其中,在主要工作器和备用工作器处将该请求排在原始请求(即,{rid,ta})之后执行。主要工作器执行原始请求并生成响应(即,{rid,tb})。然后,主要工作器进行到开始执行重传的请求(即,{rid,tc}),但检测到已经存在与重传的请求的rid相关联的响应,并放弃执行重传的请求。The request is retransmitted to cluster 120 via cluster component 110, where it is executed after the original request (i.e., {rid, ta}) at both the primary and standby workers. The primary worker executes the original request and generates a response (i.e., {rid, tb}). Then, the primary worker proceeds to the request to begin retransmission (i.e., {rid, tc}), but detects that a response already exists associated with the rid of the retransmitted request and abandons the retransmission.
参考图26至图29,示出了第五回滚场景的一个示例。在图26中,在时间ta发布的原始请求113被存储在调用集群组件110处的重播缓冲器112中并且处于状态A。原始请求113被存储在主要工作器150a处的易失性存储器155中并且处于状态B,因为该请求已开始执行。原始请求113还存储在备用工作器150b中并且处于状态F。Referring to Figures 26 through 29, an example of a fifth rollback scenario is shown. In Figure 26, the original request 113, published at time ta, is stored in the replay buffer 112 at the calling cluster component 110 and is in state A. The original request 113 is stored in volatile memory 155 at the primary worker 150a and is in state B because the request has begun execution. The original request 113 is also stored in the standby worker 150b and is in state F.
接收到回滚请求以将系统回滚到时间ta<tr。在图27中,由调用集群组件110将同与原始请求113相同的请求标识符(rid)相关联的新请求113’发布到集群120。在时间tc,新请求113’由集群120接收并且与请求时间tc相关联。集群120向调用集群组件110通知与新请求113’相关联的请求时间tc。重播缓冲器112中的请求113保持处于状态A。A rollback request is received to roll back the system to time ta<tr. In Figure 27, the calling cluster component 110 publishes a new request 113' associated with the same request identifier (rid) as the original request 113 to cluster 120. At time tc, the new request 113' is received by cluster 120 and associated with the request time tc. Cluster 120 notifies the calling cluster component 110 of the request time tc associated with the new request 113'. Request 113 in replay buffer 112 remains in state A.
在集群120中,存储在主要工作器150a的易失性存储器155中的原始请求113的执行被终止,并且原始请求113返回到状态A。新请求113’被发送到主要工作器150a。主要工作器150a接收新请求113’,并将新请求113’排在原始请求113之后执行。存储在主要工作器150a的易失性存储器155中的新请求113’处于状态A。In cluster 120, the execution of the original request 113, stored in the volatile memory 155 of the primary worker 150a, is terminated, and the original request 113 returns to state A. A new request 113' is sent to the primary worker 150a. The primary worker 150a receives the new request 113' and schedules it for execution after the original request 113. The new request 113' stored in the volatile memory 155 of the primary worker 150a is in state A.
参考图28,主要工作器150a将新请求113’发送到备用工作器150b。备用工作器150b接收新请求113’,并将新请求113’排在原始请求113之后执行。存储在备用工作器150b的易失性存储器155中的原始请求113和新请求113’两者都处于状态F。Referring to Figure 28, the primary worker 150a sends a new request 113' to the standby worker 150b. The standby worker 150b receives the new request 113' and schedules it for execution after the original request 113. Both the original request 113 and the new request 113' stored in the volatile memory 155 of the standby worker 150b are in state F.
参考图29,主要工作器150a已执行了原始请求113并且已生成了响应115。响应115在该主要工作器的持久性存储装置160中被持久化。结果,原始请求113在主要工作器150a处于状态D。新请求113’尚未在主要工作器150a处开始执行,并且因此处于状态A。Referring to Figure 29, the primary worker 150a has executed the original request 113 and generated a response 115. The response 115 is persisted in the persistent storage 160 of the primary worker. As a result, the original request 113 is in state D on the primary worker 150a. The new request 113' has not yet begun execution on the primary worker 150a and is therefore in state A.
还已将响应115复制到备用工作器150b和调用集群组件110。备用工作器150b已将响应115存储在其易失性存储器156中,并已将响应持久化到其持久性存储装置160。因此,原始请求113在备用工作器处于状态H。调用集群组件110已将响应115存储在其代管缓冲器114中,并且请求113’在调用集群组件的重播缓冲器112中处于状态B。The response 115 has also been copied to the standby worker 150b and the calling cluster component 110. The standby worker 150b has stored the response 115 in its volatile memory 156 and has persisted the response to its persistent storage device 160. Therefore, the original request 113 is in state H on the standby worker. The calling cluster component 110 has stored the response 115 in its managed buffer 114, and request 113' is in state B in the calling cluster component's replay buffer 112.
当新请求113’在主要工作器150a处开始执行时,主要工作器150a识别出新请求113’同与响应115相同的请求标识符rid相关联,并且因此不执行新请求113’,因为它是复制品。在一些示例中,响应115可以被重传到调用集群组件110,调用集群组件将响应115作为复制品而忽略。When new request 113' begins execution at primary worker 150a, primary worker 150a recognizes that new request 113' is associated with the same request identifier rid as response 115, and therefore does not execute new request 113' because it is a copy. In some examples, response 115 may be retransmitted to cluster component 110, which will ignore response 115 as a copy.
然后,集群根据其正常操作进行(如图5至图12所阐述的)。Then, the cluster proceeds according to its normal operation (as illustrated in Figures 5 to 12).
3.6场景6:ta<tb<tr,执行已完成 3.6 Scenario 6: ta < tb < tr, execution completed
在第六种情况下,回滚时间tr是在请求时间ta处或其之后,并且请求已在时间tb处完成执行,该时间tb也在该回滚时间处或其之前(即,ta≤tb≤tr)。如果响应已成功提供给调用集群组件110(即,该请求在调用集群组件处于状态B),则回滚请求不会导致重新发送该请求,该回滚请求也不会导致从代管缓冲器114移除任何响应。即,与ta相关联的任何请求和与tb相关联的任何响应均保持不变。In the sixth scenario, the rollback time *tr* is at or after the request time *ta*, and the request has been completed at time *tb*, which is also at or before the rollback time (i.e., *ta* ≤ *tb* ≤ *tr*). If the response has been successfully provided to the calling cluster component 110 (i.e., the request was in state B when the calling cluster component was in state B), the rollback request will not cause the request to be resent, nor will it cause any response to be removed from the managed buffer 114. That is, any requests associated with *ta* and any responses associated with *tb* remain unchanged.
但是,如果未将响应成功提供给调用集群组件110,则调用集群组件110将请求重传到集群120。当主要工作器接收到重传的请求时,主要工作器开始执行重传的请求(即,{rid,tc}),但检测到已经存在与请求标识符rid相关联的响应115。因此,不执行重传的请求,并且将通过执行原始请求而生成的响应重传到调用集群组件110。调用集群组件110接收具有响应时间tb的响应,该响应时间用于确定何时可以从调用集群组件处的代管发送响应。However, if the response is not successfully provided to the calling cluster component 110, the calling cluster component 110 retransmits the request to cluster 120. When the primary worker receives the retransmitted request, it begins to execute the retransmitted request (i.e., {rid, tc}), but detects that a response 115 already exists associated with the request identifier rid. Therefore, the retransmitted request is not executed, and the response generated by executing the original request is retransmitted to the calling cluster component 110. The calling cluster component 110 receives the response with a response time tb, which is used to determine when a response can be sent from the managed service at the calling cluster component.
参考图30至图32,示出了第六回滚场景的一个示例。在图30中,在时间ta发布的原始请求113被存储在调用集群组件110处的重播缓冲器112中。在时间tb生成了对原始请求113的响应115,但该响应未到达调用集群组件110的代管缓冲器114。因此,请求113在调用集群组件110处于状态A。Referring to Figures 30 to 32, an example of a sixth rollback scenario is shown. In Figure 30, the original request 113, published at time ta, is stored in the replay buffer 112 at the calling cluster component 110. A response 115 to the original request 113 is generated at time tb, but this response does not reach the managed buffer 114 of the calling cluster component 110. Therefore, request 113 is in state A at the calling cluster component 110.
在集群中,请求113和响应115被存储在主要工作器150a的易失性存储器155、156中。因此,请求113在主要工作器150a处于状态C。请求113和响应115还被存储在备用工作器处的易失性存储器155、156中。因此,请求在备用工作器150b处于状态G。In the cluster, request 113 and response 115 are stored in volatile memories 155 and 156 of the primary worker 150a. Therefore, request 113 is in state C on the primary worker 150a. Request 113 and response 115 are also stored in volatile memories 155 and 156 at the standby worker. Therefore, the request is in state G on the standby worker 150b.
接收到回滚请求以将系统回滚到时间ta<tb<tr。在图31中,由调用集群组件110将同与原始请求113相同的请求标识符(rid)相关联的新请求113’发布到集群120。在时间tc,新请求113’由集群120接收并且与请求时间tc相关联。集群120向调用集群组件110通知与新请求113’相关联的请求时间tc。A rollback request is received to roll back the system to time ta < tb < tr. In Figure 31, the calling cluster component 110 publishes a new request 113' associated with the same request identifier (rid) as the original request 113 to cluster 120. At time tc, the new request 113' is received by cluster 120 and associated with the request time tc. Cluster 120 notifies the calling cluster component 110 of the request time tc associated with the new request 113'.
新请求113’被发送到集群120中的主要工作器150a。主要工作器150a接收新请求113’,并将新请求113’在易失性存储器155中排队以便执行。存储在主要工作器150a的易失性存储器155中的原始请求113保持处于状态C,并且存储在主要工作器150a的易失性存储器155中的新请求113’处于状态A。A new request 113' is sent to the primary worker 150a in cluster 120. The primary worker 150a receives the new request 113' and queues it in volatile memory 155 for execution. The original request 113 stored in the volatile memory 155 of the primary worker 150a remains in state C, and the new request 113' stored in the volatile memory 155 of the primary worker 150a is in state A.
参考图32,当主要工作器150a开始执行新请求时,主要工作器150a识别出新请求113’具有与原始请求113相同的请求标识符rid并且在主要工作器150a处已经存在与请求标识符rid相关联的响应115。因此,主要工作器150a不执行新请求113’,而是将响应115重传到调用集群组件110。调用集群组件110接收响应115,并将该响应存储在代管缓冲器114中。在响应115被存储在调用集群组件110的代管缓冲器114中的情况下,调用集群组件110处于状态B。Referring to Figure 32, when the primary worker 150a begins executing a new request, it recognizes that the new request 113' has the same request identifier rid as the original request 113, and that a response 115 associated with the request identifier rid already exists at the primary worker 150a. Therefore, the primary worker 150a does not execute the new request 113', but instead retransmits the response 115 to the call cluster component 110. The call cluster component 110 receives the response 115 and stores it in a managed buffer 114. With the response 115 stored in the managed buffer 114 of the call cluster component 110, the call cluster component 110 is in state B.
然后,集群根据其正常操作进行(如图5至图12所阐述的)。Then, the cluster proceeds according to its normal operation (as illustrated in Figures 5 to 12).
3.7场景7:ta<tr<tb,执行已完成 3.7 Scenario 7: ta < tr < tb, execution completed
在第七种情况下,回滚时间tr是在请求时间ta处或其之后,并且请求已在回滚时间之后的时间tb处完成执行(即,ta≤tr<tb),工作器之间的响应的复制可能没有成功。工作器丢弃具有tr之后的时间的所有响应115。存储在备用工作器中的请求113返回到状态F,并且存储在主要工作器中的请求113返回到状态B。调用集群组件110丢弃代管缓冲器114中的所有响应115,将存储在重播缓冲器112中的请求113返回到状态A,并且将请求113重新发送到集群120,该集群重新处理该请求。In the seventh scenario, the rollback time tr is at or after the request time ta, and the request has already been executed at a time tb after the rollback time (i.e., ta ≤ tr < tb). The replication of the response between workers may not have succeeded. The workers discard all responses 115 with a time after tr. Request 113 stored in the standby worker returns to state F, and request 113 stored in the primary worker returns to state B. Cluster component 110 discards all responses 115 in the managed buffer 114, returns request 113 stored in the replay buffer 112 to state A, and resends request 113 to cluster 120, which reprocesses the request.
参考图33至图35,示出了第七回滚场景的一个示例。在图33中,在时间ta发布的请求113被存储在调用集群组件110处的重播缓冲器112中。在时间tb生成的对请求的响应115被存储在代管缓冲器114中。因此,请求113在调用集群组件110处于状态B。Referring to Figures 33 to 35, an example of the seventh rollback scenario is shown. In Figure 33, request 113, published at time ta, is stored in replay buffer 112 at the calling cluster component 110. The response 115 generated at time tb is stored in managed buffer 114. Therefore, request 113 is in state B at the calling cluster component 110.
在集群120中,请求113和响应115被存储在主要工作器150a处的易失性存储器155、156中。因此,请求113在主要工作器150a处于状态C。请求113还存储在备用工作器105处的易失性存储器155、156中,但是响应115可能已经或可能没有成功地复制到备用工作器150b。因此,请求在备用工作器150b处可能处于或可能不处于状态G。In cluster 120, request 113 and response 115 are stored in volatile memories 155 and 156 at the primary worker 150a. Therefore, request 113 is in state C at the primary worker 150a. Request 113 is also stored in volatile memories 155 and 156 at the standby worker 105, but response 115 may have been or may not have been successfully copied to the standby worker 150b. Therefore, the request may or may not be in state G at the standby worker 150b.
接收到回滚请求以将系统回滚到时间ta<tr<tb。在图34中,将存储在调用集群组件110的代管缓冲器114中的响应115移除。由调用集群组件110将同与原始请求113相同的请求标识符(rid)相关联的新请求113’发布到集群120。在时间tc,新请求113’由集群120接收并且与请求时间tc相关联。集群120向调用集群组件110通知与新请求113’相关联的请求时间tc。重播缓冲器112中的新请求113’处于状态A。A rollback request is received to roll back the system to time ta < tr < tb. In Figure 34, the response 115 stored in the managed buffer 114 of the calling cluster component 110 is removed. The calling cluster component 110 publishes a new request 113' associated with the same request identifier (rid) as the original request 113 to cluster 120. At time tc, the new request 113' is received by cluster 120 and associated with the request time tc. Cluster 120 notifies the calling cluster component 110 of the request time tc associated with the new request 113'. The new request 113' in the replay buffer 112 is in state A.
在集群120中,备用工作器150b移除存储在其易失性存储器156中的、与tr之后的时间相关联的任何响应,并且因此回到状态F。主要工作器150a返回到状态B。新请求113’被发送到主要工作器150a。主要工作器接收新请求113’,并将新请求113’排在原始请求113之后执行。存储在主要工作器150a的易失性存储器155中的新请求113’处于状态A。In cluster 120, standby worker 150b removes any responses associated with the time following tr from its volatile memory 156 and thus returns to state F. Primary worker 150a returns to state B. A new request 113' is sent to primary worker 150a. Primary worker receives new request 113' and schedules it for execution after the original request 113. The new request 113' stored in volatile memory 155 of primary worker 150a is in state A.
在图35中,主要工作器150a在时间td完成原始请求113的执行并生成新响应115’。主要工作器150a将新响应115’发送到备用工作器150b和调用集群组件110,以使存储在主要工作器150a的易失性存储器中的原始请求113的状态转换为状态C。备用工作器150b接收新响应115’,并将新响应115’存储在其易失性存储器155中,以使存储在备用工作器的易失性存储器155中的原始请求113转换为状态G。调用集群组件110接收新响应115’,并将其存储在代管缓冲器114中,以使存储在重播缓冲器112中的新请求113’转换为状态B。In Figure 35, the primary worker 150a completes the execution of the original request 113 at time td and generates a new response 115'. The primary worker 150a sends the new response 115' to the standby worker 150b and invokes the cluster component 110 to transition the state of the original request 113 stored in the volatile memory of the primary worker 150a to state C. The standby worker 150b receives the new response 115' and stores it in its volatile memory 155 to transition the state of the original request 113 stored in the standby worker's volatile memory 155 to state G. The invoked cluster component 110 receives the new response 115' and stores it in the managed buffer 114 to transition the state of the new request 113' stored in the replay buffer 112 to state B.
当新请求113’在主要工作器150a处开始执行时,主要工作器150a识别出新请求113’具有与原始请求113相同的请求标识符rid,并且因此不执行新请求113’,因为它是复制品。When the new request 113' begins execution at the main worker 150a, the main worker 150a recognizes that the new request 113' has the same request identifier rid as the original request 113, and therefore does not execute the new request 113' because it is a copy.
然后,集群根据其正常操作进行(如图5至图12所阐述的)。Then, the cluster proceeds according to its normal operation (as illustrated in Figures 5 to 12).
3.8场景8:ta<tr<tb,执行已完成 3.8 Scenario 8: ta < tr < tb, execution completed
最后,在第八种情况下,正在处理请求的作为主要工作器的工作器丢失(例如,已知故障)。一般而言,对于正在等待丢失的主要工作器来提供响应(即,备用工作器处于状态F)的备用工作器处的任何请求,该备用工作器被提升为主要工作器。当根140例如由于未能从该工作器接收到对消息的应答而检测到工作器丢失时,根启动到时间tr的回滚,该时间等于上次复制的时间(即,tr=T2)。当备用工作器接收了到时间tr的回滚请求时(可能伴随新分区信息以适应丢失的工作器),备用工作器通过将请求的状态更改为其正在等待资源以执行请求的状态A而开始充当新的主要工作器。Finally, in the eighth case, the worker processing the request as the primary worker is lost (e.g., known failure). Generally, for any request at a standby worker that is waiting for the lost primary worker to provide a response (i.e., the standby worker is in state F), that standby worker is promoted to primary worker. When root 140 detects a worker loss, for example due to failure to receive a response to a message from that worker, the root initiates a rollback to time tr, which is equal to the time of the last replication (i.e., tr = T2). When the standby worker receives a rollback request to time tr (possibly accompanied by new partition information to accommodate the lost worker), the standby worker begins to act as the new primary worker by changing the state of the request to state A, where it is waiting for resources to execute the request.
参考图36至图37,示出了第八回滚场景的一个示例。在图36中,在时间ta发布的请求113被存储在调用集群组件110处的重播缓冲器112中并且处于状态A。请求113被存储在主要工作器150a处的易失性存储器155中并且处于状态B,因为该请求已开始执行但尚未完成执行。该请求还存储在备用工作器150b中并且处于状态F。在执行请求113期间,主要工作器150a故障或丢失。Referring to Figures 36 and 37, an example of an eighth rollback scenario is shown. In Figure 36, request 113, issued at time ta, is stored in replay buffer 112 at the calling cluster component 110 and is in state A. Request 113 is stored in volatile memory 155 at the primary worker 150a and is in state B because the request has started execution but has not yet completed. The request is also stored in standby worker 150b and is in state F. During the execution of request 113, the primary worker 150a fails or is lost.
在图37中,根已请求回滚到时间tr,该时间等于上次复制的时间。此时,备用工作器150b被提升为主要工作器150a,并将其状态更改为状态A。另一工作器150c被分派为处于状态F的备用工作器。In Figure 37, the root has requested a rollback to time tr, which is equal to the time of the last replication. At this point, standby worker 150b is promoted to primary worker 150a and its state is changed to state A. Another worker 150c is assigned as a standby worker in state F.
然后,集群根据其正常操作进行(如图5至图12所阐述的)。Then, the cluster proceeds according to its normal operation (as illustrated in Figures 5 to 12).
4根节点4 nodes
现在转向根140的操作,如上所述,根周期性地增加当前工作时间(间隔)T1 144。一般而言,当根更新工作时间时,根向所有工作器分配(例如,广播)时间元组(T1,T2,T3)144-146。作为响应,工作器向根提供信息,根可以基于该信息更新T2和/或T3时间。Now, turning to the operation of root 140, as described above, the root periodically increments the current working time (interval) T1 144. Generally, when the root updates its working time, it distributes (e.g., broadcasts) time tuples (T1, T2, T3) 144-146 to all workers. In response, the workers provide information to the root, which can then update the T2 and/or T3 times based on that information.
每个工作器维护与特定工作时间相关联的一组计数器151-152。一个计数器151与工作时间t1相关联(称为Sent(t1)),该计数器对已从该工作器发送到备用工作器的、具有请求时间t1的请求的通信数量进行计数,并对已发送到备用工作器的具有响应时间t1的响应的数量进行计数。在图4中,在状态A下,针对发送到备用工作器的具有请求时间ta的每个请求来更新Sent(ta),并且对于被发送用于在备用工作器处的复制的在时间tb生成的每个响应,增加Sent(tb)。注意,对于从工作器发送到调用集群组件的消息,Sent()计数器不会增加。另一计数器152Rec(t1)对在工作器处接收到的与时间t1相关联的通信数量进行计数。具体地,当备用工作器在其进入状态F时接收到具有请求时间ta的请求的复制时,备用工作器增加Rec(ta),并且当备用工作器在进入状态G时接收到在时间tb生成的响应的复制时,备用工作器增加Rec(tb)。每个工作器具有其自己的这些计数器的本地副本,针对工作器w表示为Sentw(t)和Recw(t)。应当明显的是,在与时间t1相关联地发送的所有通信也都在其目的地接收到的程度上,所有工作器w的Sentw(t)的汇总之和等于工作器w的Recw(t)的汇总之和。Each worker maintains a set of counters 151-152 associated with a specific work time. One counter 151, associated with work time t1 (called Sent(t1)), counts the number of communications sent from that worker to the standby worker with a request time t1, and the number of responses sent to the standby worker with a response time t1. In Figure 4, under state A, Sent(ta) is updated for each request sent to the standby worker with a request time ta, and Sent(tb) is incremented for each response generated at time tb and sent for replication at the standby worker. Note that the Sent() counter is not incremented for messages sent from the worker to invoke cluster components. Another counter 152, Rec(t1), counts the number of communications received at the worker associated with time t1. Specifically, when a standby worker receives a copy of a request with a requested time ta when it enters state F, the standby worker increments Rec(ta), and when a standby worker receives a copy of a response generated at time tb when it enters state G, the standby worker increments Rec(tb). Each worker has its own local copy of these counters, denoted as Sentw(t) and Recw(t) for worker w. It should be apparent that, to the extent that all communications sent in association with time t1 are also received at their destination, the sum of Sentw(t) for all workers w is equal to the sum of Recw(t) for worker w.
不时地,例如响应于从根140接收到当前时间(T1,T2,T3)的广播,每个工作器150针对大于复制时间T2的所有时间发送其当前计数Sent(t)151和Rec(t)152。这些计数在根处接收并汇总,使得根针对大于T2的每个时间t确定Sent(t)之和以及Rec(t)之和,并将它们与相应时间相关联地存储在计数器141和142中。如果Sent(T2+1)等于Rec(T2+1),则从时间T2+1开始的所有传输都已收到,并且增加T2以成为下一个复制时间。重复此过程,直到Sent(T2+1)不等于Rec(T2+1)或T2+1达到T1为止。然后,此增加的T2时间(145)用于下一次从根开始的广播。Occasionally, for example, in response to a broadcast of the current time (T1, T2, T3) received from root 140, each worker 150 sends its current counts Sent(t) 151 and Rec(t) 152 for all times greater than the replication time T2. These counts are received and summed at the root, such that the root determines the sum of Sent(t) and the sum of Rec(t) for each time t greater than T2 and stores them in counters 141 and 142 in association with the corresponding time. If Sent(T2+1) equals Rec(T2+1), then all transmissions from time T2+1 have been received, and T2 is incremented to become the next replication time. This process is repeated until Sent(T2+1) is not equal to Rec(T2+1) or T2+1 reaches T1. This incremented T2 time (145) is then used for the next broadcast starting from the root.
如上所述,首先在易失性存储器中记录工作器处的数据更新,其中,日志被不时地写入持久性存储装置中。针对复制时间T2之前的更改,每个工作器都可以自由地将持久性存储器中的所记录更改永久化。通常,每个工作器w都已有机会使经过时间T3(w)的所有更改永久化,其中不同的工作器通常已达到了不同的时间。除了响应当前时间的广播而将Rec()和Sent()返回到根之外,每个工作器还返回其T3(w)时间,该时间根据根处或回到根的通信路径上的min()操作进行汇总。即,根确定T3=minw T3(w),并且然后根在下一次分配当前时间时分配T3的此新值。As described above, data updates at the worker's location are first recorded in volatile memory, with logs periodically written to persistent storage. For changes prior to replication time T2, each worker is free to persist the recorded changes in persistent storage. Typically, each worker w has already had the opportunity to persist all changes up to time T3(w), where different workers typically reach different times. In addition to returning Rec() and Sent() to the root in response to the broadcast of the current time, each worker also returns its T3(w) time, which is summed based on the min() operation at the root or on the communication path back to the root. That is, the root determines T3 = minw T3(w), and then the root assigns this new value of T3 the next time the current time is allocated.
在一些实施例中,根在根与每个工作器之间的直接(例如,单播)通信中分配时间元组(T1,T2,T3)。在其他实施例中,元组以另一方式(诸如基于泛洪的广播)分配。在另一实施例中,元组沿着预定的树状结构分发网络分配,其中元组的每个接收者将元组转发给多个其他接收者,使得最终所有工作器都已接收到时间元组。In some embodiments, the root distributes time tuples (T1, T2, T3) in direct (e.g., unicast) communication between the root and each worker. In other embodiments, tuples are distributed in a different manner (such as flood-based broadcasting). In yet another embodiment, tuples are distributed along a predetermined tree-structured distribution network, where each receiver of a tuple forwards it to multiple other receivers, such that eventually all workers have received the time tuples.
来自工作器的计数的汇总可以通过每个工作器与根节点之间的单播通信来执行,其中,根对所有工作器执行完整的求和。作为一种更高效的解决方案,可以沿与时间元组相同的路径将计数发送回去,其中,路径中的中间节点执行计数总和的部分汇总,从而与根一起分担求和的负担,但仍然获得了关于所有工作器的计数总和。The summation of counts from the workers can be performed via unicast communication between each worker and the root node, where the root performs a full summation over all workers. As a more efficient solution, the counts can be sent back along the same path as the time tuple, where intermediate nodes in the path perform partial summations of the counts, thus sharing the burden of summation with the root, but still obtaining a summation of counts from all workers.
在替代性操作模式中,当复制响应时间而不是持久时间时,可以从调用集群组件中释放响应。以这种方式,可以以较小的延迟将响应提供给图,其中存在该响应可能尚未在集群存储中持久化的可能性。In alternative operating modes, responses can be released from the calling cluster component when replicating response time rather than persistence time. In this way, responses can be provided to the graph with minimal latency, even if the response may not yet be persisted in cluster storage.
如上所述,将执行请求的响应存储在版本化数据结构中。在一个这种数据结构中,数据项的每次更新都存储为可单独恢复的版本,并利用与该更新相关联的时间标记该版本。例如,可以至少在概念上针对每个访问密钥将数据结构存储为元组(tb值)的列表,其中,tb是值的更新时间。不同时间的值可以共享子结构或使用其他存储优化。在一些示例中,基于时间之间数据值的编辑来存储这些值。作为一个示例,这些值可以表示为基于树的结构,并且每个版本可以存储为足以用于从先前版本创建下一版本的“前向”增量操作,或者存储为足以用于从当前版本重建先前版本的“后向”增量操作。如上所讨论的,这种版本化数据结构允许在回滚时间之后回滚所有更新。不是维护对数据项的所有更新,而是仅保留相对于更新时间的开始的更新,以便可以完成回滚到任何更新时间的开始。As described above, the response to the execution request is stored in a versioned data structure. In such a data structure, each update to a data item is stored as a separately recoverable version, and that version is tagged with the time associated with that update. For example, the data structure can be stored, at least conceptually, as a list of tuples (tb values) for each access key, where tb is the update time of the value. Values at different times can share substructures or use other storage optimizations. In some examples, these values are stored based on edits to the data values between times. As an example, these values can be represented as a tree-based structure, and each version can be stored as a "forward" incremental operation sufficient to create the next version from a previous version, or as a "backward" incremental operation sufficient to reconstruct a previous version from the current version. As discussed above, this versioned data structure allows all updates to be rolled back after a rollback time. Instead of maintaining all updates to the data item, only updates relative to the beginning of the update time are retained so that a rollback to the beginning of any update time can be completed.
应当认识到,在根增加复制时间T2之后,将不会要求工作器回滚到该时间或其之前的版本。因此,版本化数据结构的优化是可以从数据结构中移除复制时间T2或其之前的版本。It should be recognized that after adding a replication time T2 to the root, the worker will not be required to roll back to that time or a version prior to it. Therefore, an optimization of the versioned data structure is to remove versions with replication time T2 or earlier from the data structure.
在一些实施例中,某些请求在它们的执行时间很小的意义上是“轻量级的”,并且因此,在备用工作器处执行该请求可以比将响应从主要工作器复制到备用工作器消耗更少的资源。在这种实施例中,不执行从主要工作器到(多个)备用工作器的响应复制。每个工作器可以在不同的时间完成处理。为了维护工作器之间的数据同步,主要工作器如上所述分配完成时间tb,而备用工作器则将其本地计算的响应视为这些响就像是在那时计算的。In some embodiments, certain requests are "lightweight" in the sense that their execution time is small, and therefore, executing the request at a standby worker can consume fewer resources than replicating the response from the primary worker to the standby worker. In this embodiment, response replication from the primary worker to the standby worker(s) is not performed. Each worker may complete processing at a different time. To maintain data synchronization between workers, the primary worker allocates a completion time tb as described above, while the standby workers treat their locally computed responses as if those responses were computed at that time.
在替代性实施例中,调用集群组件在其从根接收时间元组并且将Sent()和Rec()计数返回到根的意义上参与集群。在此实施例中,调用集群组件为请求分派请求时间,该时间在请求的复制期间由工作器使用。当发生回滚时,因为调用集群组件知道其保存的请求的请求时间,因此仅需在回滚时间之后重新发送请求,而不会丢弃在回滚时间或其之前生成的响应。修改工作器的操作以适应调用集群组件的此操作。In an alternative embodiment, the invoking cluster component participates in the cluster in the sense that it receives time tuples from the root and returns Sent() and Rec() counts to the root. In this embodiment, the invoking cluster component dispatches request times for requests, which are used by the workers during request replication. When a rollback occurs, because the invoking cluster component knows the request times of the requests it holds, it only needs to resend the request after the rollback time, without discarding responses generated at or before the rollback time. The worker operation is modified to accommodate this operation of the invoking cluster component.
5替代方案5 alternative solutions
更一般地,在上述回滚场景4-8中,其中,ta<tr,当调用集群组件110重传请求时,调用集群组件不知道(也不在乎)原始请求在时间ta被传输。另一方面,集群120需要考虑原始请求的请求时间,因为集群使用该时间来确定是否回滚。因此,当调用集群组件110将请求(具有请求标识符rid)重新发送到集群120使得ta<tr<tc时,该请求在主要工作器150a处接收并与时间tc相关联。主要工作器150a将请求转发到备用工作器150b。在这种情况下,主要工作器可以在其执行重新发送的请求(即,{rid,tc})之前执行原始请求(即,{rid,ta})。当主要工作器150a进行到执行重新发送的请求(即,{rid,tc})时,因为对原始请求(即,{rid,ta})的响应已经持久化,因此主要工作器将重新发送的请求视为复制品。More generally, in the rollback scenarios 4-8 above, where ta < tr, when the calling cluster component 110 retransmits the request, the calling cluster component is unaware (and indifferent) that the original request was transmitted at time ta. On the other hand, cluster 120 needs to consider the request time of the original request, as the cluster uses this time to determine whether to rollback. Therefore, when the calling cluster component 110 retransmits the request (with the request identifier rid) to cluster 120 such that ta < tr < tc, the request is received at the primary worker 150a and associated with time tc. The primary worker 150a forwards the request to the standby worker 150b. In this case, the primary worker can execute the original request (i.e., {rid, ta}) before executing the retransmitted request (i.e., {rid, tc}). When the primary worker 150a proceeds to execute the retransmitted request (i.e., {rid, tc}), because the response to the original request (i.e., {rid, ta}) has been persisted, the primary worker treats the retransmitted request as a copy.
在一些示例中,请求产生后续任务(有时称为‘任务链’)。在此类示例中,直到产生的任务完成之后,才生成对请求的响应。在一些示例中,如果已存储了对请求{rid,ta}的响应,则工作器将其响应返回到调用集群组件。但是,如果因为请求{rid,ta}尚未完成因此对请求{rid,ta}的响应还不存在,则因为集群知道原始请求最终将完成并生成被返回到调用集群组件的响应,因此具有复制品rid的后续请求{rid,tc}被忽略。In some examples, the request spawns subsequent tasks (sometimes called a 'task chain'). In such examples, a response to the request is not generated until after the spawned tasks have completed. In some examples, if a response to the request {rid,ta} has already been stored, the worker returns its response to the calling cluster component. However, if a response to the request {rid,ta} does not yet exist because the request {rid,ta} has not yet completed, the subsequent request {rid,tc} with a copy of rid is ignored because the cluster knows that the original request will eventually complete and generate a response to be returned to the calling cluster component.
在上述示例中,当集群接收到请求时,集群将时间(例如ta)与该请求相关联,并且然后将该时间通知给调用集群组件。调用集群组件将时间与存储在其重播缓冲器中的请求相关联。在回滚的情况下,调用集群组件可以使用与调用集群组件的重播缓冲器中的请求相关联的时间来选择性地重播请求。但是,在一些示例中,集群和调用集群组件都不会将请求与时间相关联。在这些示例中,当在回滚场景的情况下重播请求时,调用集群组件的选择性较低。例如,在回滚请求的情况下,调用集群组件可以系统地重播其重播缓冲器中的所有请求。In the example above, when the cluster receives a request, it associates a time (e.g., ta) with the request and then notifies the calling cluster component of that time. The calling cluster component associates the time with the request stored in its replay buffer. In the case of a rollback, the calling cluster component can selectively replay the request using the time associated with the request in its replay buffer. However, in some examples, neither the cluster nor the calling cluster component associates the request with a time. In these examples, the calling cluster component has less selectivity when replaying requests in a rollback scenario. For example, in the case of a rollback request, the calling cluster component could systematically replay all requests in its replay buffer.
6实施方式6 Implementation Methods
上述计算集群管理方法可以例如使用执行合适的软件指令的可编程计算系统来实施,或者该方法可以在合适的硬件中实施,比如现场可编程门阵列(FPGA)或以某种混合形式。例如,在程控方法中,软件可以包括一个或多个计算机程序中的在一个或多个程控的或可编程计算系统(其可以是各种体系架构,诸如分布式客户端/服务器、或电网)上执行的过程,该计算系统各自包括至少一个处理器、至少一个数据存储系统(包括易失性和/或非易失性存储器和/或存储元件)、至少一个用户接口(用于使用至少一个输入设备或端口接收输入,并且用于使用至少一个输出设备或端口提供输出)。软件可以包括较大程序的一个或多个模块,该较大程序例如提供与对数据流图的设计、配置、和执行有关的服务。程序模块(例如,数据流图的元素)可以被实施为数据结构或符合存储在数据储存库中的数据模型的其他经组织的数据。The aforementioned computing cluster management method can be implemented, for example, using a programmable computing system that executes suitable software instructions, or the method can be implemented in suitable hardware, such as a field-programmable gate array (FPGA) or in some hybrid form. For example, in a programmable method, the software may include processes in one or more computer programs that execute on one or more programmable or controlled computing systems (which can be of various architectures, such as distributed client/server systems or power grids), each including at least one processor, at least one data storage system (including volatile and/or non-volatile memory and/or storage elements), and at least one user interface (for receiving input using at least one input device or port and for providing output using at least one output device or port). The software may include one or more modules of a larger program that, for example, provides services related to the design, configuration, and execution of a data flow graph. Program modules (e.g., elements of a data flow graph) may be implemented as data structures or other organized data conforming to a data model stored in a data repository.
软件可以以非暂态形式存储一段时间(例如,如动态RAM等动态存储器设备的刷新周期之间的时间),诸如使用介质的物理性质(例如,表面凹坑和岸台、磁畴、或电荷)体现在易失性或非易失性存储介质中、或任何其他非暂态介质中。在准备加载指令时,软件可以提供在如CD-ROM或其他计算机可读介质(例如,由通用或专用计算系统或设备可读)等有形、非暂态介质上,或者可以通过网络的通信介质递送(例如,被编码到传播信号中)到其被执行的计算系统的有形、非暂态介质。该处理的一些或全部可在专用计算机上执行、或使用专用硬件(诸如协处理器或现场可编程门阵列(FPGA)或专用的专用集成电路(ASIC))来执行。该处理可以以分布式方式来实施,其中由软件指定的计算的不同部分由不同的计算元件执行。每一个这种计算机程序优选地存储在或下载到可由通用或专用可编程计算机访问的存储设备的计算机可读存储介质(例如,固态存储器或介质、或磁性介质或光学介质)上,以便当由计算机读取存储设备介质以执行本文中描述的处理时,对计算机进行配置和操作。也可认为本发明的系统被实施为配置有计算机程序的有形、非暂态介质,其中,如此配置的介质致使计算机以指定的且预定义的方式操作以便执行本文中描述的处理步骤中的一项或多项。Software can be stored in a non-transitory form for a period of time (e.g., the time between refresh cycles of a dynamic memory device such as dynamic RAM), such as in volatile or non-volatile storage media embodied by the physical properties of the medium (e.g., surface pits and shores, magnetic domains, or charges), or any other non-transitory medium. When preparing to load instructions, the software can be provided on a tangible, non-transitory medium such as a CD-ROM or other computer-readable medium (e.g., readable by a general-purpose or special-purpose computing system or device), or delivered via a communication medium over a network (e.g., encoded into a propagating signal) to the tangible, non-transitory medium of the computing system to which it is executed. Some or all of the processing can be executed on a dedicated computer or using dedicated hardware such as a coprocessor or field-programmable gate array (FPGA) or dedicated application-specific integrated circuit (ASIC). The processing can be implemented in a distributed manner, where different parts of the computation specified by the software are executed by different computing elements. Each such computer program is preferably stored or downloaded to a computer-readable storage medium (e.g., solid-state memory or medium, or magnetic or optical medium) accessible by a general-purpose or special-purpose programmable computer, so as to configure and operate the computer when the storage medium is read by the computer to perform the processes described herein. The system of the present invention may also be considered as a tangible, non-transitory medium configured with a computer program, wherein such a medium causes the computer to operate in a specified and predefined manner to perform one or more of the processing steps described herein.
已经描述了本发明的多个实施例。然而,应当理解,前述描述旨在说明而非限制本发明的范围,本发明的范围由所附权利要求书的范围限定。因此,其他实施例也在所附权利要求书的范围内。例如,在不背离本发明的范围的情况下,可进行各种修改。另外,上述步骤中的一些可以是顺序独立的,并且因此可以以与所描述的顺序不同的顺序来执行。Several embodiments of the invention have been described. However, it should be understood that the foregoing description is intended to illustrate, not limit, the scope of the invention, which is defined by the appended claims. Therefore, other embodiments are also within the scope of the appended claims. For example, various modifications may be made without departing from the scope of the invention. Furthermore, some of the steps described above may be sequentially independent and therefore may be performed in a different order than that described.
Claims (14)
Applications Claiming Priority (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| US62/579,225 | 2017-10-31 |
Publications (2)
| Publication Number | Publication Date |
|---|---|
| HK40030421A HK40030421A (en) | 2021-02-26 |
| HK40030421B true HK40030421B (en) | 2024-03-22 |
Family
ID=
Similar Documents
| Publication | Publication Date | Title |
|---|---|---|
| CN111373373B (en) | Using interval counters to manage computing clusters | |
| HK40030421B (en) | Managing a computing cluster using time interval counters | |
| HK40029746B (en) | Managing a computing cluster using durability level indicators | |
| HK40030396B (en) | Managing a computing cluster based on consistency of state updates | |
| HK40031469B (en) | Managing a computing cluster interface | |
| HK40030366B (en) | Managing a computing cluster using replicated task results | |
| HK40030396A (en) | Managing a computing cluster based on consistency of state updates | |
| HK40031469A (en) | Managing a computing cluster interface | |
| HK40030366A (en) | Managing a computing cluster using replicated task results | |
| HK40030421A (en) | Managing a computing cluster using time interval counters | |
| HK40029746A (en) | Managing a computing cluster using durability level indicators |