WO2021061183A1 - Shuffle reduce tasks to reduce i/o overhead - Google Patents
Shuffle reduce tasks to reduce I/O overhead
- Publication number
- WO2021061183A1 (PCT/US2020/013686)
- Authority
- WO
- WIPO (PCT)
- Prior art keywords
- data
- shuffle
- memory
- reduce
- reduce operation
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Ceased
Classifications
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/50—Allocation of resources, e.g. of the central processing unit [CPU]
- G06F9/5061—Partitioning or combining of resources
- G06F9/5066—Algorithms for mapping a plurality of inter-dependent sub-tasks onto a plurality of physical CPUs
Definitions
- This application is related to large-scale data processing systems and, in particular, to systems and methods for reducing the reading and writing of data while reducing the number of random access I/O requests to hard disk drives (HDD) to thereby reduce the total execution time during complex data processing operations.
- HDD hard disk drives
- Shuffle Reduce tasks are designed and integrated into data analytics frameworks.
- a driver module in the data analytics framework determines whether or not to utilize the Shuffle Reduce tasks and how many of them to utilize.
- the Shuffle Reduce tasks described herein make sequential read I/O requests instead of a large number of random access I/O requests performed in the traditional approach (reducing I/O overhead).
- the size of data read by the Shuffle Reduce tasks described herein is significantly decreased leading to less I/O overhead and improvement in total execution time.
- a Shuffle Reduce module may be included in the data analytics framework driver that decides whether to invoke Shuffle Reduce tasks (how many, etc.) for a job or to maintain the existing execution plan. If the Shuffle Reduce task is invoked, it is invoked as part of the job execution plan.
- a method of performing a Shuffle Reduce operation that groups and joins data between map and reduce stages for data transformations during a data analytics process.
- the method includes receiving as input at least two input files from a first memory, where each input file has been sorted and written by a different map task; fetching batches of data from each input file; and merging and sorting the batches of data in a second memory to form a unified piece of data.
- a Shuffle Reduce operation is applied to the unified piece of data to produce output data.
- the Shuffle Reduce operation comprises a commutative reduce operation that provides an amount of output data that is less than an amount of input data.
- a data analytics system comprising at least one processor; a first memory that stores at least two input files, each input file being sorted and written by a different map task of a data analytics process; a second memory; a third memory; and an instruction memory that stores instructions that upon execution by the at least one processor performs a data analytics process including a Shuffle Reduce operation that groups and joins data between map and reduce stages for data transformations during the data analytics process.
- the Shuffle Reduce operation comprises: (1) receiving as input the at least two input files from the first memory; (2) fetching batches of data from each input file; (3) merging and sorting the batches of data in the second memory to form a unified piece of data; (4) applying a Shuffle Reduce operation to the unified piece of data to produce output data, where the Shuffle Reduce operation comprises a commutative reduce operation that provides an amount of output data that is less than an amount of input data; (5) writing the output data to the third memory; and (6) repeating (2)-(5) until data from each input file is entirely consumed and analytics data output of the data analytics process has been fully formed.
- a non-transitory computer readable storage medium comprising instructions that upon execution by at least one processor cause the processor to perform a Shuffle Reduce operation that groups and joins data between map and reduce stages for data transformations during a data analytics process by performing operations comprising: (1) receiving as input at least two input files from a first memory, where each input file has been sorted and written by a different map task; (2) fetching batches of data from each input file; (3) merging and sorting the batches of data in a second memory to form a unified piece of data; (4) applying a Shuffle Reduce operation to the unified piece of data to produce output data, where the Shuffle Reduce operation comprises a commutative reduce operation that provides an amount of output data that is less than an amount of input data; (5) writing the output data to a third memory; and (6) repeating (2)-(5) until data from each input file is entirely consumed and analytics data output of the data analytics process has been fully formed.
- the first memory may comprise at least one hard disk drive and the third memory may comprise at least one of a solid state disk and a persistent memory.
- fetching the batches of data from each input file comprises fetching a total amount of data that does not exceed a memory capacity allocated for the Shuffle Reduce operation.
- a driver module determines whether to perform the Shuffle Reduce operation for a particular job in the data analytics process based on whether the job includes a Shuffle operation, whether a workload for the job is large enough to reap performance gains, whether the job includes the commutative reduce operation that provides an amount of output data in the output files that is less than an amount of input data, how many tasks to start to implement the Shuffle Reduce operation, and when to start and stop the Shuffle Reduce operation.
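The driver's decision can be sketched as a small planning function. This is a minimal illustration under stated assumptions, not the patented implementation: the `Job` fields, the `min_shuffle_bytes` threshold, and the `maps_per_task` grouping factor are hypothetical names introduced here, and the set of operation names simply mirrors the by-key operations listed in the text.

```python
from dataclasses import dataclass

# Hypothetical labels for the reduce operations the text names as suitable.
COMMUTATIVE_REDUCING_OPS = {"aggregateByKey", "groupByKey", "reduceByKey"}


@dataclass
class Job:
    """Hypothetical job description; the field names are illustrative only."""
    has_shuffle: bool
    shuffle_bytes: int
    reduce_op: str
    num_map_tasks: int


def plan_shuffle_reduce(job: Job, min_shuffle_bytes: int, maps_per_task: int) -> int:
    """Return how many Shuffle Reduce tasks to start, or 0 to keep the existing plan."""
    if not job.has_shuffle:
        return 0  # no Shuffle operation in the job
    if job.shuffle_bytes < min_shuffle_bytes:
        return 0  # workload too small to reap performance gains
    if job.reduce_op not in COMMUTATIVE_REDUCING_OPS:
        return 0  # reduce operation is not a commutative, data-reducing one
    # One Shuffle Reduce task per group of `maps_per_task` map outputs (ceiling division).
    return -(-job.num_map_tasks // maps_per_task)
```

A return value of 0 stands for "maintain the existing execution plan"; any positive value is the number of Shuffle Reduce tasks added to the job execution plan.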
- the data analytics process is implemented on a data analytics platform and applying the Shuffle Reduce operation comprises applying as the commutative reduce operation at least one of an aggregate by key operation, a group by key operation, and a reduce by key operation.
- a first task performs the steps of receiving input files, fetching batches of data from each input file, and merging and sorting the batches of data while a second task performs the steps of applying the Shuffle Reduce operation and writing the output.
- the first task and the second task communicate directly with each other independent of the first memory and the third memory.
- applying the Shuffle Reduce operation to the unified piece of data to produce output data comprises applying a plurality of tasks to implement the Shuffle Reduce operation.
- the method may be performed and the instructions on the computer readable media may be processed by the apparatus, and further features of the method and instructions on the computer readable media result from the functionality of the apparatus. Also, the explanations provided for each aspect and its implementation apply equally to the other aspects and the corresponding implementations. The different embodiments may be implemented in hardware, software, or any combination thereof. Also, any one of the foregoing examples may be combined with any one or more of the other foregoing examples to create a new embodiment within the scope of the present disclosure.
- FIG. 1 illustrates a logical view of a Spark™ job that applies transformations (map and filter) on data from two separate tables, joins, and aggregates the items over each key (a certain field of items) using a GroupByKey function.
- FIG. 2 illustrates the Spark™ execution plan of the job illustrated in FIG. 1.
- FIG. 3 illustrates the mapping between map and reduce tasks during a shuffle phase.
- FIG. 4 illustrates a Riffle™ data analytics system that requires less write and read I/O overhead due to a decreased number of tasks but that needs to perform reads and writes to the whole shuffle data twice.
- FIG. 5 illustrates a sample embodiment of the Shuffle Reduce approach for reducing I/O overhead.
- FIG. 6 illustrates the steps performed by the Shuffle Reduce operation in a sample embodiment.
- FIG. 7 illustrates a flow chart of the Shuffle Reduce operation in a sample embodiment.
- FIG. 8 illustrates an example of a Shuffle Reduce operation for the indicated input files in a sample embodiment.
- FIG. 9 is a block diagram illustrating circuitry for performing the methods according to sample embodiments.
DETAILED DESCRIPTION
- the functions or algorithms described herein may be implemented in software in one embodiment.
- the software may include computer executable instructions stored on computer readable media or computer readable storage device such as one or more non-transitory memories or other type of hardware-based storage devices, either local or networked.
- modules which may be software, hardware, firmware or any combination thereof. Multiple functions may be performed in one or more modules as desired, and the embodiments described are merely examples.
- the software may be executed on a digital signal processor, ASIC, microprocessor, or other type of processor operating on a computer system, such as a personal computer, server or other computer system, turning such computer system into a specifically programmed machine.
- FIG. 1 illustrates a logical view of a Spark™ job that applies a map transformation 110 and a filter transformation 120 to input data 130 and a map transformation 140 to input data 150 where the input data 130 and 150 are from two separate tables.
- the items over each key (a certain field of items) are joined and aggregated using, for example, a join function 160.
- the output data 180 is stored in a result table.
- the Spark™ execution plan of the job illustrated in FIG. 1 is shown in FIG. 2.
- a DAG execution platform, e.g., Spark™
- the Spark™ system pipelines the transformations on each partition 230 and 240 and performs the operators in a single stage. Internally, the Spark™ system tries to keep the intermediate data of a single task in memory during a map stage so that the pipeline operators (e.g. the filter operator 220 following the map operator 210 in the first stage) can be performed efficiently.
- shuffle phase includes an all-to-all communication for the wide dependency between stage 1 (map stage) and stage 2 (reduce stage).
- Each map task 210 reads from a data partition (e.g., several rows of a large table), transforms the data into an intermediate format with the map task operators, sorts or aggregates the items by the partitioning function of the reduce stage (e.g., key ranges) to produce blocks of items, and saves the blocks to on-disk intermediate files.
- the map task 210 also writes a separate index file that shows the offsets of blocks corresponding to each reduce task of the reduce stage.
- Each reduce task brings together the designated data blocks and performs reduce task operations. By looking up the offsets in index files, each reduce task issues fetch requests to the target blocks from all the map output files. Thus, data that was originally partitioned according to table rows are processed and shuffled to data partitioned according to reduce key ranges.
- each reduce task requires reading data blocks from all the map task outputs. If the intermediate shuffle files were not persisted, even a single reduce task failure could lead to recomputing the entire map stage. It is thus important to persist shuffle data for strong fault tolerance.
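The map-side write and reduce-side fetch described above can be sketched in a few lines. This is a simplified in-memory model, assuming a byte string stands in for each on-disk intermediate file and a byte-sum partitioner stands in for the reduce-stage partitioning function; the function names and the JSON serialization are illustrative choices, not part of the source.

```python
import json


def write_map_output(records, num_reducers):
    """Partition (key, value) records, sort each block, and return (data, index).

    `data` is one contiguous byte string standing in for the intermediate file;
    `index[r]` is the (offset, length) of the block destined for reduce task r.
    """
    blocks = [[] for _ in range(num_reducers)]
    for key, value in records:
        # Deterministic stand-in partitioner (assumption): byte sum of the key.
        blocks[sum(key.encode()) % num_reducers].append((key, value))
    data, index = b"", []
    for block in blocks:
        payload = json.dumps(sorted(block)).encode()  # serialization step
        index.append((len(data), len(payload)))
        data += payload
    return data, index


def fetch_blocks(map_outputs, reducer_id):
    """Reduce side: one fetch per map output, located through that output's index."""
    fetched = []
    for data, index in map_outputs:
        offset, length = index[reducer_id]
        block = json.loads(data[offset:offset + length])  # deserialization step
        fetched.extend(tuple(item) for item in block)
    return fetched
```

Every reduce task touches every map output exactly once, which is the all-to-all pattern behind the large number of small random reads.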
- the Shuffle operation of the shuffle phase is an extremely resource intensive operation.
- Each block of data transferred from a map task to a reduce task needs to go through data serialization, disk and network I/O, and data deserialization.
- the Shuffle operation is heavily used in various types of jobs. For example, jobs requiring data to be partitioned, grouped or reduced by key, or joined all involve Shuffle operations.
- the shuffle phase may be used for operations such as determining how many times a particular word appears in text, where the individual results for each map task are combined for a final result.
- each map task 310 produces respective data sets 330, 340, and 350 that are directed to the respective reduce tasks 360, 370, and 380 as illustrated.
- the data sets 330 are processed by reduce task 360
- data sets 340 are processed by reduce task 370
- data sets 350 are processed by reduce task 380.
- shuffling operations are the most difficult ones for distributed data analytics platforms to perform.
- Shuffle operations for large workloads write data to persistent storage whereby, in the case of a reduce task failure, there is no need to rerun the whole map stage.
- large amounts of data are ephemerally persisted in hard disk drives (HDDs) as the most cost-effective solution. Random access I/O requests cause significant overhead on HDDs.
- the aforementioned Riffle™ system provides a shuffle service that takes a different approach to the scaling bottleneck caused by the all-to-all data transfer of Shuffle operations.
- the Riffle™ system introduces a merge task in the middle of the shuffle phase.
- the Riffle™ system assigns merge tasks to read and merge fragmented intermediate shuffle files produced by the map tasks into larger block files, thereby converting small, random disk I/O requests into large, sequential requests.
- the reduce tasks perform far fewer random access I/O requests to the HDD.
- the Riffle™ system performs (M × R)/N random access I/O requests, where M is the number of map tasks, R is the number of reduce tasks, and N is the average number of map tasks correlated with each merge task.
- the performance improvement offered by the Riffle™ system comes with a considerable cost. As illustrated in FIG. 4, while the Riffle™ system requires less write and read I/O overhead due to a decreased number of tasks (by a factor of N), the Riffle™ system performs reads and writes to the entirety of the shuffle data twice - once for reading the data sets 330, 340, and 350 created by the map tasks 310, and once for reading the merged data sets 420, 430, and 440 created by the merge tasks 410. Doubling of the reads and writes to the entirety of the shuffle data still causes a large I/O overhead, particularly for the larger block files created during the merge operation 410.
- the systems and methods described herein further reduce the I/O overhead in a cost-effective manner by implementing a shuffle phase for data analytics systems that mostly avoids reading and writing the whole shuffle data twice (as the Riffle™ system does) while still reducing the number of random access I/O requests to HDD (as the Riffle™ system also does).
- the systems and methods provided herein reduce the total execution time of the shuffle phase compared to conventional implementations such as in the Riffle™ system and reduce the number of I/O requests to HDD by taking advantage of two characteristics that user workloads frequently exhibit, namely:
- a Shuffle Reduce operation that combines the merge file operation with the Reduce operation.
- the Shuffle Reduce operation is a commutative Reduce operation implemented in the Shuffle phase that provides an amount of output data that is less than an amount of input data. Because the Reduce operations are commutative, the final result is the same as in previous approaches. Moreover, the Shuffle Reduce operation ends up making the same number of I/O requests to HDD as in the Riffle™ system. However, the second batch of reads and writes is performed on significantly less data, as only the data resulting from the aggregate operation is stored.
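The request counts and data volumes being compared can be made concrete with a little arithmetic. The sketch below assumes the conventional shuffle issues one fetch per (map task, reduce task) pair, which follows from every reduce task reading a block from every map output; the function names and the `output_ratio` parameter are hypothetical.

```python
from math import ceil


def random_fetch_requests(num_maps: int, num_reducers: int, maps_per_merge: int = 1) -> int:
    """Number of block fetches issued by the reduce stage.

    With maps_per_merge == 1 there is no merge step, so each reduce task reads
    one block from every map output (M * R). Grouping N map outputs behind one
    merge or Shuffle Reduce task leaves ceil(M / N) files for each reduce task.
    """
    return ceil(num_maps / maps_per_merge) * num_reducers


def merged_stage_bytes(shuffle_bytes: int, output_ratio: float = 1.0) -> float:
    """Bytes written by the merge stage plus bytes later read back by the reduce tasks.

    output_ratio is 1.0 for a plain merge (data size unchanged) and below 1.0
    when a reduce operation shrinks the data before it is written.
    """
    return 2 * shuffle_bytes * output_ratio
```

For example, with M = 1000, R = 500, and N = 20, the fetch count drops from 500,000 to 25,000 for both the merge-only and the Shuffle Reduce designs; the two differ only in `merged_stage_bytes`, where a reducing operation with an illustrative ratio of 0.25 moves a quarter of the data that a plain merge would.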
- FIG. 5 illustrates a sample embodiment of the Shuffle Reduce approach for reducing I/O overhead.
- the merge operation 410 of FIG. 4 is replaced by the Shuffle Reduce task 510 that merges the respective data sets 330, 340, and 350 from the respective map tasks 310 and reduces the data sets by implementing the reduce operations to produce reduced size data sets 520, 530, and 540 that correspond to data sets 330, 340, and 350, respectively.
- the data sets 330, 340, and 350 as well as reduced size data sets 520, 530, and 540 are stored in HDD for fault-tolerance.
- the reduce tasks 360, 370, and 380 are then applied to the reduced size data sets 520, 530, and 540, respectively. Due to the reduced size of the data sets 520, 530, and 540, the total execution time of the resulting shuffle phase is significantly reduced.
- FIG. 6 illustrates the steps performed by the Shuffle Reduce operation while FIG. 7 illustrates a flow chart of the Shuffle Reduce operation in a sample embodiment.
- the Shuffle Reduce task is implemented by receiving multiple input files 610 at step (1) (operation 710 in FIG. 7) from external HDDs.
- each input file 610 is written by a different map task in different partitions.
- Small batches of data 620 are then fetched from all input files 610 at step (2) as indicated by arrows 630 in FIG. 6 (operation 720 in FIG. 7).
- the total size of the fetched data should not exceed the Dynamic Random Access Memory (DRAM) capacity of the Shuffle Reduce task’s process.
- the small batches of data are then merged and sorted together (according to their key) at step (3) to form unified data 640 (operation 730 in FIG. 7).
- DRAM Dynamic Random Access Memory
- a Shuffle Reduce operation is applied to the unified data 640 at step (4) to produce reduced data set 650 (operation 740 in FIG. 7).
- the Shuffle Reduce operation is applied when the reduce operation is commutative and decreases the amount of data such that the output data size is much less compared to the input data size.
- reduce operations include the aggregate by key operation, the reduce by key operation, and the group by key operation.
- the AggregateByKey operation in the Spark™ system groups values with respect to a common key using given combine functions and a neutral “zero value” to return a different type of value for the key.
- the AggregateByKey operation allows one to process a tuple of Student, Subject and Scores for that subject into an output data set including the student and maximum scores or percentage.
- the ReduceByKey operation in the Spark™ system merges the values for each key using a reduce function that accepts two arguments, returns a single element, and is commutative and associative in the mathematical sense.
- the ReduceByKey function produces the same result when repetitively applied on the same set of data with multiple partitions irrespective of the element’s order.
- the GroupByKey function in the Spark™ system is quite similar to the ReduceByKey function.
- the GroupByKey function takes a key-value pair (K, V) as an input and collects the values for each key in a form of an iterator to produce an output including a key and a list of values.
- K, V key-value pair
- the GroupByKey function groups all values with respect to a single key and returns in the form of an iterator.
- Multiple pipeline operations that reduce the amount of data may also be used for the Shuffle Reduce operation. Similar functions for other data analytics systems will be apparent to those skilled in the art.
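The three by-key operations can be illustrated with plain Python analogues. These are single-partition sketches written for this explanation, not the Spark™ API: in particular, `aggregate_by_key` here takes only a zero value and one fold function, whereas a distributed version would also need a function for combining partial results across partitions.

```python
from collections import defaultdict
from functools import reduce


def group_by_key(pairs):
    """Collect all values of each key into a list."""
    out = defaultdict(list)
    for key, value in pairs:
        out[key].append(value)
    return dict(out)


def reduce_by_key(pairs, func):
    """Merge the values of each key with a commutative, associative two-argument function."""
    return {key: reduce(func, values) for key, values in group_by_key(pairs).items()}


def aggregate_by_key(pairs, zero, seq_func):
    """Fold each key's values into an accumulator that starts at the neutral `zero`."""
    out = {}
    for key, value in pairs:
        out[key] = seq_func(out.get(key, zero), value)
    return out


# Hypothetical (student, score) pairs, echoing the student/score example in the text.
scores = [("ann", 90), ("bob", 70), ("ann", 85), ("bob", 95)]
best = aggregate_by_key(scores, 0, max)
totals = reduce_by_key(scores, lambda a, b: a + b)
```

Note that `group_by_key` keeps every value and so does not shrink the data on its own, while `reduce_by_key` and `aggregate_by_key` emit one value per key.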
- the reduced data set 650 is then written to external HDD at step (5) as indicated by arrow 660 (operation 750 in FIG. 7).
- steps (2)-(5) are repeated at step (6) until all input data has been processed (step 760 in FIG. 7).
- the process ends at operation 770 when there is no more input data to process.
- the Shuffle Reduce operation thus reduces the data size read by the reduce tasks (compare the data size after step (6) with the initial size at step (1) in FIG. 6), which significantly reduces I/O overhead and decreases the total execution time of the shuffle phase. Due to the reduced data size, HDDs may be utilized for a cost-effective solution for fault-tolerance.
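Steps (1) through (6) can be sketched as a single loop. This is a minimal in-memory model under stated assumptions: Python lists stand in for the sorted map output files and for the output file, and `batch_size` (a hypothetical parameter) counts records per file per round rather than bytes of DRAM.

```python
import heapq
from functools import reduce
from itertools import groupby, islice
from operator import itemgetter


def shuffle_reduce(input_files, batch_size, reduce_func):
    """Run steps (2)-(5) repeatedly over key-sorted inputs and return the written output.

    At most batch_size * len(input_files) records are held in memory per round.
    """
    readers = [iter(f) for f in input_files]                      # step (1): receive inputs
    output = []                                                   # stands in for the output file
    while True:
        batches = [list(islice(r, batch_size)) for r in readers]  # step (2): fetch batches
        if not any(batches):
            break                                                 # every input is consumed
        unified = heapq.merge(*batches, key=itemgetter(0))        # step (3): merge and sort
        for key, group in groupby(unified, key=itemgetter(0)):    # step (4): reduce per key
            output.append((key, reduce(reduce_func, (v for _, v in group))))  # step (5): write
    return output                                                 # step (6) is the loop itself
```

Because each round sees only part of every file, a key that straddles a batch boundary is written more than once; the later reduce task combines those partial results, which is safe only when the reduce function gives the same answer regardless of order and grouping.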
- sample embodiments may include a Shuffle Reduce module that is included in the data analytics framework driver.
- the data analytics framework driver is the master node in the data analytics application that splits a data analytics application into tasks and schedules the tasks to run on executors.
- the driver module may spawn tasks across multiple partitions.
- the driver module is provided on the client side to decide whether to invoke the Shuffle Reduce tasks described herein (how many, etc.) for a job or to maintain the existing execution plan.
- the driver module of a conventional data analytics framework driver is revised to determine when to invoke the Shuffle Reduce tasks by checking:
- a first alternative solution to the embodiment described above would divide the Shuffle Reduce phase into two different tasks (merge, preduce). This embodiment would have similar characteristics to the embodiment described above. All the operations before the merge operation would be carried out by the merge task and the rest would be carried out by the preduce task. The merge task will communicate with the preduce task directly and not via HDD.
- the benefit of the first alternative solution is that a data analytics framework user/operator may scale up/down the merge and reduce operations in the shuffle phase independently, which provides more flexibility and better utilization of resources.
- the disadvantage is that this embodiment adds an extra layer of network communications since it gets rid of the locality aspect of the first embodiment.
- FIG. 8 illustrates an example of a Shuffle Reduce operation for the indicated input files in a sample embodiment.
- a preduce task 850 performs a ReduceByKey (sum) operation on the respective files 830 and 840 to produce the respective output files 860 and 870.
- the ReduceByKey operation is commutative, and the resulting files 860 and 870 are smaller than the files 830 and 840.
- the output key value pair includes the fruit along with the number of occurrences in the merged input files 830 and 840.
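The split into a merge task and a preduce task that talk to each other directly can be sketched with two generators, where the merged stream is handed straight to the reducer without being stored in between. The fruit counts below are hypothetical inputs chosen for illustration; the actual values shown in FIG. 8 are not reproduced here.

```python
import heapq
from itertools import groupby


def merge_task(*sorted_files):
    """Merge task: lazily yield one key-sorted stream from several sorted inputs."""
    yield from heapq.merge(*sorted_files)


def preduce_task(stream):
    """Preduce task: ReduceByKey(sum) over the merged stream, one output pair per key."""
    for fruit, group in groupby(stream, key=lambda pair: pair[0]):
        yield fruit, sum(count for _, count in group)


# Hypothetical map outputs, each already sorted by key.
file_a = [("apple", 1), ("apple", 1), ("banana", 1), ("cherry", 1)]
file_b = [("apple", 1), ("banana", 1), ("banana", 1)]

# The preduce task consumes the merge task's output directly.
result = list(preduce_task(merge_task(file_a, file_b)))
```

Seven input pairs collapse to three output pairs, one per fruit, which is the size reduction the later reduce tasks benefit from.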
- the Shuffle Reduce phase decreases the HDD I/O overhead since it significantly reduces the number of random access I/O requests.
- the Shuffle Reduce task described herein also greatly reduces the data size that needs to be read by the reduce tasks. As a result, the I/O overhead along with the total execution time of shuffle phase are considerably decreased.
- the following table compares the performance of traditional data analytics frameworks including the Spark™ system, the Riffle™ system, and the Shuffle Reduce approach implemented herein.
- M is the number of map tasks
- R is the number of reduce tasks
- N is the number of map tasks per merge or Shuffle Reduce task
- FIG. 9 illustrates a general-purpose computer 900 suitable for implementing one or more embodiments of the methods disclosed herein.
- the computer 900 in FIG. 9 may be implemented on a data analytics framework (e.g., Spark™, Hadoop™, TensorFlow™, and Dryad™) of the type described herein.
- the components described above may be implemented on any general-purpose network component, such as a computer 900 with sufficient processing power, memory resources, and network throughput capability to handle the necessary workload placed upon it.
- the computer 900 includes a processor 910 (which may be referred to as a central processor unit or CPU) that is in communication with memory devices including secondary storage 920, read only memory (ROM) 930, random access memory (RAM) 940, input/output (I/O) devices 950, and network connectivity devices 960.
- the network connectivity devices 960 further connect the processor 910 to a client side data analytics driver 970 that manages the data analytics process and determines when to invoke the Shuffle Reduce tasks as described herein.
- the processor 910 may be implemented as one or more CPU chips, or may be part of one or more application specific integrated circuits (ASICs).
- ASICs application specific integrated circuits
- the secondary storage 920 is typically comprised of one or more disk drives or tape drives and is used for non-volatile storage of data and as an overflow data storage device if RAM 940 is not large enough to hold all working data. Secondary storage 920 may be used to store programs that are loaded into RAM 940 when such programs are selected for execution.
- the ROM 930 is used to store instructions and perhaps data that are read during program execution. ROM 930 is a non-volatile memory device that typically has a small memory capacity relative to the larger memory capacity of secondary storage 920.
- the RAM 940 is used to store volatile data and perhaps to store instructions. Access to both ROM 930 and RAM 940 is typically faster than to secondary storage 920.
- computer 900 may execute instructions from computer-readable non-transitory media storing computer readable instructions and one or more processors coupled to the memory, and when executing the computer readable instructions, the computer 900 is configured to perform method steps and operations described in the disclosure with reference to FIG. 1 to FIG. 8.
- the computer-readable non-transitory media includes all types of computer readable media, including magnetic storage media, optical storage media, flash media and solid state storage media.
- software including one or more computer-executable instructions that facilitate processing and operations as described above with reference to any one or all of steps of the disclosure may be installed in and sold with one or more servers or databases.
- the software may be obtained and loaded into one or more servers or one or more databases in a manner consistent with the disclosure, including obtaining the software through physical medium or distribution system, including, for example, from a server owned by the software creator or from a server not owned but used by the software creator.
- the software may be stored on a server for distribution over the Internet, for example.
- the components of the illustrative devices, systems and methods employed in accordance with the illustrated embodiments may be implemented, at least in part, in digital electronic circuitry, analog electronic circuitry, or in computer hardware, firmware, software, or in combinations of them. These components also may be implemented, for example, as a computer program product such as a computer program, program code or computer instructions tangibly embodied in an information carrier, or in a machine-readable storage device, for execution by, or to control the operation of, data processing apparatus such as a programmable processor, a computer, or multiple computers.
- a computer program may be written in any form of programming language, including compiled or interpreted languages, and it may be deployed in any form, including as a stand-alone program or as a module, component, subroutine, or other unit suitable for use in a computing environment.
- a computer program may be deployed to be executed on one computer or on multiple computers at one site or distributed across multiple sites and interconnected by a communication network.
- functional programs, codes, and code segments for accomplishing the systems and methods described herein may be easily construed as within the scope of the disclosure by programmers skilled in the art to which the present disclosure pertains.
- Method steps associated with the illustrative embodiments may be performed by one or more programmable processors executing a computer program, code or instructions to perform functions (e.g., by operating on input data and generating an output). Method steps may also be performed by, and apparatus may be implemented as, special purpose logic circuitry, e.g., an FPGA (field programmable gate array) or an ASIC (application-specific integrated circuit), for example.
- DSP digital signal processor
- a general purpose processor may be a microprocessor, but in the alternative, the processor may be any conventional processor, controller, microcontroller, or state machine.
- a processor may also be implemented as a combination of computing devices, e.g., a combination of a DSP and a microprocessor, a plurality of microprocessors, one or more microprocessors in conjunction with a DSP core, or any other such configuration.
- processors suitable for the execution of a computer program include, by way of example, both general and special purpose microprocessors, and any one or more processors of any kind of digital computer.
- a processor will receive instructions and data from a read-only memory or a random access memory or both.
- the essential elements of a computer are a processor for executing instructions and one or more memory devices for storing instructions and data.
- a computer will also include, or be operatively coupled to receive data from or transfer data to, or both, one or more mass storage devices for storing data, e.g., magnetic, magneto-optical disks, or optical disks.
- Information carriers suitable for embodying computer program instructions and data include all forms of non-volatile memory, including by way of example, semiconductor memory devices, e.g., electrically programmable read-only memory or ROM (EPROM), electrically erasable programmable ROM (EEPROM), flash memory devices, and data storage disks (e.g., magnetic disks, internal hard disks, or removable disks, magneto-optical disks, CD-ROM disks, or DVD-ROM disks).
- EPROM electrically programmable read-only memory
- EEPROM electrically erasable programmable ROM
- a software module may reside in random access memory (RAM), flash memory, ROM, EPROM, EEPROM, registers, hard disk, a removable disk, a CD-ROM, or any other form of storage medium known in the art.
- RAM random access memory
- ROM read-only memory
- EPROM erasable programmable read-only memory
- EEPROM electrically erasable programmable read-only memory
- the storage medium may be integral to the processor.
- the processor and the storage medium may reside in an integrated circuit or be implemented as discrete components.
- machine-readable medium means a device able to store instructions and data temporarily or permanently and may include, but is not limited to, random-access memory (RAM), read-only memory (ROM), buffer memory, flash memory, optical media, magnetic media, cache memory, other types of storage (e.g., Electrically Erasable Programmable Read-Only Memory (EEPROM)), and any suitable combination thereof.
- RAM random-access memory
- ROM read-only memory
- EEPROM Electrically Erasable Programmable Read-Only Memory
- machine-readable medium shall also be taken to include any medium, or combination of multiple media, that is capable of storing instructions for execution by one or more processors, such that the instructions, when executed by one or more processors cause the one or more processors to perform any one or more of the methodologies described herein. Accordingly, a “machine-readable medium” refers to a single storage apparatus or device, as well as “cloud-based” storage systems or storage networks that include multiple storage apparatus or devices. The term “machine-readable medium” as used herein excludes signals per se.
Landscapes
- Engineering & Computer Science (AREA)
- Software Systems (AREA)
- Theoretical Computer Science (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
A Shuffle Reduce operation receives as input files that have been sorted and written by different map tasks, fetches batches of data from each input file, and merges and sorts the batches of data to form a large unified piece of data. A Shuffle Reduce operation is applied to the unified piece of data to produce output data. The Shuffle Reduce operation includes a commutative reduce operation that provides an amount of output data that is significantly less than an amount of input data. The output data is written to memory. The process is repeated for different batches of data until data from each input file is entirely consumed and the output data has been fully formed. The Shuffle Reduce operation greatly reduces the data size that needs to be read by the reduce tasks in a Shuffle operation, thereby significantly reducing the input/output overhead and total execution time.
Description
SHUFFLE REDUCE TASKS TO REDUCE I/O OVERHEAD
TECHNICAL FIELD
[0001] This application is related to large-scale data processing systems and, in particular, to systems and methods for reducing the reading and writing of data while reducing the number of random access I/O requests to hard disk drives (HDD) to thereby reduce the total execution time during complex data processing operations.
BACKGROUND
[0002] Large-scale data analytics systems such as Spark™, Hadoop™, TensorFlow™, and Dryad™ are commonly used to perform complex data analytics operations. Such systems keep data partitions in memory for pipelined operators and persist data across stages with wide dependencies on hard disk drives for fault tolerance. In such systems, all-to-all data transfers - so-called shuffle operations - become the scaling bottleneck when running the many small tasks into which jobs are divided for multi-stage data analytics. It has been observed by Zhang et al., in “Riffle: Optimized Shuffle Service for Large-Scale Data Analytics,” Proceedings of the Thirteenth EuroSys Conference, ACM, p. 43, April 23-26, 2018, that this bottleneck is due to the superlinear increase in disk I/O operations as the data volume increases. To address the increase in disk I/O operations, Zhang et al. propose to efficiently merge fragmented intermediate shuffle files into large block files and thus convert the small random disk I/O requests into large sequential requests.
[0003] Unfortunately, such solutions come with considerable cost. For example, the Riffle™ system needs to perform reads and writes to all of the data twice, which causes a large I/O overhead. It remains desirable to establish additional methods for reducing the reading and writing of data while reducing the number of random access I/O requests to hard disk drives (HDD) to further reduce the total execution time during complex data processing operations such as data analytics.
SUMMARY
[0004] Various examples are now described to introduce a selection of concepts in a simplified form that are further described below in the Detailed Description. The Summary is not intended to identify key or essential features of the claimed subject matter, nor is it intended to be used to limit the scope of the claimed subject matter.
[0005] In sample embodiments, Shuffle Reduce tasks are designed and integrated into data analytics frameworks. A driver module in the data analytics framework determines whether or not to utilize the Shuffle Reduce tasks and how many of them to utilize. The Shuffle Reduce tasks described herein make sequential read I/O requests instead of a large number of random access I/O requests performed in the traditional approach (reducing I/O overhead). Compared to the Riffle™ system, the size of data read by the Shuffle Reduce tasks described herein is significantly decreased leading to less I/O overhead and improvement in total execution time.
[0006] In sample embodiments, a Shuffle Reduce module may be included in the data analytics framework driver that decides whether to invoke Shuffle Reduce tasks (how many, etc.) for a job or to maintain the existing execution plan. If the Shuffle Reduce task is invoked, it is invoked as part of the job execution plan. This solution can be applied to any dataflow execution platform.
[0007] According to a first aspect of the present disclosure, there is provided a method of performing a Shuffle Reduce operation that groups and joins data between map and reduce stages for data transformations during a data analytics process. The method includes receiving as input at least two input files from a first memory, where each input file has been sorted and written by a different map task; fetching batches of data from each input file; and merging and sorting the batches of data in a second memory to form a unified piece of data. A Shuffle Reduce operation is applied to the unified piece of data to produce output data. In sample embodiments, the Shuffle Reduce operation comprises a commutative reduce operation that provides an amount of output data that is less than an amount of input data. The output data is written to a third memory and the steps are repeated until data from each input file is entirely consumed and analytics data output of the data analytics process has been fully formed.
[0008] According to a second aspect of the present disclosure, there is provided a data analytics system comprising at least one processor; a first memory that stores at least two input files, each input file being sorted and written by a different map task of a data analytics process; a second memory; a third memory; and an instruction memory that stores instructions that upon execution by the at least one processor performs a data analytics process including a Shuffle Reduce operation that groups and joins data between map and reduce stages for data transformations during the data analytics process. In sample embodiments, the Shuffle Reduce operation comprises: (1) receiving as input the at least two input files from the first memory; (2) fetching batches of data from each input file; (3) merging and sorting the batches of data in the second memory to form a unified piece of data; (4) applying a Shuffle Reduce operation to the unified piece of data to produce output data, where the Shuffle Reduce operation comprises a commutative reduce operation that provides an amount of output data that is less than an amount of input data; (5) writing the output data to the third memory; and (6) repeating (2)-(5) until data from each input file is entirely consumed and analytics data output of the data analytics process has been fully formed.
[0009] According to a third aspect of the present disclosure, there is provided a non-transitory computer readable storage medium comprising instructions that upon execution by at least one processor cause the processor to perform a Shuffle Reduce operation that groups and joins data between map and reduce stages for data transformations during a data analytics process by performing operations comprising: (1) receiving as input at least two input files from a first memory, where each input file has been sorted and written by a different map task; (2) fetching batches of data from each input file; (3) merging and sorting the batches of data in a second memory to form a unified piece of data; (4) applying a Shuffle Reduce operation to the unified piece of data to produce output data, where the Shuffle Reduce operation comprises a commutative reduce operation that provides an amount of output data that is less than an amount of input data; (5) writing the output data to a third memory; and (6) repeating (2)-(5) until data from each input file is entirely consumed and analytics data output of the data analytics process has been fully formed.
[0010] In a first implementation of any of the preceding aspects, the first memory and the third memory comprise hard disk drives and the second memory comprises dynamic random access memory (DRAM).
[0011] In a second implementation of any of the preceding aspects, the first memory may comprise at least one hard disk drive and the third memory may comprise at least one of a solid state disk and a persistent memory.
[0012] In a third implementation of any of the preceding aspects, fetching the batches of data from each input file comprises fetching a total amount of data that does not exceed a memory capacity allocated for the Shuffle Reduce operation.
[0013] In a fourth implementation of any of the preceding aspects, a driver module determines whether to perform the Shuffle Reduce operation for a particular job in the data analytics process based on whether the job includes a Shuffle operation, whether a workload for the job is large enough to reap performance gains, whether the job includes the commutative reduce operation that provides an amount of output data in the output files that is less than an amount of input data, how many tasks to start to implement the Shuffle Reduce operation, and when to start and stop the Shuffle Reduce operation.
[0014] In a fifth implementation of any of the preceding aspects, the data analytics process is implemented on a data analytics platform and applying the Shuffle Reduce operation comprises applying as the commutative reduce operation at least one of an aggregate by key operation, a group by key operation, and a reduce by key operation.
[0015] In a sixth implementation of any of the preceding aspects, a first task performs the steps of receiving input files, fetching batches of data from each input file, and merging and sorting the batches of data while a second task performs the steps of applying the Shuffle Reduce operation and writing the output. In sample embodiments, the first task and the second task communicate directly with each other independent of the first memory and the third memory.
[0016] In a seventh implementation of any of the preceding aspects, applying the Shuffle Reduce operation to the unified piece of data to produce output data comprises applying a plurality of tasks to implement the Shuffle Reduce operation.
[0017] The method may be performed and the instructions on the computer readable media may be processed by the apparatus, and further features of the method and instructions on the computer readable media result from the functionality of the apparatus. Also, the explanations provided for each aspect and its implementation apply equally to the other aspects and the corresponding implementations. The different embodiments may be implemented in hardware, software, or any combination thereof. Also, any one of the foregoing examples may be combined with any one or more of the other foregoing examples to create a new embodiment within the scope of the present disclosure.
BRIEF DESCRIPTION OF THE DRAWINGS
[0018] In the drawings, which are not necessarily drawn to scale, like numerals may describe similar components in different views. The drawings illustrate generally, by way of example, but not by way of limitation, various embodiments discussed in the present document.
[0019] FIG. 1 illustrates a logical view of a Spark™ job that applies transformations (map and filter) on data from two separate tables, joins, and aggregates the items over each key (a certain field of items) using a GroupByKey function.
[0020] FIG. 2 illustrates the Spark™ execution plan of the job illustrated in FIG. 1.
[0021] FIG. 3 illustrates the mapping between map and reduce tasks during a shuffle phase.
[0022] FIG. 4 illustrates a Riffle™ data analytics system that requires less write and read I/O overhead due to a decreased number of tasks but that needs to perform reads and writes to the whole shuffle data twice.
[0023] FIG. 5 illustrates a sample embodiment of the Shuffle Reduce approach for reducing I/O overhead.
[0024] FIG. 6 illustrates the steps performed by the Shuffle Reduce operation in a sample embodiment.
[0025] FIG. 7 illustrates a flow chart of the Shuffle Reduce operation in a sample embodiment.
[0026] FIG. 8 illustrates an example of a Shuffle Reduce operation for the indicated input files in a sample embodiment.
[0027] FIG. 9 is a block diagram illustrating circuitry for performing the methods according to sample embodiments.
DETAILED DESCRIPTION
[0028] It should be understood at the outset that although an illustrative implementation of one or more embodiments is provided below, the disclosed systems and/or methods described with respect to FIGS. 1-9 may be implemented using any number of techniques, whether currently known or in existence. The disclosure should in no way be limited to the illustrative implementations, drawings, and techniques illustrated below, including the exemplary designs and implementations illustrated and described herein, but may be modified within the scope of the appended claims along with their full scope of equivalents.
[0029] The functions or algorithms described herein may be implemented in software in one embodiment. The software may include computer executable instructions stored on computer readable media or computer readable storage device such as one or more non-transitory memories or other type of hardware-based storage devices, either local or networked. Further, such functions correspond to modules, which may be software, hardware, firmware or any combination thereof. Multiple functions may be performed in one or more modules as desired, and the embodiments described are merely examples. The software may be executed on a digital signal processor, ASIC, microprocessor, or other type of processor operating on a computer system, such as a personal computer, server or other computer system, turning such computer system into a specifically programmed machine.
[0030] Large data analytics platforms like Spark™, Hadoop™, TensorFlow™, and Dryad™ commonly use Directed Acyclic Graph (DAG) models to define user jobs. For example, FIG. 1 illustrates a logical view of a Spark™ job that applies a map transformation 110 and a filter transformation 120 to input data 130 and a map transformation 140 to input data 150 where the input data 130 and 150 are from two separate tables. The items over each key (a certain field of items) are joined and aggregated using, for example, a join function 160. After filtering by filter task 170, the output data 180 is stored in a result table.
[0031] The Spark™ execution plan of the job illustrated in FIG. 1 is shown in FIG. 2. As illustrated, a DAG execution platform (e.g. Spark) decides how to map these operations to physical resources. Since the operations may occur across partitions, communication across partitions is needed for processing. For narrow dependencies (map and filter operations 210 and 220), the Spark™ system pipelines the transformations on each partition 230 and 240 and performs the operators in a single stage. Internally, the Spark™ system tries to keep the intermediate data of a single task in memory during a map stage so that the pipeline operators (e.g. the filter operator 220 following the map operator 210 in the first stage) can be performed efficiently. Operations like map 210, filter 220, and many more cause narrow dependencies between tasks (one-to-one communication patterns). Typically, these operations are bundled together and executed by the same process. On the other hand, operations like join 250, ReduceByKey, GroupByKey, and many more, cause wide dependencies (many-to-many communication patterns) between different tasks during the reduce stage prior to filtering by filter task 260. Traditionally, the exchange of data between the map stage and the reduce stage is denoted as a shuffle phase as illustrated in FIG. 2.
[0032] As used herein, the shuffle phase includes an all-to-all communication for the wide dependency between stage 1 (map stage) and stage 2 (reduce stage). Each map task 210 reads from a data partition (e.g., several rows of a large table), transforms the data into an intermediate format with the map task operators, sorts or aggregates the items by the partitioning function of the reduce stage (e.g., key ranges) to produce blocks of items, and saves the blocks to on-disk intermediate files. The map task 210 also writes a separate index file that shows the offsets of blocks corresponding to each reduce task of the reduce stage. Each reduce task brings together the designated data blocks and performs reduce task operations. By looking up the offsets in index files, each reduce task issues fetch requests to the target blocks from all the map output files.
Thus, data that was originally partitioned according to table rows are processed and shuffled to data partitioned according to reduce key ranges. Between stages with wide dependencies, each reduce task requires reading data blocks from all the map task outputs. If the intermediate shuffle files were not persisted, even a single reduce task failure could lead to recomputing the entire map stage. It is thus important to persist shuffle data for strong fault tolerance.
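The map-side layout described above (sorted blocks per reduce task plus an index of block offsets) can be sketched as follows. This is a minimal illustration only, not the format of any particular framework: the function names, the use of `pickle` for serialization, and the caller-supplied `partition_of` function are all assumptions made for the example.

```python
import io
import pickle


def write_map_output(records, num_reducers, partition_of):
    """Build one map task's output as (data_bytes, offsets).

    records: iterable of (key, value) pairs.
    partition_of: hypothetical partitioning function, key -> integer.
    offsets[r] and offsets[r + 1] delimit the block meant for reducer r,
    playing the role of the separate index file.
    """
    blocks = [[] for _ in range(num_reducers)]
    for key, value in records:
        blocks[partition_of(key) % num_reducers].append((key, value))

    data = io.BytesIO()
    offsets = [0]
    for block in blocks:
        block.sort()                      # each block is sorted by key
        data.write(pickle.dumps(block))   # serialization step
        offsets.append(data.tell())
    return data.getvalue(), offsets


def fetch_block(data, offsets, reducer_id):
    """One fetch request: read only the byte range for one reducer."""
    start, end = offsets[reducer_id], offsets[reducer_id + 1]
    return pickle.loads(data[start:end])


records = [("b", 1), ("a", 2), ("c", 3), ("a", 4)]
data, offsets = write_map_output(records, 2, lambda k: ord(k[0]))
block_for_reducer_1 = fetch_block(data, offsets, 1)
```

With M such map outputs and R reducers, every reducer issues one `fetch_block` call per map output, which is where the M·R fetch count comes from.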
[0033] As a result, the Shuffle operation of the shuffle phase is an extremely resource intensive operation. Each block of data transferred from a map task to a reduce task needs to go through data serialization, disk and network I/O, and data deserialization. Nevertheless, the Shuffle operation is heavily used in various types of jobs. For example, jobs requiring data to be partitioned, grouped or reduced by key, or joined all involve Shuffle operations. The shuffle phase may be used for operations such as determining how many times a particular word appears in text where the individual results for each map task are combined for a final result.
[0034] Previous solutions let the map tasks write their output data to persistent storage for fault-tolerance (if one reduce task fails there is no need to rerun any map tasks). Hard Disk Drives (HDDs) are preferred over other types of persistent memory for large workloads since they are cheap and easy to scale up. However, this initial design does not scale well as the number of map and reduce tasks is increased. Reduce tasks need to perform M·R random access Input/Output (I/O) requests to HDD, where M is the number of map tasks and R is the number of reduce tasks. As a result, the shuffle phase causes large I/O overheads.
[0035] This approach is summarized in FIG. 3, which illustrates the mapping between map tasks 310 and respective reduce tasks 360, 370, and 380 during the shuffle phase. As illustrated in FIG. 3, each map task 310 produces respective data sets 330, 340, and 350 that are directed to the respective reduce tasks 360, 370, and 380 as illustrated. As shown, the data sets 330 are processed by reduce task 360, data sets 340 are processed by reduce task 370, and data sets 350 are processed by reduce task 380.
[0036] As the size of the job data increases, the number of map tasks and reduce tasks grows proportionately. Because each reduce task R needs to fetch from all map tasks M, the number of shuffle I/O requests M·R increases quadratically, and the average block size S/(M·R) for each fetch decreases quadratically. Using fewer reduce tasks R reduces the total number of shuffle fetches and thus improves the shuffle performance. However, using fewer tasks inevitably enlarges the average size of input data and creates very bulky, slow tasks that have to spill intermediate data to permanent storage, thus increasing overhead.
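A short calculation makes the scaling concrete. The sketch below only evaluates the two expressions from the paragraph above, M·R and S/(M·R); the function name and the example figures (100 GB of shuffle data, 1000 or 2000 tasks per stage) are hypothetical values chosen for illustration.

```python
def shuffle_fetch_stats(total_bytes, num_map_tasks, num_reduce_tasks):
    """Return (number of fetch requests, average bytes per fetch).

    Every reduce task fetches one block from every map output, so the
    request count is M * R and the average block size is S / (M * R).
    """
    requests = num_map_tasks * num_reduce_tasks
    avg_block_bytes = total_bytes / requests
    return requests, avg_block_bytes


S = 100 * 10**9  # hypothetical total shuffle size in bytes

small = shuffle_fetch_stats(S, 1000, 1000)   # (1000000, 100000.0)
large = shuffle_fetch_stats(S, 2000, 2000)   # (4000000, 25000.0)
```

Doubling both task counts for the same shuffle size quadruples the number of requests and cuts the average fetch to a quarter, which pushes the workload toward many small random reads.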
[0037] Thus, shuffling operations (e.g., join) are the most difficult ones for distributed data analytics platforms to perform. Shuffle operations for large workloads write data to persistent storage whereby, in the case of a reduce task failure, there is no need to rerun the whole map stage. However, large amounts of data are ephemerally persisted in hard disk drives (HDDs) as the most cost-effective solution. Random access I/O requests cause a significant overhead over HDDs.
[0038] The aforementioned Riffle™ system provides a shuffle service that takes a different approach to the scaling bottleneck caused by the all-to-all data transfer of Shuffle operations. The Riffle™ system introduces a merge task in the middle of the shuffle phase. Instead of providing the data written by the map tasks directly to the reduce tasks, the Riffle™ system assigns merge tasks to read and merge fragmented intermediate shuffle files produced by the map tasks into larger block files, thereby converting small, random disk I/O requests into large, sequential requests. As a result, the reduce tasks perform far fewer random access I/O requests to the HDD. In this case, the Riffle™ system performs M·R/N random access I/O requests, where N is the average number of map tasks correlated with each merge task.
[0039] However, the performance improvement offered by the Riffle™ system comes with a considerable cost. As illustrated in FIG. 4, while the Riffle™ system requires less write and read I/O overhead due to a decreased number of tasks (by a factor of N), the Riffle™ system performs reads and writes to the entirety of the shuffle data twice - once for reading the data sets 330, 340, and 350 created by the map tasks 310, and once for reading the merged data sets 420, 430, and 440 created by the merge tasks 410. Doubling of the reads and writes to the entirety of the shuffle data still causes a large I/O overhead, particularly for the larger block files created during the merge operation 410.
[0040] The systems and methods described herein further reduce the I/O overhead in a cost-effective manner by implementing a shuffle phase for data analytics systems that mostly avoids reading and writing the whole shuffle data twice (as the Riffle™ system does) while also reducing the number of random access I/O requests to HDD (as the Riffle™ system also does). The systems and methods provided herein reduce the total execution time of the shuffle phase compared to conventional implementations such as in the Riffle™ system and reduce the number of I/O requests to HDD by taking advantage of two characteristics that user workloads frequently exhibit, namely:
1. Reduce operations are commutative.
2. Reduce operations decrease the amount of data (output data size is much less compared to input data size).
Based on these two characteristics, a Shuffle Reduce operation is provided that combines the merge file operation with the Reduce operation. In sample embodiments, the Shuffle Reduce operation is a commutative Reduce operation implemented in the Shuffle phase that provides an amount of output data that is less than an amount of input data. Since the Reduce operations are commutative, the final result is the same as in previous approaches and is therefore correct. Moreover, the Shuffle Reduce operation ends up making the same number of I/O requests to HDD as in the Riffle™ system. However, the second batch of reads and writes is performed on significantly less data as only the data resulting from the aggregate operation is stored.
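The trade-off among the three approaches can be put into a rough cost model. The sketch below is a simplified accounting inferred from the description, not a measured result: it assumes the traditional shuffle writes and reads the shuffle data once, the merge-based approach writes and reads it twice, and Shuffle Reduce writes and reads the full data once plus a reduced copy of size r·S, where r is the fraction of data remaining after the early reduce. The function and key names are hypothetical.

```python
def shuffle_io_model(S, M, R, N, r):
    """Rough I/O model for one shuffle phase.

    S: total shuffle data size, M: map tasks, R: reduce tasks,
    N: map tasks per merge (or Shuffle Reduce) task,
    r: fraction of data left after the early reduce (0 < r <= 1).
    """
    return {
        "traditional": {
            "random_requests": M * R,
            "bytes_written": S,
            "bytes_read": S,
        },
        "merge_based": {
            "random_requests": M * R / N,
            "bytes_written": 2 * S,      # map output + merged files
            "bytes_read": 2 * S,         # merge read + reduce read
        },
        "shuffle_reduce": {
            "random_requests": M * R / N,
            "bytes_written": S + r * S,  # map output + reduced files
            "bytes_read": S + r * S,     # early-reduce read + reduce read
        },
    }


model = shuffle_io_model(S=1000, M=100, R=100, N=10, r=0.25)
worst_case = shuffle_io_model(S=1000, M=100, R=100, N=10, r=1)
```

Under this model the request count matches the merge-based approach, while the data volume falls between S and 2·S depending on how strongly the reduce operation shrinks the data.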
[0041] FIG. 5 illustrates a sample embodiment of the Shuffle Reduce approach for reducing I/O overhead. As illustrated, the merge operation 410 of FIG. 4 is replaced by the Shuffle Reduce task 510 that merges the respective data sets 330, 340, and 350 from the respective map tasks 310 and reduces the data sets by implementing the reduce operations to produce reduced size data sets 520, 530, and 540 that correspond to data sets 330, 340, and 350, respectively. As illustrated, the data sets 330, 340, and 350 as well as reduced size data sets 520, 530, and 540 are stored in HDD for fault-tolerance. The reduce tasks 360, 370, and 380 are then applied to the reduced size data sets 520, 530, and 540, respectively. Due to the reduced size of the data sets 520, 530, and 540, the total execution time of the resulting shuffle phase is significantly reduced.
[0042] FIG. 6 illustrates the steps performed by the Shuffle Reduce operation while FIG. 7 illustrates a flow chart of the Shuffle Reduce operation in a sample embodiment. As illustrated in FIG. 6 and FIG. 7, the Shuffle Reduce task is implemented by receiving multiple input files 610 at step (1) (operation 710 in FIG. 7) from external HDDs. In sample embodiments, each input file 610 is written by a different map task in different partitions. Small batches of data 620 are then fetched from all input files 610 at step (2) as indicated by arrows 630 in FIG. 6 (operation 720 in FIG. 7). In sample embodiments, the total size of the fetched data should not exceed the Dynamic Random Access Memory (DRAM) capacity of the Shuffle Reduce task’s process. The small batches of data are then merged and sorted together (according to their key) at step (3) to form unified data 640 (operation 730 in FIG. 7).
[0043] A Shuffle Reduce operation is applied to the unified data 640 at step (4) to produce reduced data set 650 (operation 740 in FIG. 7). In sample embodiments, the Shuffle Reduce operation is applied when the reduce operation is commutative and decreases the amount of data such that the output data size is much less compared to the input data size. In the Spark™ system, for example, such reduce operations include the aggregate by key operation, the reduce by key operation, and the group by key operation. The AggregateByKey operation in the Spark™ system groups values with respect to a common key using given combine functions and a neutral “zero value” to return a different type of value for the key. For example, the AggregateByKey operation allows one to process a tuple of Student, Subject and Scores for that subject into an output data set including the student and maximum scores or percentage. The ReduceByKey operation in the Spark™ system merges the values for each key using a reduce function that accepts two arguments, returns a single element, and is commutative and associative. In other words, the ReduceByKey function produces the same result when repeatedly applied to the same set of data across multiple partitions, irrespective of the order of the elements. The GroupByKey function in the Spark™ system is quite similar to the ReduceByKey function. The GroupByKey function takes a key-value pair (K, V) as an input and collects the values for each key in a form of an iterator to produce an output including a key and a list of values. Thus, the GroupByKey function groups all values with respect to a single key and returns them in the form of an iterator. Multiple pipeline operations that reduce the amount of data may also be used for the Shuffle Reduce operation. Similar functions for other data analytics systems will be apparent to those skilled in the art.
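The behavior of these by-key operations, and why an early reduce does not change the final answer, can be illustrated without a cluster. The helpers below are plain-Python stand-ins written for this example; they are not the Spark™ API, and `aggregate_by_key` is simplified to a zero value plus a single combining function.

```python
from collections import defaultdict
from operator import add


def reduce_by_key(pairs, func):
    """Merge the values of each key with a two-argument function."""
    out = {}
    for key, value in pairs:
        out[key] = func(out[key], value) if key in out else value
    return out


def group_by_key(pairs):
    """Collect all values of each key into a list."""
    out = defaultdict(list)
    for key, value in pairs:
        out[key].append(value)
    return dict(out)


def aggregate_by_key(pairs, zero, seq_func):
    """Fold the values of each key into an accumulator starting at zero."""
    out = {}
    for key, value in pairs:
        out[key] = seq_func(out.get(key, zero), value)
    return out


pairs = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)]

# Reducing everything at once ...
full = reduce_by_key(pairs, add)

# ... gives the same result as reducing two batches early and then
# reducing the (smaller) partial results again.
partial_1 = reduce_by_key(pairs[:3], add)
partial_2 = reduce_by_key(pairs[3:], add)
combined = reduce_by_key(list(partial_1.items()) + list(partial_2.items()), add)

best_scores = aggregate_by_key([("ann", 70), ("bob", 55), ("ann", 90)], 0, max)
```

Here five input pairs shrink to four partial pairs before the final reduce; with more duplicates per key, the reduction is correspondingly larger.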
[0044] The reduced data set 650 is then written to external HDD at step (5) as indicated by arrow 660 (operation 750 in FIG. 7). When there is more data to process from the input files 610, steps (2)-(5) are repeated at step (6) until all input data has been processed (operation 760 in FIG. 7). The process ends at operation 770 when there is no more input data to process.
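Steps (1) through (6) can be sketched as a single loop. This is a minimal in-memory illustration under stated assumptions: lists of (key, value) pairs stand in for the sorted input files on HDD, a Python list stands in for the output written to HDD, and `batch_size` (records per file per round) stands in for the DRAM budget. The function name and parameters are hypothetical.

```python
import heapq
from itertools import groupby, islice
from operator import add, itemgetter


def shuffle_reduce(input_files, batch_size, reduce_func):
    """Run the batch-fetch, merge, reduce, write loop over sorted inputs."""
    readers = [iter(f) for f in input_files]             # step (1)
    output = []                                           # stand-in for HDD
    while True:
        # Step (2): fetch a small batch from every input file.
        batches = [list(islice(r, batch_size)) for r in readers]
        if not any(batches):
            break                                         # inputs consumed
        # Step (3): merge the sorted batches into one sorted stream.
        unified = heapq.merge(*batches, key=itemgetter(0))
        # Step (4): apply the commutative reduce per key.
        for key, group in groupby(unified, key=itemgetter(0)):
            values = [v for _, v in group]
            acc = values[0]
            for v in values[1:]:
                acc = reduce_func(acc, v)
            output.append((key, acc))                     # step (5)
        # Step (6): loop back for the next round of batches.
    return output


files = [
    [("apple", 1), ("grape", 1), ("melon", 1)],
    [("apple", 1), ("apple", 1), ("orange", 1)],
]
reduced = shuffle_reduce(files, batch_size=2, reduce_func=add)
```

A key may appear in more than one round of output; because the reduce function is commutative, the downstream reduce tasks can combine those partial results into the final value.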
[0045] The Shuffle Reduce operation thus reduces the data size read by the reduce tasks (compare the data size after step (6) with the initial size at step (1) in FIG. 6), which significantly reduces I/O overhead and decreases the total execution time of the shuffle phase. Due to the reduced data size, HDDs may be utilized for a cost-effective solution for fault-tolerance.
[0046] Apart from the Shuffle Reduce tasks described herein, sample embodiments may include a Shuffle Reduce module that is included in the data analytics framework driver. The data analytics framework driver is the master node in the data analytics application that splits a data analytics application into tasks and schedules the tasks to run on executors. The driver module may spawn tasks across multiple partitions. In sample embodiments, the driver module is provided on the client side to decide whether to invoke the Shuffle Reduce tasks described herein (how many, etc.) for a job or to maintain the existing execution plan. In sample embodiments, the driver module of a conventional data analytics framework driver is revised to determine when to invoke the Shuffle Reduce tasks by checking:
1. If there is a shuffle phase(s);
2. If the workload is large enough for the Shuffle Reduce method to reap benefits; and
3. If the reduce operations are known (or determined) to be commutative and to decrease the amount of data (output data size is much less compared to input data size).
When these conditions are met, the Shuffle Reduce tasks described herein are invoked.
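The three checks can be sketched as a small predicate. Everything named here is an assumption made for illustration: the `JobPlan` fields, the byte threshold that stands in for "large enough", and the operation names in the allow-list (chosen to mirror the by-key operations mentioned earlier) are not part of any real driver API.

```python
from dataclasses import dataclass

# Hypothetical allow-list of reduce operations treated as commutative
# and data-reducing.
COMMUTATIVE_REDUCING_OPS = {"reduceByKey", "aggregateByKey", "groupByKey"}


@dataclass
class JobPlan:
    has_shuffle_phase: bool
    shuffle_bytes: int
    reduce_ops: list


def should_use_shuffle_reduce(job, min_shuffle_bytes):
    """Return True only when all three driver checks pass."""
    if not job.has_shuffle_phase:                      # check 1
        return False
    if job.shuffle_bytes < min_shuffle_bytes:          # check 2
        return False
    # Check 3: every reduce operation must be on the allow-list.
    return bool(job.reduce_ops) and all(
        op in COMMUTATIVE_REDUCING_OPS for op in job.reduce_ops
    )


big_job = JobPlan(True, 10**12, ["reduceByKey"])
decision = should_use_shuffle_reduce(big_job, min_shuffle_bytes=10**9)
```

When the predicate returns False, the driver would simply keep the existing execution plan.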
[0047] A first alternative solution to the embodiment described above would divide the Shuffle Reduce phase into two different tasks (merge, preduce). This embodiment would have similar characteristics to the embodiment described above. All the operations before the merge operation would be carried out by the merge task and the rest would be carried out by the preduce task. The merge task will communicate with the preduce task directly and not via HDD.
[0048] The benefit of the first alternative solution is that a data analytics framework user/operator may scale up/down the merge and reduce operations in the shuffle phase independently, which provides more flexibility and better utilization of resources. The disadvantage is that this embodiment adds an extra layer of network communications since it gets rid of the locality aspect of the first embodiment.
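The split into a merge task and a preduce task that hand data to each other directly can be sketched with two threads and an in-memory queue. This is only a single-process analogy: in a deployment the two tasks could run on different machines and the queue would be a network channel. All names below are hypothetical.

```python
import heapq
import queue
import threading
from itertools import groupby
from operator import add, itemgetter

_DONE = object()  # sentinel marking the end of the stream


def merge_task(rounds, channel):
    """Merge each round of sorted batches and hand it to the preduce task."""
    for batches in rounds:
        channel.put(list(heapq.merge(*batches, key=itemgetter(0))))
    channel.put(_DONE)


def preduce_task(channel, reduce_func, sink):
    """Reduce each unified batch per key and append the result to sink."""
    while True:
        unified = channel.get()
        if unified is _DONE:
            break
        for key, group in groupby(unified, key=itemgetter(0)):
            values = [v for _, v in group]
            acc = values[0]
            for v in values[1:]:
                acc = reduce_func(acc, v)
            sink.append((key, acc))


def run_pipeline(rounds, reduce_func):
    channel = queue.Queue(maxsize=2)   # bounded: merge cannot run far ahead
    sink = []                          # stand-in for the output storage
    producer = threading.Thread(target=merge_task, args=(rounds, channel))
    consumer = threading.Thread(
        target=preduce_task, args=(channel, reduce_func, sink)
    )
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    return sink


rounds = [
    [[("a", 1), ("b", 1)], [("a", 1)]],
    [[("c", 1)], [("c", 1), ("d", 1)]],
]
result = run_pipeline(rounds, add)
```

Because the two stages are separate, the number of merge workers and preduce workers could be scaled independently, at the price of the extra hop between them.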
[0049] A second alternative solution to the embodiments described above would use different storage mediums instead of HDD. HDD is typically preferred for huge amounts of data for cost purposes as HDD is cheaper than other solutions. However, since Shuffle Reduce tasks significantly decrease the amount of data handled by the reduce tasks, a possible solution would be to store the data in a more efficient medium (like Solid State Disk (SSD) or different persistent memory technologies) in order to get better performance (since the cost of these solutions is now decreased for storing less data) for the Shuffle Reduce phase. The second alternative solution thus provides better I/O performance at the expense of total cost for the whole architecture.
[0050] FIG. 8 illustrates an example of a Shuffle Reduce operation for the indicated input files in a sample embodiment. In this example, several input files 810 including different types of fruit (apple, blueberry, grape, melon, orange, mango) are merged at merge operation 820 to produce the respective files 830 and 840 that are stored in DRAM. A preduce task 850 performs a ReduceByKey (sum) operation on the respective files 830 and 840 to produce the respective output files 860 and 870. It will be appreciated by those skilled in the art that the ReduceByKey is a commutative operation and that the resulting files 860 and 870 are smaller than the files 830 and 840. As indicated, the output key value pair includes the fruit along with the number of occurrences in the merged input files 830 and 840. In this example, by combining duplicate elements into elements and element count key value pairs, the number of key value pairs has been reduced from 12 to 8, thus saving memory space. It will be appreciated that similar reductions in memory space may be achieved for other commutative reduce operations such as an AggregateByKey operation or a GroupByKey operation.
[0051] The Shuffle Reduce phase decreases the HDD I/O overhead since it significantly reduces the number of random access I/O requests. The Shuffle Reduce task described herein also greatly reduces the data size that needs to be read by the reduce tasks. As a result, the I/O overhead along with the total execution time of shuffle phase are considerably decreased.
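The fruit-counting example of FIG. 8 can be reproduced in a few lines. The exact contents of the merged files are not given in the text, so the two lists below are hypothetical data chosen only to match the stated totals (12 input elements reduced to 8 key-count pairs); the variable names echo the reference numerals for readability.

```python
from collections import Counter

# Hypothetical contents of the two merged, sorted batches held in DRAM.
merged_830 = ["apple", "apple", "blueberry", "grape", "grape", "melon"]
merged_840 = ["apple", "mango", "mango", "melon", "orange", "orange"]


def preduce(merged_keys):
    """ReduceByKey (sum): count the occurrences of each key in one batch."""
    return sorted(Counter(merged_keys).items())


out_860 = preduce(merged_830)
out_870 = preduce(merged_840)

pairs_before = len(merged_830) + len(merged_840)
pairs_after = len(out_860) + len(out_870)

# The downstream reduce tasks can still combine the partial counts,
# because summation is commutative.
final_counts = Counter()
for fruit, count in out_860 + out_870:
    final_counts[fruit] += count
```

The partial outputs are smaller than their inputs, and keys that appear in both outputs (here apple and melon) are resolved by the final reduce.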
[0052] The following table compares the performance of traditional data analytics frameworks including the Spark™ system, the Riffle™ system, and the Shuffle Reduce approach implemented herein.
Where:
M is the number of map tasks;
R is the number of reduce tasks;
N is the number of map tasks per merge or Shuffle Reduce task;
S is the total shuffle data size; and
r is a reduce factor (0 < r < 1, depending on workload, operations, and N). In sample embodiments, r < 0.1 but varies by workload. In the worst case, r = 1 and the total read/write size is still at least as good as the read/write size for the Riffle™ system.
[0053] FIG. 9 illustrates a general-purpose computer 900 suitable for implementing one or more embodiments of the methods disclosed herein. For example, the computer 900 in FIG. 9 may be implemented on a data analytics framework (e.g., Spark™, Hadoop™, TensorFlow™, and Dryad™) of the type described herein. The components described above may be implemented on any general-purpose network component, such as a computer 900 with sufficient processing power, memory resources, and network throughput capability to handle the necessary workload placed upon it. The computer 900 includes a processor 910 (which may be referred to as a central processor unit or CPU) that is in communication with memory devices including secondary storage 920, read only memory (ROM) 930, random access memory (RAM) 940, input/output (I/O) devices 950, and network connectivity devices 960. In sample embodiments, the network connectivity devices 960 further connect the processor 910 to a client side data analytics driver 970 that manages the data analytics process and determines when to invoke the Shuffle Reduce tasks as described herein. The processor 910 may be implemented as one or more CPU chips, or may be part of one or more application specific integrated circuits (ASICs).
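The driver's decision on whether to invoke Shuffle Reduce tasks, and how many to start, might be sketched as follows. This is only an illustration under stated assumptions: the function name, the `min_shuffle_bytes` threshold, and the rule that the task count equals the number of map tasks M divided by N (rounded up) are hypothetical and are not specified by the disclosure, which only lists the factors the driver considers.

```python
import math

def plan_shuffle_reduce_tasks(has_shuffle, has_commutative_reduce,
                              shuffle_bytes, num_map_tasks,
                              min_shuffle_bytes, maps_per_task):
    """Return how many Shuffle Reduce tasks to start (0 means do not use them).

    shuffle_bytes     -- S, the total shuffle data size
    num_map_tasks     -- M, the number of map tasks
    maps_per_task     -- N, map tasks handled per Shuffle Reduce task
    min_shuffle_bytes -- hypothetical cutoff below which no gain is expected
    """
    # The job must contain a shuffle and a commutative, data-reducing operation.
    if not (has_shuffle and has_commutative_reduce):
        return 0
    # The workload must be large enough to reap performance gains.
    if shuffle_bytes < min_shuffle_bytes:
        return 0
    # Assumed sizing rule: one Shuffle Reduce task per N map outputs.
    return math.ceil(num_map_tasks / maps_per_task)

tasks = plan_shuffle_reduce_tasks(True, True, 10**9, 100, 10**6, 8)
print(tasks)  # 13 Shuffle Reduce tasks for 100 map tasks with N = 8
```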
[0054] The secondary storage 920 is typically comprised of one or more disk drives or tape drives and is used for non-volatile storage of data and as an overflow data storage device if RAM 940 is not large enough to hold all working data. Secondary storage 920 may be used to store programs that are loaded into RAM 940 when such programs are selected for execution. The ROM 930 is used to store instructions and perhaps data that are read during program execution. ROM 930 is a non-volatile memory device that typically has a small memory capacity relative to the larger memory capacity of secondary storage 920. The RAM 940 is used to store volatile data and perhaps to store instructions. Access to both ROM 930 and RAM 940 is typically faster than to secondary storage 920.
[0055] It should be understood that computer 900 may execute instructions from computer-readable non-transitory media storing computer readable instructions and one or more processors coupled to the memory, and when executing the computer readable instructions, the computer 900 is configured to perform method steps and operations described in the disclosure with reference to FIG. 1 to FIG. 8. The computer-readable non-transitory media includes all types of computer readable media, including magnetic storage media, optical storage media, flash media and solid state storage media.
[0056] It should be further understood that software including one or more computer-executable instructions that facilitate processing and operations as described above with reference to any one or all of steps of the disclosure may be installed in and sold with one or more servers or databases. Alternatively, the software may be obtained and loaded into one or more servers or one or more databases in a manner consistent with the disclosure, including obtaining the software through physical medium or distribution system, including, for example, from a server owned by the software creator or from a server not owned but used by the software creator. The software may be stored on a server for distribution over the Internet, for example.
[0057] Also, it will be understood by one skilled in the art that this disclosure is not limited in its application to the details of construction and the arrangement of components set forth in the following description or illustrated in the drawings. The embodiments herein are capable of other embodiments, and capable of being practiced or carried out in various ways. Also, it will be understood that the phraseology and terminology used herein is for the purpose of description and should not be regarded as limiting. The use of "including," "comprising," or "having" and variations thereof herein is meant to encompass the items listed thereafter and equivalents thereof as well as additional items.
[0058] The components of the illustrative devices, systems and methods employed in accordance with the illustrated embodiments may be implemented, at least in part, in digital electronic circuitry, analog electronic circuitry, or in computer hardware, firmware, software, or in combinations of them. These components may also be implemented, for example, as a computer program product such as a computer program, program code or computer instructions tangibly embodied in an information carrier, or in a machine-readable storage device, for execution by, or to control the operation of, data processing apparatus such as a programmable processor, a computer, or multiple computers.
[0059] A computer program may be written in any form of programming language, including compiled or interpreted languages, and it may be deployed in any form, including as a stand-alone program or as a module, component, subroutine, or other unit suitable for use in a computing environment. A computer program may be deployed to be executed on one computer or on multiple computers at one site or distributed across multiple sites and interconnected by a communication network. Also, functional programs, codes, and code segments for accomplishing the systems and methods described herein may be easily construed as within the scope of the disclosure by programmers skilled in the art to which the present disclosure pertains. Method steps associated with the illustrative embodiments may be performed by one or more programmable processors executing a computer program, code or instructions to perform functions (e.g., by operating on input data and generating an output). Method steps may also be performed by, and apparatus may be implemented as, special purpose logic circuitry, e.g., an FPGA (field programmable gate array) or an ASIC (application-specific integrated circuit), for example.
[0060] The various illustrative logical blocks, modules, and circuits described in connection with the embodiments disclosed herein may be implemented or performed with a general purpose processor, a digital signal processor (DSP), an ASIC, a FPGA or other programmable logic device, discrete gate or transistor logic, discrete hardware components, or any combination thereof designed to perform the functions described herein. A general purpose processor may be a microprocessor, but in the alternative, the processor may be any conventional processor, controller, microcontroller, or state machine. A processor may also be implemented as a combination of computing devices, e.g., a combination of a DSP and a microprocessor, a plurality of microprocessors, one or more microprocessors in conjunction with a DSP core, or any other such configuration.
[0061] Processors suitable for the execution of a computer program include, by way of example, both general and special purpose microprocessors, and any one or more processors of any kind of digital computer. Generally, a processor will receive instructions and data from a read-only memory or a random access memory or both. The essential elements of a computer are a processor for executing instructions and one or more memory devices for storing instructions and data. Generally, a computer will also include, or be operatively coupled to receive data from or transfer data to, or both, one or more mass storage devices for storing data, e.g., magnetic, magneto-optical disks, or optical disks. Information carriers suitable for embodying computer program instructions and data include all forms of non-volatile memory, including by way of example, semiconductor memory devices, e.g., electrically programmable read-only memory or ROM (EPROM), electrically erasable programmable ROM (EEPROM), flash memory devices, and data storage disks (e.g., magnetic disks, internal hard disks, or removable disks, magneto-optical disks, CD-ROM disks, or DVD-ROM disks). The processor and the memory may be supplemented by, or incorporated in special purpose logic circuitry.
[0062] Those of skill in the art understand that information and signals may be represented using any of a variety of different technologies and techniques. For example, data, instructions, commands, information, signals, bits, symbols, and chips that may be referenced throughout the above description may be represented by voltages, currents, electromagnetic waves, magnetic fields or particles, optical fields or particles, or any combination thereof.
[0063] Those skilled in the art may further appreciate that the various illustrative logical blocks, modules, circuits, and algorithm steps described in connection with the embodiments disclosed herein may be implemented as electronic hardware, computer software, or combinations of both. To clearly illustrate this interchangeability of hardware and software, various illustrative components, blocks, modules, circuits, and steps have been described above generally in terms of their functionality. Whether such functionality is implemented as hardware or software depends upon the particular application and design constraints imposed on the overall system. Skilled artisans may implement the described functionality in varying ways for each particular application, but such implementation decisions should not be interpreted as causing a departure from the scope of the present disclosure. A software module may reside in random access memory (RAM), flash memory, ROM, EPROM, EEPROM, registers, hard disk, a removable disk, a CD-ROM, or any other form of storage medium known in the art. A sample storage medium is coupled to the processor such that the processor may read information from, and write information to, the storage medium. In the alternative, the storage medium may be integral to the processor. In other words, the processor and the storage medium may reside in an integrated circuit or be implemented as discrete components.
[0064] As used herein, “machine-readable medium” means a device able to store instructions and data temporarily or permanently and may include, but is not limited to, random-access memory (RAM), read-only memory (ROM), buffer memory, flash memory, optical media, magnetic media, cache memory, other types of storage (e.g., Electrically Erasable Programmable Read-Only Memory (EEPROM)), and any suitable combination thereof. The term “machine-readable medium” should be taken to include a single medium or multiple media (e.g., a centralized or distributed database, or associated caches and servers) able to store processor instructions. The term “machine-readable medium” shall also be taken to include any medium, or combination of multiple media, that is capable of storing instructions for execution by one or more processors, such that the instructions, when executed by one or more processors, cause the one or more processors to perform any one or more of the methodologies described herein. Accordingly, a “machine-readable medium” refers to a single storage apparatus or device, as well as “cloud-based” storage systems or storage networks that include multiple storage apparatus or devices. The term “machine-readable medium” as used herein excludes signals per se.
[0065] Although a few embodiments have been described in detail above, other modifications are possible. For example, the logic flows depicted in the figures do not require the particular order shown, or sequential order, to achieve desirable results. Other steps may be provided, or steps may be eliminated, from the described flows, and other components may be added to, or removed from, the described systems. Other embodiments may be within the scope of the following claims.
Claims
1. A method of performing a Shuffle Reduce operation that groups and joins data between map and reduce stages for data transformations during a data analytics process, comprising:
(1) receiving as input at least two input files from a first memory, where each input file has been sorted and written by a different map task;
(2) fetching batches of data from each input file;
(3) merging and sorting the batches of data in a second memory to form a unified piece of data;
(4) applying a Shuffle Reduce operation to the unified piece of data to produce output data, where the Shuffle Reduce operation comprises a commutative reduce operation that provides an amount of output data that is less than an amount of input data;
(5) writing the output data to a third memory; and
(6) repeating (2)-(5) until data from each input file is entirely consumed and analytics data output of the data analytics process has been fully formed.
2. A method as in claim 1, wherein the first memory and the third memory comprise hard disk drives and the second memory comprises dynamic random access memory (DRAM).
3. A method as in claim 1, wherein the first memory comprises at least one hard disk drive and the third memory comprises at least one of a solid state disk and a persistent memory.
4. A method as in claim 1, wherein fetching the batches of data from each input file comprises fetching a total amount of data that does not exceed a memory capacity allocated for the Shuffle Reduce operation.
5. A method as in claim 1, further comprising a driver module determining whether to perform the Shuffle Reduce operation for a particular job in the data analytics process based on: whether the job includes a Shuffle operation, whether a workload for the job is large enough to reap performance gains, whether the job includes the commutative reduce operation that provides an amount of output data in the output files that is less than an amount of input data, how many tasks must be started to implement the Shuffle Reduce operation, and when the Shuffle Reduce operation is to be started and stopped.
6. A method as in claim 1, wherein the data analytics process is implemented on a data analytics platform and applying the Shuffle Reduce operation comprises applying as the commutative reduce operation at least one of an aggregate by key operation, a group by key operation, and a reduce by key operation.
7. A method as in claim 1, further comprising performing steps (1)-(3) by a first task and steps (4)-(5) by a second task, wherein the first task and the second task communicate directly with each other independent of the first memory and the third memory.
8. A method as in claim 1, wherein applying the Shuffle Reduce operation to the unified piece of data to produce output data comprises applying a plurality of tasks to implement the Shuffle Reduce operation.
9. A data analytics system comprising: at least one processor; a first memory that stores at least two input files, each input file being sorted and written by a different map task of a data analytics process; a second memory; a third memory; and an instruction memory that stores instructions that upon execution by the at least one processor performs a data analytics process including a Shuffle Reduce operation that groups and joins data between map and reduce stages for data transformations during the data analytics process, the Shuffle Reduce operation comprising:
(1) receiving as input the at least two input files from the first memory;
(2) fetching batches of data from each input file;
(3) merging and sorting the batches of data in the second memory to form a unified piece of data;
(4) applying a Shuffle Reduce operation to the unified piece of data to produce output data, where the Shuffle Reduce operation comprises a commutative reduce operation that provides an amount of output data that is less than an amount of input data;
(5) writing the output data to the third memory; and
(6) repeating (2)-(5) until data from each input file is entirely consumed and analytics data output of the data analytics process has been fully formed.
10. A system as in claim 9, wherein the first memory and the third memory comprise hard disk drives and the second memory comprises dynamic random access memory (DRAM).
11. A system as in claim 9, wherein the first memory comprises at least one hard disk drive and the third memory comprises at least one of a solid state disk and a persistent memory.
12. A system as in claim 9, wherein a total amount of data fetched from each input file does not exceed a memory capacity allocated by the data analytics process for the Shuffle Reduce operation.
13. A system as in claim 9, further comprising a driver module that determines whether to perform the Shuffle Reduce operation for a particular job in the data analytics process based on: whether the job includes a Shuffle operation, whether a workload for the job is large enough to reap performance gains, whether the job includes the commutative reduce operation that provides an amount of output data in the output files that is less than an amount of input data, how many tasks must be started to implement the Shuffle Reduce operation, and when the Shuffle Reduce operation is to be started and stopped.
14. A system as in claim 9, further comprising a data analytics platform that implements the data analytics process and applies the Shuffle Reduce operation by applying as the commutative reduce operation at least one of an aggregate by key operation, a group by key operation, and a reduce by key operation.
15. A system as in claim 9, further comprising a first task that performs steps (1)-(3) and a second task that performs steps (4)-(5), wherein the first task and the second task communicate directly with each other independent of the first memory and the third memory.
16. A system as in claim 9, wherein the Shuffle Reduce operation comprises a plurality of tasks that are applied to the unified piece of data to produce output data.
17. A computer readable storage medium comprising instructions that upon execution by at least one processor cause the processor to perform a Shuffle Reduce operation that groups and joins data between map and reduce stages for data transformations during a data analytics process by performing operations comprising:
(1) receiving as input at least two input files from a first memory, where each input file has been sorted and written by a different map task;
(2) fetching batches of data from each input file;
(3) merging and sorting the batches of data in a second memory to form a unified piece of data;
(4) applying a Shuffle Reduce operation to the unified piece of data to produce output data, where the Shuffle Reduce operation comprises a commutative reduce operation that provides an amount of output data that is less than an amount of input data;
(5) writing the output data to a third memory; and
(6) repeating (2)-(5) until data from each input file is entirely consumed and analytics data output of the data analytics process has been fully formed.
18. A medium as in claim 17, wherein the data analytics process is implemented on a data analytics platform, further comprising instructions that when executed by the at least one processor cause the processor to perform operations comprising applying as the commutative reduce operation at least one of an aggregate by key operation, a group by key operation, and a reduce by key operation.
19. A medium as in claim 17, further comprising instructions that when executed by the at least one processor cause the processor to perform steps (1)-(3) by a first task and steps (4)-(5) by a second task, wherein the first task and the second task communicate directly with each other independent of the first memory and the third memory.
20. A medium as in claim 17, further comprising instructions that when executed by the at least one processor cause the processor to perform a plurality of tasks to implement the Shuffle Reduce operation.
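Steps (1)-(6) recited in claims 1, 9, and 17 can be illustrated with a minimal Python sketch. It is an assumption-laden illustration rather than the claimed implementation: in-memory lists stand in for the first memory, a Python list stands in for the third memory, and the names `shuffle_reduce`, `batch_size`, `reduce_fn`, and `write_output` are hypothetical. A caller would pick `batch_size` so that the batch size times the number of input files stays within the memory allocated to the operation.

```python
from functools import reduce
from heapq import merge
from itertools import groupby, islice
from operator import itemgetter

def shuffle_reduce(input_files, batch_size, reduce_fn, write_output):
    # (1) Each input is a sequence of (key, value) pairs already sorted by key.
    readers = [iter(f) for f in input_files]
    while True:
        # (2) Fetch one batch of at most batch_size records from each input.
        batches = [list(islice(r, batch_size)) for r in readers]
        if not any(batches):
            break  # (6) every input file has been entirely consumed
        # (3) Merge the sorted batches into one sorted sequence in memory.
        unified = merge(*batches, key=itemgetter(0))
        # (4) Apply the commutative reduce to values that share a key.
        reduced = [
            (key, reduce(reduce_fn, (value for _, value in group)))
            for key, group in groupby(unified, key=itemgetter(0))
        ]
        # (5) Write the smaller output for the later reduce tasks to read.
        write_output(reduced)

# Hypothetical usage with two sorted map outputs and a sum reduction.
files = [
    [("apple", 1), ("apple", 1), ("grape", 1)],
    [("apple", 1), ("melon", 1), ("melon", 1)],
]
out = []
shuffle_reduce(files, batch_size=2, reduce_fn=lambda a, b: a + b,
               write_output=out.extend)
print(out)  # [('apple', 3), ('melon', 1), ('grape', 1), ('melon', 1)]
```

In this sketch a key that spans two batches ("melon") is emitted as two partial results, so six input records become four; the downstream reduce tasks are expected to combine such partial results, which is why the reduce operation needs to be commutative.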
Priority Applications (2)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202080092843.2A CN114945902B (en) | 2020-01-15 | 2020-01-15 | Method, system and storage medium for performing shuffle-reduce operations |
| PCT/US2020/013686 WO2021061183A1 (en) | 2020-01-15 | 2020-01-15 | Shuffle reduce tasks to reduce i/o overhead |
Applications Claiming Priority (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| PCT/US2020/013686 WO2021061183A1 (en) | 2020-01-15 | 2020-01-15 | Shuffle reduce tasks to reduce i/o overhead |
Publications (1)
| Publication Number | Publication Date |
|---|---|
| WO2021061183A1 (en) | 2021-04-01 |
Family
ID=69500883
Family Applications (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| PCT/US2020/013686 Ceased WO2021061183A1 (en) | 2020-01-15 | 2020-01-15 | Shuffle reduce tasks to reduce i/o overhead |
Country Status (2)
| Country | Link |
|---|---|
| CN (1) | CN114945902B (en) |
| WO (1) | WO2021061183A1 (en) |
Cited By (1)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN113743582A (en) * | 2021-08-06 | 2021-12-03 | 北京邮电大学 | A novel channel shuffling method and device based on stack shuffling |
Citations (6)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| US20120297145A1 (en) * | 2011-05-20 | 2012-11-22 | Claris Castillo | System and method to improve i/o performance of data analytic workloads |
| US20130167151A1 (en) * | 2011-12-22 | 2013-06-27 | Abhishek Verma | Job scheduling based on map stage and reduce stage duration |
| US20140358977A1 (en) * | 2013-06-03 | 2014-12-04 | Zettaset, Inc. | Management of Intermediate Data Spills during the Shuffle Phase of a Map-Reduce Job |
| US20150150017A1 (en) * | 2013-11-26 | 2015-05-28 | International Business Machines Corporation | Optimization of map-reduce shuffle performance through shuffler i/o pipeline actions and planning |
| US9170848B1 (en) * | 2010-07-27 | 2015-10-27 | Google Inc. | Parallel processing of data |
| US20160034205A1 (en) * | 2014-08-01 | 2016-02-04 | Software Ag Usa, Inc. | Systems and/or methods for leveraging in-memory storage in connection with the shuffle phase of mapreduce |
Family Cites Families (2)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| US20150127649A1 (en) * | 2013-11-01 | 2015-05-07 | Cognitive Electronics, Inc. | Efficient implementations for mapreduce systems |
| EP3376399A4 (en) * | 2015-12-31 | 2018-12-19 | Huawei Technologies Co., Ltd. | Data processing method, apparatus and system |
Non-Patent Citations (2)
| Title |
|---|
| JINGJING ZHANG, OSVALDO SIMEONE: "Improved Latency-Communication Trade-Off for Map-Shuffle-Reduce Systems with Stragglers", ARXIV.ORG, 20 August 2018 (2018-08-20), XP081258763 * |
| ZHANG ET AL.: "Proceedings of the Thirteenth EuroSys Conference", 23 April 2018, ACM, article "Riffle: Optimized Shuffle Service for Large-Scale Data Analytics", pages: 43 |
Cited By (2)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN113743582A (en) * | 2021-08-06 | 2021-12-03 | 北京邮电大学 | A novel channel shuffling method and device based on stack shuffling |
| CN113743582B (en) * | 2021-08-06 | 2023-11-17 | 北京邮电大学 | Novel channel shuffling method and device based on stack shuffling |
Also Published As
| Publication number | Publication date |
|---|---|
| CN114945902B (en) | 2025-03-14 |
| CN114945902A (en) | 2022-08-26 |
Legal Events
| Date | Code | Title | Description |
|---|---|---|---|
| | 121 | Ep: the epo has been informed by wipo that ep was designated in this application | Ref document number: 20704171; Country of ref document: EP; Kind code of ref document: A1 |
| | NENP | Non-entry into the national phase | Ref country code: DE |
| | 122 | Ep: pct application non-entry in european phase | Ref document number: 20704171; Country of ref document: EP; Kind code of ref document: A1 |
| | WWG | Wipo information: grant in national office | Ref document number: 202080092843.2; Country of ref document: CN |