WO2019086120A1 - System and method for high-performance general-purpose parallel computing with fault tolerance and tail tolerance - Google Patents
System and method for high-performance general-purpose parallel computing with fault tolerance and tail tolerance
- Publication number
- WO2019086120A1 (PCT/EP2017/078153)
- Authority
- WO
- WIPO (PCT)
- Prior art keywords
- computing
- round
- nodes
- task
- sub
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Ceased
Classifications
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/48—Program initiating; Program switching, e.g. by interrupt
- G06F9/4806—Task transfer initiation or dispatching
- G06F9/4843—Task transfer initiation or dispatching by program, e.g. task dispatcher, supervisor, operating system
- G06F9/4881—Scheduling strategies for dispatcher, e.g. round robin, multi-level priority queues
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/48—Program initiating; Program switching, e.g. by interrupt
- G06F9/4806—Task transfer initiation or dispatching
- G06F9/4843—Task transfer initiation or dispatching by program, e.g. task dispatcher, supervisor, operating system
- G06F9/485—Task life-cycle, e.g. stopping, restarting, resuming execution
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/50—Allocation of resources, e.g. of the central processing unit [CPU]
- G06F9/5005—Allocation of resources, e.g. of the central processing unit [CPU] to service a request
- G06F9/5027—Allocation of resources, e.g. of the central processing unit [CPU] to service a request the resource being a machine, e.g. CPUs, Servers, Terminals
- G06F9/5038—Allocation of resources, e.g. of the central processing unit [CPU] to service a request the resource being a machine, e.g. CPUs, Servers, Terminals considering the execution order of a plurality of tasks, e.g. taking priority or time dependency constraints into consideration
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/50—Allocation of resources, e.g. of the central processing unit [CPU]
- G06F9/5005—Allocation of resources, e.g. of the central processing unit [CPU] to service a request
- G06F9/5027—Allocation of resources, e.g. of the central processing unit [CPU] to service a request the resource being a machine, e.g. CPUs, Servers, Terminals
- G06F9/5044—Allocation of resources, e.g. of the central processing unit [CPU] to service a request the resource being a machine, e.g. CPUs, Servers, Terminals considering hardware capabilities
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/50—Allocation of resources, e.g. of the central processing unit [CPU]
- G06F9/5061—Partitioning or combining of resources
- G06F9/5072—Grid computing
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/50—Allocation of resources, e.g. of the central processing unit [CPU]
- G06F9/5061—Partitioning or combining of resources
- G06F9/5077—Logical partitioning of resources; Management or configuration of virtualized resources
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/54—Interprogram communication
Definitions
- The present invention relates to the field of computing and data processing. More specifically, the present invention relates to a system and method for high-performance general-purpose parallel computing with fault tolerance and tail tolerance.
- Distributed computing systems, high-performance computing systems, and other similar systems may help scientists and engineers solve complex science, engineering, and business problems using applications that benefit from high bandwidth, low latency networking, and very high compute capabilities. These systems may also execute data storage and retrieval, perform more straightforward tasks, and the like. Such systems include those for cloud computing, Big Data analytics, web services, enterprise services, distributed computing and the like.
- The competitive business of data and computing services drives manufacturers to continuously improve their processes and products in order to lower costs, deliver reliable service, increase speed and the like. Indeed, in data handling and processing, there is an ever-increasing demand to utilize processing resources more efficiently.
- Large-scale parallel computing systems should preferably meet all of the following requirements: they can be used to efficiently run any parallel computation, at any scale; they can be used on cost-effective commodity architectures; they can be used to efficiently run computations continuously, without interruptions; they offer high performance; and they offer high availability, with automatic fault tolerance and tail tolerance, self-healing and self-optimization.
- Cloud Computing offers parallelism, scale and cost-effectiveness, but clouds are very unpredictable.
- The maximum latency is often more than 100 times greater than the average latency for identical tasks. In some cases, the factor can even be 1000 times or more.
- Figure 1 shows an exemplary distribution of the number of processes over the expected latency for a conventional distributed computing system.
- Containers can be relaunched very quickly - in seconds rather than the minutes normally required to relaunch a virtual machine or a physical server. Containers thus provide a means of restarting computations quickly, but there remains the challenge of deciding when to restart a computing node.
- The focus of the AGSS paper (Ananthanarayanan, Ghodsi, Shenker and Stoica, "Effective Straggler Mitigation: Attack of the Clones") is on simple data-parallel, constant-round parallel computations, e.g. MapReduce/Hadoop.
- The AGSS paper considers computations that have a small number of tasks, e.g. at most 10, and execute in a small number of rounds, e.g. 1-3.
- General-purpose parallel computations that may have thousands of tasks, and may execute for thousands of rounds, are not considered in the AGSS paper.
- The AGSS paper does not address the challenge of supporting long or continuous parallel computations that need to run nonstop over a huge number of rounds.
- The principal concern of the AGSS paper is to optimize data movement in disk-based data communication systems such as Hadoop HDFS.
- General-purpose parallel computing with high-performance point-to-point data communication systems such as BSP, MPI or RDMA is not considered in the AGSS paper.
- Embodiments of the invention provide systems and methods for high-performance general-purpose parallel computing with fault tolerance and tail tolerance.
- Embodiments of the invention provide systems for handling high-latency nodes for all large-scale parallel computations, including those that are general dataflow graphs, or communication-intensive, or highly iterative, multi-round.
- Embodiments of the invention provide distributed computing systems with scalability, high performance, cost-effectiveness, and high availability (automatic fault tolerance and tail tolerance).
- Embodiments of the invention provide a cost model and automatic optimization for general-purpose parallel computations run on large-scale cloud and other commodity architectures where faults and long latency tails are common.
- According to a first aspect, the invention relates to a distributed computing system comprising: a plurality of computing nodes, wherein each computing node is configured to simultaneously execute a respective sub-task of a parallel computing task in a plurality of computing rounds; and a communication network configured to allow data exchange between the plurality of computing nodes.
- Each computing round comprises an execution stage, i.e. processing of local data at each computing node, a communication stage between the plurality of computing nodes and a synchronization stage between the plurality of computing nodes.
- The distributed computing system is configured to handle one or more high-latency computing nodes of the plurality of computing nodes in each computing round.
- A high-latency computing node is a computing node that takes much longer to execute a task than other similar nodes take to execute a task of the same computational cost.
- Thus, an improved general-purpose parallel computing system is provided that can handle high-latency computing nodes.
- The plurality of computing nodes can be configured to simultaneously execute copies of a sub-task of the parallel computing task.
- This is referred to as horizontal cloning.
- The number of computing nodes can be a multiple of the number of sub-tasks, allowing multiple copies of any sub-task to be run in parallel in the same computing round.
- The multiple can be an integer or a rational multiple.
- The number of computing nodes can be at least twice as large as the number of sub-tasks, allowing multiple copies of any sub-task to be run in parallel in the same round.
- One or more of the plurality of computing nodes can be configured to execute at least two sub-tasks of the parallel computing task in sequence during a computing round.
- This is referred to as vertical cloning.
- The plurality of computing nodes can be configured to execute each of the sub-tasks of the parallel computing task in a round first on at least one of the plurality of computing nodes.
- The distributed computing system can further be configured to identify a respective computing node of the plurality of computing nodes as a high-latency computing node in a computing round if the duration of the execution stage of the respective computing node is larger than a duration threshold for the computing round.
- Each of the plurality of computing nodes can be configured to determine the duration threshold for a computing round on the basis of a minimum duration of the round, wherein the minimum duration of the round is defined by the duration of the execution stage of the computing node having the shortest execution stage of the computing round.
- Each computing node can be associated with a respective synchronization parameter indicating whether or not the respective computing node can define the minimum duration for the computing round.
- The distributed computing system can further comprise a plurality of standby computing nodes, wherein each of the plurality of standby computing nodes is configured to start executing a sub-task of a specific high-latency computing node, or of any high-latency computing node, of the plurality of computing nodes in a computing round.
- Each computing node can comprise a physical computer, a virtual machine, or a software container.
- According to a second aspect, the invention relates to a corresponding distributed computing method comprising the following steps: executing a respective sub-task of a parallel computing task in a plurality of computing rounds by a plurality of computing nodes, wherein each computing round comprises an execution stage, i.e. processing of local data at each computing node, a communication stage between the plurality of computing nodes and a synchronization stage between the plurality of computing nodes; and handling one or more high-latency computing nodes of the plurality of computing nodes in each computing round.
- The step of executing can comprise simultaneously executing copies of a sub-task of the parallel computing task.
- The number of computing nodes can be a multiple of the number of sub-tasks, allowing multiple copies of any sub-task to be run in parallel in the same computing round.
- The multiple can be an integer or a rational multiple.
- The number of computing nodes can be at least twice as large as the number of sub-tasks, allowing multiple copies of any sub-task to be run in parallel in the same round.
- The step of executing can comprise executing at least two sub-tasks of the parallel computing task in sequence during a round.
- The step of executing can comprise executing each of the sub-tasks of the parallel computing task in a round first on at least one of the plurality of computing nodes.
- The method can comprise the further step of identifying a respective computing node of the plurality of computing nodes as a high-latency computing node in a computing round if the duration of the execution stage of the respective computing node is larger than a duration threshold for the computing round.
- The method can comprise the further step of determining the duration threshold for a computing round on the basis of a minimum duration of the computing round, wherein the minimum duration of the computing round is defined by the duration of the execution stage of the computing node having the shortest execution stage of the computing round.
- Each computing node can be associated with a respective synchronization parameter indicating whether or not the respective computing node can define the minimum duration for the computing round.
- The method can comprise the further step of starting to execute, by a standby computing node, a sub-task of a specific high-latency computing node or of any high-latency computing node of the plurality of computing nodes in a computing round.
- Each computing node can comprise a physical computer, a virtual machine, or a software container.
- The distributed computing method according to the second aspect of the invention can be performed by the distributed computing system according to the first aspect of the invention. Further features of the distributed computing method according to the second aspect of the invention result directly from the functionality of the distributed computing system according to the first aspect of the invention and its different implementation forms described above and below. According to a third aspect, the invention relates to a computer program product comprising program code for performing the distributed computing method according to the second aspect when executed on a computer or a processor.
- The invention can be implemented in hardware and/or software.
- Fig. 1 shows a schematic diagram illustrating an exemplary distribution of the number of computing tasks over the expected latency for a conventional distributed computing system
- Fig. 2 shows a schematic diagram illustrating a distributed computing system with a plurality of computing nodes according to an embodiment
- Fig. 3 shows a flow chart illustrating steps of a distributed computing method according to an embodiment
- Fig. 4 shows a flow chart illustrating the standard execution procedure for a conventional BSP program with no fault or tail tolerance
- Fig. 5a shows a schematic diagram illustrating an example of a non-load-balanced BSP program
- Fig. 5b shows a schematic diagram illustrating the example of figure 5a executed by a distributed computing system according to an embodiment
- Fig. 6 shows a flowchart illustrating steps for executing a nonstop BSP program by a distributed computing system according to an embodiment
- Fig. 7 shows a schematic diagram illustrating setting and sharing a minimum duration for a computing round in a distributed computing system according to an embodiment
- Fig. 8 shows a flow chart illustrating a tail limit interrupt procedure implemented in a distributed computing system according to an embodiment
- Fig. 9 shows a flow chart illustrating the completion of a computing round at the tail limit as implemented in a distributed computing system according to an embodiment
- Fig. 10a shows a schematic diagram illustrating the amount of time taken by a plurality of processes in an exemplary conventional distributed computing system
- Fig. 10b shows a schematic diagram illustrating the amount of time taken by the plurality of processes of figure 10a in a distributed computing system according to an embodiment implementing vertical cloning
- Fig. 10c shows a schematic diagram illustrating a further example of vertical cloning implemented in a distributed computing system according to an embodiment
- Fig. 11a shows a schematic diagram illustrating the amount of time taken by a plurality of processes in a distributed computing system according to an embodiment implementing horizontal cloning
- Fig. 11b shows a schematic diagram illustrating a further example of horizontal cloning implemented in a distributed computing system according to an embodiment
- Fig. 11c shows a schematic diagram illustrating a plurality of processes executed by a distributed computing system according to an embodiment implementing both vertical cloning and horizontal cloning;
- Fig. 11d shows a schematic diagram illustrating a further example for a combination of vertical and horizontal cloning implemented in a distributed computing system according to an embodiment
- Fig. 12a shows a schematic diagram illustrating an example for a plurality of parallel processes in a conventional distributed computing system with disjoint redundancy
- Figs. 12b and 12c show schematic diagrams illustrating an example for a plurality of parallel processes in a distributed computing system according to an embodiment providing fault tolerance using combined redundancy
- Fig. 13a shows a schematic diagram illustrating the amount of time taken by a plurality of processes over several computing rounds in an exemplary conventional distributed computing system
- Fig. 13b shows a schematic diagram illustrating the amount of time taken by the plurality of processes of figure 13a in a distributed computing system according to an embodiment
- Fig. 14 shows a schematic diagram illustrating an exemplary execution of a plurality of processes by a distributed computing system according to an embodiment
- Fig. 15 shows a schematic diagram illustrating a further exemplary execution of a plurality of processes by a distributed computing system according to an embodiment.
- In the following, identical reference signs will be used for identical or functionally equivalent features.
- FIG. 2 shows a distributed computing system 200 according to an embodiment.
- The distributed computing system 200 comprises a plurality of computing nodes 201 and a communication network 203 configured to allow data exchange between the plurality of computing nodes 201.
- The communication network 203 can comprise a plurality of wired and/or wireless connections between the computing nodes 201.
- For example, the communication network 203 can comprise an Ethernet communication network.
- In an embodiment, the distributed computing system 200 is a large-scale general-purpose parallel computing system.
- The distributed computing system 200 can be, for instance, a distributed-memory supercomputer cluster or a large-scale network of servers in a datacenter.
- Each computing node 201 can comprise a processing unit 201a and a local memory 201b.
- Non-local memory references can be handled by inter-node communications across the communication network 203.
- Each computing node 201 is configured to simultaneously execute a respective sub-task of a parallel computing task in a plurality of computing rounds.
- Each computing round comprises an execution stage, a communication stage between the plurality of computing nodes 201 and a synchronization stage between the plurality of computing nodes 201.
- The distributed computing system 200 is configured to handle one or more high-latency computing nodes of the plurality of computing nodes 201 in each computing round.
- The distributed computing system 200 is configured to execute large-scale parallel algorithms by "computing in rounds". Such algorithms can be written using MPI, BSP, MapReduce, Spark, Pregel, Giraph, Petuum, or other parallel programming models and systems.
- This style of parallel computing in rounds is normally referred to as Bulk Synchronous Parallel (BSP) computing.
- In BSP-style parallel algorithms and software, the basic computational model comprises the following stages at each computing node 201: (i) compute on data in local memory 201b; (ii) globally communicate across the communication network 203; (iii) synchronize; and (iv) repeat.
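- The following minimal sketch illustrates this compute/communicate/synchronize loop. It is an illustration only; the function names (compute_local, exchange, barrier) are placeholders for the application code, the communication layer and the synchronizer, not the API of any particular BSP or MPI library.

```python
# Minimal sketch of BSP-style "computing in rounds". The callables
# compute_local, exchange and barrier are assumed placeholders.

def run_bsp(local_state, num_rounds, compute_local, exchange, barrier):
    for round_no in range(num_rounds):
        outgoing = compute_local(local_state, round_no)  # (i) local computation
        incoming = exchange(outgoing)                    # (ii) global communication
        barrier()                                        # (iii) barrier synchronization
        local_state.update(incoming)                     # (iv) repeat with new state
    return local_state
```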
- Simple parallel models such as MapReduce provide an adequate framework for parallel computations that involve only a small number of rounds.
- Other models such as Spark provide an adequate framework for parallel computations of limited scale, where the low performance obtained by automatic management of communications is acceptable.
- BSP-style parallelism, using either MPI or BSP message-passing software, has proven to be capable of delivering the highest levels of performance in all kinds of applications, including Dense Linear Algebra, Sparse Linear Algebra, Spectral Methods (e.g. FFT), N-Body Methods, Structured Grids, Unstructured Grids, Monte Carlo Simulations, Graph Computing, Dynamic Programming, and Combinatorial Search.
- The distributed computing system 200 is configured to implement a BSP scheme for "computing in rounds".
- Many of the most important modern large-scale commercial parallel applications, such as machine learning, deep learning, AI, network optimization, and graph analytics, are highly iterative, involving thousands of computing rounds, and can be very naturally and easily expressed as BSP computations.
- Figure 3 shows a flow chart illustrating steps of a distributed computing method 300 according to an embodiment.
- the method 300 comprises the steps of: executing 301 a respective sub-task of a parallel computing task in a plurality of computing rounds by the plurality of computing nodes 201 , wherein each computing round comprises an execution stage, a communication stage between the plurality of computing nodes 201 and a synchronization stage between the plurality of computing nodes 201 ; and handling 303 one or more high-latency computing nodes of the plurality of computing nodes 201 in each computing round.
- The distributed computing system 200 could be implemented as a computing system in the cloud, herein also referred to as "Supercloud".
- While the term "Supercloud" is influenced by the many challenges that are faced in the specific area of cloud computing, it is not intended to imply that the invention applies only to cloud computing.
- Embodiments of the invention are applicable to any large-scale parallel computing system, such as a distributed-memory supercomputer cluster or a large-scale network of servers in a datacenter.
- Embodiments of the invention provide a solution not only for cloud and centralized datacenter computing, but also for the resilience, latency and performance requirements of mobile and edge computing.
- The computing nodes 201 can be physical servers, or can be virtualized using virtual machines or software containers. Each computing node 201 is configured to run one or more software processes (herein referred to as sub-tasks) during a round of computation. For example, in an embodiment, a distributed computing system 200 with 12 computing nodes can execute a parallel program with 48 parallel processes, i.e. sub-tasks, by allocating 4 processes, i.e. sub-tasks, to each computing node 201.
- The execution model implemented in the distributed computing system 200 can be considered a nonstop BSP scheme.
- In an embodiment, this nonstop BSP scheme has four core features:
- Each process comprises a BSP synchronization mechanism at the end of each round.
- This synchronization mechanism can be parameterized by a (typically Boolean) synchronization parameter or flag indicating whether or not the process/node is one that can set the minimum time value for the round.
- The synchronization parameter can be changed dynamically during the execution of a program. If a program has the synchronization parameter set so that no process/node can set the minimum time value in any round, then the distributed computing system 200 can execute the program as a normal BSP program without fault tolerance or tail tolerance.
- Figure 4 shows a flow chart illustrating the standard execution procedure for a conventional BSP program as described above (with no tail or fault tolerance).
- The standard execution procedure for a conventional BSP program comprises an execution and communication stage (see block 401 in figure 4) and a synchronization stage (see block 403 in figure 4). Once all local operations and communications for all processes are completed, the next computing round can be started (see block 405 in figure 4).
- Figure 5a shows a simple example of a non-load-balanced conventional BSP program with four processes, where, by way of example, there is an expected time per round of 4 seconds for the processes, i.e. sub-tasks P0, P1 and P2 but only 100ms for the process, i.e. sub-task P3.
- Figure 5b shows a corresponding nonstop BSP program for execution by the distributed computing system 200 according to an embodiment.
- The distributed computing system 200 implements a parameterization of the synchronization primitives in order to indicate whether or not the associated process is one that can set the minimum time value for the computing round. So, for the example shown in figure 5b, the distributed computing system 200 according to an embodiment can set the synchronization parameter to "False" for the process P3, and to "True" for the other processes.
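- A minimal sketch of such a parameterized synchronization primitive is shown below. The names sync, try_set_mintime and barrier are assumptions for illustration, not the API of any particular BSP library.

```python
# Sketch of a parameterized BSP synchronization primitive: the Boolean
# flag decides whether the calling process may set the minimum time value
# (MinTime) for the round. try_set_mintime and barrier are assumed
# callbacks provided by the runtime.

def sync(elapsed_seconds, may_set_mintime, try_set_mintime, barrier):
    if may_set_mintime:
        try_set_mintime(elapsed_seconds)  # only flagged processes compete for MinTime
    barrier()                             # wait for the round to complete

# For the program of figure 5b, the flag would be False for the 100 ms
# process P3 and True for the 4 s processes P0, P1 and P2, so that P3 can
# never define MinTime for the round.
```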
- The following provides a simple high-level overview of the distributed computing system 200 according to an embodiment.
- The system comprises E computing nodes or processing elements (e.g. CPUs) 201 and runs a parallel program with P processes/sub-tasks, where P ≤ E.
- Each of the E computing nodes 201 can run one or more of the processes/sub-tasks during a single round.
- The system 200 is configured to run multiple instances of a sub-task, which are referred to as clones.
- The system 200 is configured to run each of the P processes/sub-tasks as the first process/sub-task on at least one of the E computing nodes 201.
- MinTime denotes a minimum duration value for the computing round.
- If the first sub-task to be run on a computing node 201 has a synchronization parameter set indicating that it can set MinTime, then it monitors its elapsed time for the current round (nonstop BSP) and attempts to write MinTime when it ends. The first such process to write its time sets MinTime for the computing round.
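- This first-writer-wins mechanism can be sketched as follows; the RoundState class and its locking scheme are assumptions for illustration, standing in for whatever shared coordination mechanism an implementation uses.

```python
import threading

# Sketch of the MinTime race: the first eligible first-process clone to
# finish the round writes MinTime; all later attempts are ignored.

class RoundState:
    def __init__(self):
        self._lock = threading.Lock()
        self.min_time = None                 # MinTime for the current round

    def try_set_mintime(self, elapsed_seconds):
        with self._lock:
            if self.min_time is None:        # first writer sets MinTime...
                self.min_time = elapsed_seconds
                return True
            return False                     # ...later writers are ignored
```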
- Each sub-task can have access not only to its own local data and state, but also to other information, including: a copy of the minimum duration, i.e. MinTime, for the computing round; its elapsed time for the round; which clones of other sub-tasks it may need to communicate with; the standby resources available; and where other clones of the sub-task are located.
- The distributed computing system 200 can be described by a polynomial Predictability exponent D, indicating that an expected number E/t^D of the computing nodes 201 will fail to complete any round in less than t*MinTime.
- The computation performed by the distributed computing system 200 has a TailLimit T.
- Any sub-task that fails to complete before T*MinTime is marked as a fault/tail; the others are marked as live.
- A computing round is successful if at least one clone of each of the P sub-tasks completes.
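- The classification of clones and the round-success criterion can be expressed as in the following sketch (an illustration; the data structures are assumptions).

```python
# Sketch of the fault/tail classification and the round-success check.
# 'elapsed' maps clone id -> elapsed time; 'completed' maps sub-task id ->
# set of clone ids that finished before the tail limit.

def classify_clones(elapsed, min_time, tail_limit):
    limit = tail_limit * min_time            # T * MinTime
    return {clone: ('live' if t <= limit else 'fault/tail')
            for clone, t in elapsed.items()}

def round_successful(completed, num_subtasks):
    # A round succeeds if at least one clone of each sub-task completed.
    return all(len(completed.get(task, ())) >= 1
               for task in range(num_subtasks))
```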
- The inter-process communications of the plurality of computing nodes 201 can be handled in several ways.
- For example, the first clone of a process/sub-task to complete can handle the global communications for all the clones of that process.
- A number of other variations are also possible, depending on other objectives such as balancing communications at endpoints, increasing network latency resilience and other factors.
- Process faults and tails can be handled by transferring state from another live clone of the same process.
- A pool of standby computing nodes 202 is maintained that are ready to run (an exemplary standby computing node 202 is shown in figure 2). This can be done in several ways, for example by having a static pool of standby computing nodes 202 directly associated with the various processes/sub-tasks, or by having a dynamic pool of standby computing nodes 202, each of which can be used with any process/sub-task.
- Figure 6 shows a flowchart illustrating steps for executing a nonstop BSP program by the distributed computing system 200 according to an embodiment.
- First, a process/sub-task is executed by the current computing node 201 and communications are initiated, unless another clone of the process/sub-task has already been completed by another computing node 201.
- Upon completion, the current computing node 201 notifies the other clones, i.e. the computing nodes 201 executing the same process/sub-task, about the completion of the process by the current computing node 201.
- Moreover, the current computing node 201 is configured to check whether vertical cloning is implemented, i.e. whether the current computing node 201 is supposed to execute a further process/sub-task which was originally assigned to, and is still being executed by, a different computing node 201.
- Figure 7 shows a schematic diagram illustrating setting and sharing the minimum duration, i.e. MinTime of a computing round between computing nodes 201 in the distributed computing system 200 according to an embodiment.
- Figure 8 shows a flow chart illustrating a tail limit interrupt procedure implemented in the distributed computing system 200 according to an embodiment.
- In a first step 801 it is checked whether the computing round is not yet complete and whether the elapsed time is larger than the minimum duration or a multiple thereof, e.g. T*MinTime. If this is the case, the respective computing node 201 is a high-latency computing node 201 and will send tail limit interrupts in a step 803 to the other computing nodes 201.
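- A sketch of this check is given below; the send_tail_limit_interrupt function is an assumed placeholder for the actual notification mechanism.

```python
# Sketch of the tail-limit interrupt procedure of figure 8: a node whose
# round is still incomplete after T * MinTime identifies itself as a
# high-latency node and notifies its peers.

def check_tail_limit(round_complete, elapsed, min_time, tail_limit,
                     peers, send_tail_limit_interrupt):
    if not round_complete and min_time is not None:
        if elapsed > tail_limit * min_time:      # step 801
            for peer in peers:                   # step 803
                send_tail_limit_interrupt(peer)
            return True                          # this node is a fault/tail
    return False
```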
- Figure 9 shows a flow chart illustrating the completion of a computing round at the tail limit, as implemented in a computing node 201 of the distributed computing system 200 according to an embodiment.
- In a first step 901 it is checked whether a first process/sub-task is still being executed by the current computing node 201 and has not yet been completed by another computing node 201. If this is the case, the computation must stop, as there is no completed copy of that sub-task. If nonstop computation is required, then the degree of horizontal cloning, the degree of vertical cloning, and the tail limit T should all be set to ensure that this does not happen.
- In a step 903 it is checked whether the first process/sub-task is still being executed by the current computing node 201 but has already been completed by another computing node 201. If this is the case, the current computing node 201 can be relaunched and/or a standby computing node 202 can be used to replace it.
- In this case, the state(s) for all incomplete vertical clones can be transferred.
- As described above, embodiments of the distributed computing system 200 provide vertical cloning and/or horizontal cloning of processes/sub-tasks.
- Each of the E computing nodes 201 can run one or more of the processes/sub-tasks during a single round.
- Each of the P processes/sub-tasks is run as the first process on at least one of the E computing nodes 201.
- In the example shown in figures 10a-c, P = 10, with the processes/sub-tasks numbered from 0 to 9.
- In the ideal case, every process/sub-task takes exactly the same time as all the others, in every round.
- With no faults or tails, it is expected that all processes/sub-tasks will complete the round at the same time. If, however, there are faults or high-latency tails, then the processes/sub-tasks may take quite different amounts of time, as illustrated in figure 10a, where, by way of example, the process/sub-task "6" takes the longest time.
- Figure 10b illustrates how the example of figure 10a can be handled by the distributed computing system 200 according to an embodiment on the basis of vertical cloning, i.e. multiple process/sub-task instances running consecutively on the same computing node 201.
- In this example, the computing node 201 having completed the process/sub-task "0" starts executing the process/sub-task "1", because the other computing node 201, which was assigned to execute the process/sub-task "1" in the first place, has not finished yet.
- In figure 10b, each process/sub-task appears twice, but vertical cloning can be used with any multiple. The multiples do not even need to be uniform, although this may normally be the case.
- Alternatively, vertical cloning can be implemented by the distributed computing system 200 as illustrated in figure 10c.
- The degree of vertical cloning can be defined by a parameter VC.
- In the example of figure 10c, VC = 4.
- Only the first row of processes/sub-tasks can set MinTime for the round, and can do so only if their synchronization parameter is set accordingly.
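- One possible vertical-cloning assignment, consistent with the examples above, is a cyclic one in which each node is scheduled to run VC consecutive sub-tasks (modulo P), skipping any sub-task that another clone has already completed. The sketch below illustrates this; the cyclic rule and the run_subtask callback are assumptions for illustration.

```python
# Sketch of a cyclic vertical-cloning schedule: with P sub-tasks and
# vertical clone level VC, node n runs sub-tasks n, n+1, ..., n+VC-1
# (mod P) in sequence during a round.

def vertical_schedule(node, num_subtasks, vc):
    return [(node + k) % num_subtasks for k in range(vc)]

def run_vertical(node, num_subtasks, vc, already_done, run_subtask):
    for task in vertical_schedule(node, num_subtasks, vc):
        if task not in already_done:   # another clone may have finished it first
            run_subtask(task)

# e.g. with P = 10 and VC = 2 (figure 10b), the node that starts with
# sub-task 0 continues with sub-task 1 if no other clone has completed it.
```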
- Figure 11a illustrates how high-latency computing nodes 201 can be handled by the distributed computing system 200 according to an embodiment on the basis of horizontal cloning, i.e. multiple clones of the same process/sub-task running in parallel on different computing nodes 201 in the same round.
- In the examples shown in figures 11a and 11b, the values of the horizontal clone level HC are 2 and 3.5, respectively.
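- The horizontal clone level HC can thus be understood as the ratio E/P, which need not be an integer. A minimal sketch of an even clone placement, assuming round-robin assignment of nodes to sub-tasks, is shown below.

```python
# Sketch of horizontal clone placement for a rational clone level
# HC = E / P: the E nodes are spread round-robin over the P sub-tasks,
# so e.g. E = 35 nodes and P = 10 sub-tasks give HC = 3.5 (five sub-tasks
# get 4 clones and five get 3).

def horizontal_placement(num_nodes, num_subtasks):
    placement = {task: [] for task in range(num_subtasks)}
    for node in range(num_nodes):
        placement[node % num_subtasks].append(node)
    return placement

# horizontal_placement(35, 10)[0] -> [0, 10, 20, 30]   (4 clones of sub-task 0)
# horizontal_placement(35, 10)[9] -> [9, 19, 29]       (3 clones of sub-task 9)
```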
- The distributed computing system 200 can also be configured to combine vertical and horizontal cloning for handling high-latency and/or faulty computing nodes 201, as illustrated in figures 11c and 11d.
- Again, only the first row of processes/sub-tasks can set MinTime for the round, and can do so only if their synchronization parameter is set accordingly.
- The distributed computing system 200 can increase the resilience of large-scale parallel computing architectures by providing fault tolerance.
- This advantageous effect provided by horizontal cloning as implemented in the distributed computing system 200 according to an embodiment will be described in the following in the context of figures 12a-c, which show an example with 10 sub-tasks.
- Figure 12a shows the conventional fault tolerance approach of running two separate disjoint copies of the 10-process computation (which will be referred to as Left and Right), with no significant communication between them.
- Embodiments of the invention have the capability to run both computations together as a single combined computation, executing in rounds, with communications possible between any of the 2P processes, and with automatic fault tolerance and tail tolerance. The cost will be approximately the same as for the conventional disjoint redundant computation, but the combined computation is much more resilient.
- Figure 12b shows the example of figure 12a as implemented in the distributed computing system 200 according to an embodiment.
- If, for example, process clones 0, 3, 4, 6 from the left group and process clones 1, 5, 8, 9 from the right group all experience a fault or long tail during the same round, then the computation can still continue without interruption (as shown in figure 12c).
- If f failures occur among the 2p process clones, the probability of the conventional disjoint 2p-process system surviving the f failures is 2·C(p, f) / C(2p, f), whereas in the computation performed by the distributed computing system 200 according to an embodiment the probability is N(2, p, f) / C(2p, f), where C(n, k) denotes the binomial coefficient and N is defined below.
- The computation performed by the distributed computing system 200 is thus much more reliable than the disjoint computation as a function of f.
- In a first example (corresponding, for instance, to f = 2 failures with p = 1000), the conventional disjoint computation survives with probability close to 0.5,
- whereas the computation performed by the distributed computing system 200 survives with probability 0.9995.
- In a second, larger example (for instance f = 2 with p = 10000), the conventional disjoint computation again survives with probability close to 0.5,
- whereas the computation performed by the distributed computing system 200 survives with probability 0.99995.
- Embodiments of the invention can be easily extended to higher levels of replication. If, instead of 2x replication, rx replication is used, then the probability of a computation performed by the distributed computing system 200 surviving f faults or long tails is given by N(r, p, f) / C(rp, f), where N(r, p, f) is the number of ways of distributing the f faults over the r·p process clones such that no process loses all r of its clones:
- N(r, p, f) = Σ_{k=0}^{⌊f/r⌋} (−1)^k · C(p, k) · C(r·(p−k), f−r·k), where
- r is the replication level, i.e. the horizontal clone level HC,
- p is the parallelism, and
- f is the number of failures.
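- These survival probabilities can be checked numerically; the short sketch below reproduces the example figures quoted above for f = 2 failures.

```python
from math import comb

# Numerical check of the survival formulas: N(r, p, f) counts the ways of
# placing f faults on r*p clones such that no sub-task loses all r of its
# clones (computed by inclusion-exclusion over fully-failed sub-tasks).

def n_surviving(r, p, f):
    return sum((-1) ** k * comb(p, k) * comb(r * (p - k), f - r * k)
               for k in range(f // r + 1))

def p_combined(r, p, f):
    return n_surviving(r, p, f) / comb(r * p, f)

def p_disjoint(p, f):
    # Two disjoint copies survive only if all f faults hit the same copy.
    return 2 * comb(p, f) / comb(2 * p, f)

print(p_disjoint(1000, 2))      # ~0.4997  (close to 0.5)
print(p_combined(2, 1000, 2))   # ~0.9995
print(p_combined(2, 10000, 2))  # ~0.99995
```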
- Embodiments of the invention have an advantageous new cost model that enables optimal parameters to be calculated to achieve optimal performance, ensure nonstop resilience, and to allow tuning to achieve the best possible trade-offs between performance, cost and resilience.
- The cost model uses the following parameters. Program parameters: Parallelism P, Rounds R. Network parameters: Network Throughput g, Network Latency L. Memory parameter: Local Memory Size M. Parameters of the distributed computing system 200 according to an embodiment: Predictability D, TailLimit T, VerticalCloneLevel VC, HorizontalCloneLevel HC, StandbyLevel S.
- Cost ≤ T * HC * Perfect, where Perfect is the cost for an idealized system where D is infinite (no failures ever, no tails ever).
- This cost model can be placed at the top level of a hierarchy of cost models for different models of parallel computing, including PRAM (idealized shared memory), MapReduce, and standard BSP with no fault tolerance or tail tolerance.
- The parameters P, R and the data distribution can be chosen by the programmer.
- The parameters g, L, M, D are determined by the infrastructure (hardware + software).
- Optimal T, VC, HC, S parameters can be automatically calculated to guarantee a given Quality of Service level (performance and resilience).
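- A simple illustration of such an automatic calculation is sketched below. The independence assumption in the failure model and the search ranges are assumptions for illustration; they are not the patent's exact optimization procedure.

```python
# Illustrative parameter search under the cost model Cost <= T * HC * Perfect,
# with the polynomial predictability model: a fraction 1/T**D of nodes is
# expected to miss the tail limit T * MinTime. Clone failures are treated
# as independent, which is a simplifying assumption.

def round_failure_prob(p, hc, tail_frac):
    # A round fails if, for some sub-task, all of its HC clones are tails.
    return 1.0 - (1.0 - tail_frac ** hc) ** p

def choose_parameters(p, d, target=1e-6,
                      t_values=(2, 4, 8, 16), hc_values=(1, 2, 3, 4)):
    best = None
    for t in t_values:
        tail_frac = 1.0 / t ** d         # expected tail fraction at T * MinTime
        for hc in hc_values:
            if round_failure_prob(p, hc, tail_frac) <= target:
                cost = t * hc            # cost relative to Perfect
                if best is None or cost < best[0]:
                    best = (cost, t, hc)
    return best                          # (relative cost, TailLimit, HC)

# e.g. choose_parameters(p=1000, d=2) -> (64, 16, 4) under these assumptions
```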
- The distributed computing system 200 is also easy to use.
- A Boolean synchronization parameter can be added to the sync primitive, and can be made always true for a load-balanced program.
- Moreover, any large-scale parallel software can be automatically optimized for the distributed computing system 200 according to an embodiment. Given the program Parallelism P and Rounds R, and the Predictability D of the distributed computing system 200, the optimal values of the TailLimit, VerticalClone level, HorizontalClone level and StandbyLevel can be automatically generated.
- Figure 13a shows a schematic diagram illustrating the amount of time taken by a plurality of processes over several computing rounds in an exemplary conventional distributed computing system.
- Figure 13b shows a schematic diagram illustrating the amount of time taken by the plurality of processes of figure 13a in the distributed computing system 200 according to an embodiment.
- As can be seen, the distributed computing system 200 provides substantially improved performance.
- The distributed computing system 200 allows running nonstop BSP programs having a degree of Parallelism P.
- A program with integer Parallelism P > 0 consists of P processes that compute together in rounds.
- The number of Rounds R > 0 may be finite or infinite.
- In each round, the processes perform local computation, global communication and barrier synchronization.
- The distributed computing system 200 handles faults and tails.
- The synchronization mechanism at the end of each round can be parameterized by a value that indicates whether that particular process can or cannot set the minimum time for the round. For example, it can be specified by a Boolean value, true or false.
- The first processes on each computing node 201 monitor their elapsed time for that round.
- For a load-balanced program, the synchronization parameter can simply be set to true for all processes in every round.
- The number of computing nodes 201 can be significantly greater than the number of processes/sub-tasks (e.g., where horizontal cloning is used).
- Processes can be cloned vertically, horizontally, or both.
- The clones of the processes can be run together within a combined computation, with communication between the process clones as required by the computation in each round.
- The first process clones on each computing node 201 monitor their elapsed time for the current round.
- The earliest first process clone to finish a round with its synchronization parameter set to true can set MinTime for the round. Any timing error due to race conditions will typically be negligible compared to the round time.
- A simple model of predictability is a polynomial predictability function defined by a Predictability exponent D, indicating that in any round a fraction 1/t^D of the E computing nodes 201 will fail to complete the round in time less than t * MinTime, i.e. at t * MinTime one will have E/t^D first-process faults/tails, for any t ≥ 1.
- Each round has a TailLimit T > 1.
- Any first process clone that fails to complete before T * MinTime is marked as a fault/tail.
- The inter-process communications of the program can be handled by the distributed computing system 200 according to an embodiment in several ways.
- For example, the first clone of a process to complete can handle the global communications for all the clones of that process.
- A number of other variations are also possible, depending on other objectives such as balancing communications at endpoints, increasing network latency resilience and other factors.
- A pool of standby computing nodes 202, e.g. containers, is maintained that are ready to run.
- This can be done in several ways, for example by having a static pool of standby computing nodes 202 directly associated with the various processes, or by having a dynamic pool of standby computing nodes 202, each of which can be used with any process.
- The choice between having a static pool, a dynamic pool, or no pool of standby nodes 202 can be made based on a tradeoff between speed of relaunch and efficiency of container utilization.
- The required standby pool size can be estimated based on the expected number of standbys needed in each round. Writes need to be delayed until all clones complete or MaxTime is reached.
- Standby pools thus offer a range of tradeoffs between cost and speed.
- In the example of figure 14, uniform horizontal cloning with Horizontal Clone level HC is combined with a static pool of standby nodes 202 with StandByLevel S.
- The first of a process's clones to finish writes to the standby node 202 for that process. If HC-1 clones complete before MaxTime, then the tail clone is relaunched as a new standby. If all HC clones complete, then the standby continues as a standby.
- A further example of a computation performed by the distributed computing system 200 according to an embodiment is shown in figure 15.
- In this example, the synchronization parameter is set to false for P1, C1 and S1, and set to true for all others.
- Some of the parameters described above can be chosen in one of the following ways.
- With certain choices, the distributed computing system 200 would operate as a conventional system.
- The subset of computing nodes 201 that have the synchronization parameter set to true may vary, under program control.
Landscapes
- Engineering & Computer Science (AREA)
- Software Systems (AREA)
- Theoretical Computer Science (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Mathematical Physics (AREA)
- Hardware Redundancy (AREA)
Abstract
A distributed computing system (200) and a corresponding method are disclosed. The distributed computing system (200) comprises a plurality of computing nodes (201), wherein each computing node (201) is configured to simultaneously execute a respective sub-task of a parallel computing task in a plurality of computing rounds, and a communication network (203) configured to allow data exchange between the plurality of computing nodes (201). Each computing round comprises an execution stage, a communication stage between the plurality of computing nodes (201) and a synchronization stage between the plurality of computing nodes (201), wherein the distributed computing system (200) is configured to handle one or more high-latency computing nodes of the plurality of computing nodes (201) in each computing round.
Priority Applications (2)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| EP17800443.8A EP3701374A1 (fr) | 2017-11-03 | 2017-11-03 | System and method for high-performance general-purpose parallel computing with fault tolerance and tail tolerance |
| PCT/EP2017/078153 WO2019086120A1 (fr) | 2017-11-03 | 2017-11-03 | System and method for high-performance general-purpose parallel computing with fault tolerance and tail tolerance |
Applications Claiming Priority (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| PCT/EP2017/078153 WO2019086120A1 (fr) | 2017-11-03 | 2017-11-03 | System and method for high-performance general-purpose parallel computing with fault tolerance and tail tolerance |
Publications (1)
| Publication Number | Publication Date |
|---|---|
| WO2019086120A1 (fr) | 2019-05-09 |
Family
ID=60382174
Family Applications (1)
| Application Number | Title | Priority Date | Filing Date |
|---|---|---|---|
| PCT/EP2017/078153 Ceased WO2019086120A1 (fr) | 2017-11-03 | 2017-11-03 | System and method for high-performance general-purpose parallel computing with fault tolerance and tail tolerance |
Country Status (2)
| Country | Link |
|---|---|
| EP (1) | EP3701374A1 (fr) |
| WO (1) | WO2019086120A1 (fr) |
Cited By (3)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN111931949A (zh) | 2019-05-13 | 2020-11-13 | Communication in a federated learning environment |
| WO2023115272A1 (fr) | 2021-12-20 | 2023-06-29 | Chip management apparatus and related method |
| US12073262B2 (en) | 2020-07-14 | 2024-08-27 | Graphcore Limited | Barrier synchronization between host and accelerator over network |
Citations (2)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| US20140181831A1 (en) * | 2012-12-20 | 2014-06-26 | Thomson Licensing | DEVICE AND METHOD FOR OPTIMIZATION OF DATA PROCESSING IN A MapReduce FRAMEWORK |
| US20140201564A1 (en) * | 2013-01-15 | 2014-07-17 | Microsoft Corporation | Healing cloud services during upgrades |
Patent Citations (2)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| US20140181831A1 (en) * | 2012-12-20 | 2014-06-26 | Thomson Licensing | DEVICE AND METHOD FOR OPTIMIZATION OF DATA PROCESSING IN A MapReduce FRAMEWORK |
| US20140201564A1 (en) * | 2013-01-15 | 2014-07-17 | Microsoft Corporation | Healing cloud services during upgrades |
Non-Patent Citations (3)
| Title |
|---|
| G. Ananthanarayanan, A. Ghodsi, S. Shenker, I. Stoica, "Effective Straggler Mitigation: Attack of the Clones", Proc. 10th USENIX Conference on Networked Systems Design and Implementation (NSDI 2013), 2013, pages 185-198 |
| Ganesh Ananthanarayanan et al., "Effective Straggler Mitigation: Attack of the Clones", Proc. 10th USENIX Symposium on Networked Systems Design and Implementation (NSDI 2013), 12 April 2013, pages 185-198, XP055484981, ISBN: 978-1-931971-00-3, retrieved from the Internet: https://www.usenix.org/system/files/conference/nsdi13/nsdi13-final231.pdf [retrieved on 2018-06-15] * |
| J. Dean, L. A. Barroso, "The Tail at Scale", CACM, vol. 56, no. 2, 2013, pages 74-80, XP058030309, DOI: 10.1145/2408776.2408794 |
Also Published As
| Publication number | Publication date |
|---|---|
| EP3701374A1 (fr) | 2020-09-02 |
Similar Documents
| Publication | Publication Date | Title |
|---|---|---|
| US10156986B2 (en) | Gang migration of virtual machines using cluster-wide deduplication | |
| Harlap et al. | Addressing the straggler problem for iterative convergent parallel ML | |
| JP6618614B2 (ja) | 分散型ストリームベースのデータベーストリガ | |
| US10237335B2 (en) | Managing cluster-level performance variability without a centralized controller | |
| Wang et al. | Hadoop high availability through metadata replication | |
| Almeida et al. | ChainReaction: a causal+ consistent datastore based on chain replication | |
| US10387179B1 (en) | Environment aware scheduling | |
| EP3087503B1 (fr) | Planification de calcul en nuage à l'aide d'un modèle de contention heuristique | |
| US20180004777A1 (en) | Data distribution across nodes of a distributed database base system | |
| Wang et al. | Load‐balanced and locality‐aware scheduling for data‐intensive workloads at extreme scales | |
| US8689226B2 (en) | Assigning resources to processing stages of a processing subsystem | |
| US10365980B1 (en) | Storage system with selectable cached and cacheless modes of operation for distributed storage virtualization | |
| US8108718B2 (en) | Checkpointing in massively parallel processing | |
| Mei et al. | Fault-tolerant dynamic rescheduling for heterogeneous computing systems | |
| CN108810115B (zh) | 一种适用于分布式数据库的负载均衡方法、装置及服务器 | |
| WO2007084700A2 (fr) | Système et procédé de gestion de fils dans le calcul parallèle multifils de fils imbriqués | |
| CN103067425A (zh) | 虚拟机创建方法、虚拟机管理系统及相关设备 | |
| Li et al. | A convergence of key‐value storage systems from clouds to supercomputers | |
| Fan | Job scheduling in high performance computing | |
| US20020083116A1 (en) | Buffered coscheduling for parallel programming and enhanced fault tolerance | |
| US9195528B1 (en) | Systems and methods for managing failover clusters | |
| CN106354563A (zh) | 用于3d重建的分布式计算系统以及3d重建方法 | |
| WO2019086120A1 (fr) | System and method for high-performance general-purpose parallel computing with fault tolerance and tail tolerance | |
| US8977752B2 (en) | Event-based dynamic resource provisioning | |
| Jin et al. | Distmind: Efficient resource disaggregation for deep learning workloads |
Legal Events
| Date | Code | Title | Description |
|---|---|---|---|
| | 121 | Ep: the epo has been informed by wipo that ep was designated in this application | Ref document number: 17800443; Country of ref document: EP; Kind code of ref document: A1 |
| | NENP | Non-entry into the national phase | Ref country code: DE |
| | ENP | Entry into the national phase | Ref document number: 2017800443; Country of ref document: EP; Effective date: 20200525 |