WO2017019001A1 - Distributed datasets in shared non-volatile memory - Google Patents
- Publication number
- WO2017019001A1 (PCT/US2015/042134)
- Authority
- WO
- WIPO (PCT)
- Prior art keywords
- language
- transformation
- schema
- wrapper
- volatile memory
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Ceased
Classifications
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/27—Replication, distribution or synchronisation of data between databases or within a distributed database system; Distributed database system architectures therefor
Definitions
- the in-situ DDSs stored in the non-volatile memory 150 enable direct in-place data manipulation without first converting them into objects native to the managed language (e.g., Scala, Java).
- In-place data manipulation has several benefits. First, it reduces the overhead of serialization/deserialization and the number of objects managed by the JVM runtime, thus reducing the pressure on the Java garbage collector. Second, it enables Spark to efficiently handle larger datasets stored in the non-volatile memory 150 rather than only datasets that fit in the smaller aggregated DRAM of the Spark nodes.
- Figure 2 illustrates an example flow diagram for an example process 200 for cluster computing with distributed datasets using the computing device 100 of Figure 1.
- the process 200 is exemplary only and may be modified.
- the example process 200 of Figure 2 will now be described with further references to Figure 1.
- the process 200 may begin with the DDS manager processor 140 receiving a command to apply a transformation to a partitioned distributed dataset stored in the shared non-volatile memory 150. Each partition of the distributed dataset is coupled to one of a plurality of the compute nodes 110 coupled to the shared non-volatile memory.
- the command is in a first language native to the compute nodes 110, block 210.
- the received command may be received from a programmer terminal communicatively coupled to the computing device 100.
- the received command may be, in various examples, in Scala language.
- Upon receiving the command at block 210, the DDS manager processor 140 causes the wrapper / schema module 160 to construct a wrapper iterator object within the shared non-volatile memory 150 using a second language different from the native language and based on a schema stored in association with the distributed dataset, block 220.
- the native language of the compute nodes 110 is based on Java and a JVM.
- the second language or languages used by the wrapper / schema module may include C++.
- the distributed datasets are defined with one of several selected schemas.
- the selected schemas are described in a domain-specific language that is independent of programming languages such as Java, Scala, or C++ and provides basic types such as integers, strings, floating-point numbers, etc.
- the schema language utilized at block 220 allows definition of complex types comprising an ordered list of the basic types provided by the domain-specific language of the schema.
- the schema may use constructs similar to C structs such as, for example, LIST[T1, T2].
- LIST[X: STRING, Y: INT] defines a new complex type as a list of tuples, each of which has two named variables X and Y, where X is defined as a string and Y is defined as an integer.
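In C++ terms, the complex type in the example corresponds to a list of named (string, integer) tuples. The struct below is illustrative only; the schema itself stays language-independent, and this merely shows what the declared layout means to one consuming language:

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Illustrative C++ analogue of the schema type LIST[X: STRING, Y: INT]:
// a list of tuples with a string field X and a 64-bit integer field Y.
struct Element {
  std::string x;  // X: STRING
  int64_t y;      // Y: INT
};

using ListXY = std::vector<Element>;
```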
- the schema language may provide for the definition of two kinds of in-situ DDSs of basic or complex types, as follows:
- This language independent declaration of types enables a schema compiler to generate a language independent binary format to represent the DDSs contained within the DDS partitions 155.
- the schema employed at block 220 defines a binary byte sequence format for each basic and complex type and in-situ DDS partitions 155.
- the schema may represent basic types using a binary machine representation. For example, INT may be a 64-bit (8 byte) sequence.
- the schema may represent complex types by concatenating the byte sequence of the basic types.
- a complex type may comprise the following schema:
- the binary format of the in-situ DDS comprises all the elements of all the DDS partitions 155, with each partition being constructed by concatenating the byte sequences of each element, with the addition of a byte sequence EOF, or other known byte sequence, at the end.
- the EOF byte sequence marks the termination of the DDS partition.
- the binary representation of an associative in-situ DDS partition of string-integer key-value pairs given in equation (1) above (MAP_IN_SITU_RDD[(X: STRING, Y: INT)]) with two elements would be:
- N and M are the string lengths of the first and second elements respectively
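The byte layout described above can be sketched in code. The following C++ encoder is an illustration only: the patent does not specify the width of the length prefix or the exact EOF byte sequence, so a one-byte length and a single 0xFF sentinel are assumed here, chosen to match the element offsets N+1 and M+N+10 used later in the document.

```cpp
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch of the language-independent binary format for
// MAP_IN_SITU_RDD[(X: STRING, Y: INT)]: each element is a 1-byte string
// length N, then N characters, then an 8-byte integer; a single 0xFF
// byte marks the end of the partition. Both the one-byte length prefix
// and the EOF encoding are assumptions.
static const uint8_t kEof = 0xFF;

std::vector<uint8_t> encode_partition(
    const std::vector<std::pair<std::string, int64_t>>& elements) {
  std::vector<uint8_t> bytes;
  for (const auto& [key, value] : elements) {
    bytes.push_back(static_cast<uint8_t>(key.size()));  // length N
    bytes.insert(bytes.end(), key.begin(), key.end());  // N characters
    uint8_t raw[8];
    std::memcpy(raw, &value, 8);                        // 8-byte INT
    bytes.insert(bytes.end(), raw, raw + 8);
  }
  bytes.push_back(kEof);                                // EOF marker
  return bytes;
}
```

For two elements with string lengths N and M, the encoded partition is (N + 9) + (M + 9) + 1 bytes: each element contributes a length byte, its characters, and an 8-byte integer, followed by the one-byte EOF sentinel.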
- a stub compiler in the wrapper / schema module 160 takes as input the generated schema of the in-situ DDS selected at block 210 and generates a wrapper iterator object in a second language (e.g., C++) that is different from the native language of the compute nodes 110 (e.g., Java or Scala) for iterating through all the elements of the in-situ DDS including all the DDS partitions 155.
- the wrapper iterator object knows how to interpret the underlying language-independent binary format of the in-situ DDS.
- a single wrapper iterator object (e.g., Java or Scala) is used to access each element of the entire in-situ DDS including all the DDS partitions 155, rather than having multiple Java or Scala objects each representing one element in Spark.
- the wrapper iterator constructed at block 220 leverages the property that transformations on the in-situ DDSs stored in the shared non-volatile memory 150 are done on all elements of the in-situ DDS for all the DDS partitions 155 and that all elements of the in-situ DDS have the same type. For this reason, the wrapper / schema module 160 interprets the schema for the DDS selected at block 210 once and not each time it accesses a DDS element. This in effect reduces the CPU time spent interpreting types.
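A wrapper iterator of this kind might be sketched as follows. The C++ below is hypothetical: it assumes an illustrative element layout of a one-byte string length, the characters, and an 8-byte integer, with a single 0xFF byte marking the end of the partition (the patent leaves the binary encoding abstract). The point it illustrates is that the schema-derived layout is fixed once, at construction, rather than re-interpreted per element:

```cpp
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>

// Hypothetical wrapper iterator over an in-situ DDS partition stored as
// raw bytes: [len N][N chars][8-byte int] ... [0xFF]. A single iterator
// object walks every element; no per-element managed-language objects
// are created. The integer is read in native (little-endian assumed)
// byte order.
class WrapperIterator {
 public:
  explicit WrapperIterator(const uint8_t* bytes) : p_(bytes) {}

  bool has_next() const { return *p_ != 0xFF; }  // assumed EOF sentinel

  std::pair<std::string, int64_t> next() {
    uint8_t len = *p_++;                          // string length
    std::string key(reinterpret_cast<const char*>(p_), len);
    p_ += len;
    int64_t value;
    std::memcpy(&value, p_, 8);                   // 8-byte INT
    p_ += 8;
    return {std::move(key), value};
  }

 private:
  const uint8_t* p_;  // cursor into the shared byte sequence
};
```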
- the compute nodes 110 apply the selected transformation received at block 210 to the wrapper iterator object constructed at block 220 to form a resulting distributed dataset in the shared non-volatile memory 150.
- the resulting distributed dataset is formed and stored directly in the shared non-volatile memory 150.
- the DDS manager processor 140 causes the wrapper / schema iterator module 160 to store the resulting distributed dataset in a format similar to the original DDS using the same schema or a different schema, depending on the transformation.
- the transformation applied at block 230 may be a transformation object in the native language of the compute nodes 110.
- the transformation applied at block 230 may be a transformation object in a language other than the native language of the compute nodes 110, as described above.
- the DDS manager processor may cause the transform module 170 to construct the selected transformation object in at least one of the second language of the wrapper iterator object or a third language different than the native language.
- the compute nodes 110 would then apply the constructed transformation object to the wrapper iterator object to form the resulting distributed dataset in the shared non-volatile memory 150.
- the transformation applied at block 230 may include one or more of a filter transformation, a flat map transformation, a map transformation, a reduceByKey transformation, etc.
- a reduceByKey transformation shuffles key-value pairs to reducers where pairs with the same key end up at the same reducer. For example, in reduceByKey((x, y) => x + y) each reducer sums up the values of pairs with the same word key.
- a flatMap transformation splits words and a map transformation maps each word to a key-value pair such as (word, 1).
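The flatMap / map / reduceByKey pipeline sketched in the two bullets above is the classic word count. A local, single-process illustration of the same dataflow (not the distributed implementation) might look like:

```cpp
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Local sketch of the word-count dataflow: flatMap splits lines into
// words, map emits (word, 1) pairs, reduceByKey sums values per key.
std::map<std::string, int64_t> word_count(
    const std::vector<std::string>& lines) {
  std::vector<std::pair<std::string, int64_t>> pairs;
  for (const auto& line : lines) {                   // flatMap: line -> words
    std::istringstream in(line);
    std::string word;
    while (in >> word) pairs.emplace_back(word, 1);  // map: word -> (word, 1)
  }
  std::map<std::string, int64_t> counts;
  for (const auto& [word, one] : pairs)              // reduceByKey((x, y) => x + y)
    counts[word] += one;
  return counts;
}
```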
- the wrapper iterator object constructed at block 220 is thus reused for interpreting each element of the DDS partitions 155 rather than reconstructed for each element.
- the wrapper iterator object interprets the elements of the DDS partitions 155 as follows:
- Y1 = INTERPRET_INTEGER(RDD.BYTE_SEQUENCE[N+1] ... RDD.BYTE_SEQUENCE[N+8]) ... (6)
- Y2 = INTERPRET_INTEGER(RDD.BYTE_SEQUENCE[M+N+10] ... RDD.BYTE_SEQUENCE[M+N+17]) ... (8)
- Where INTERPRET_STRING(LEN, CHARS) interprets a byte array CHARS of length LEN as a STRING and INTERPRET_INTEGER(BYTES) interprets a byte array BYTES as a 64-bit INT.
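The interpretation helpers could be realized along these lines. This is a sketch only: the 64-bit integer is read in native machine byte order, which the patent does not specify.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Illustrative versions of the helpers named above. Both read directly
// from the unserialized byte sequence of a partition; nothing is
// deserialized into managed-language objects.
std::string interpret_string(const uint8_t* seq, size_t offset, size_t len) {
  // Interpret LEN bytes starting at OFFSET as a STRING.
  return std::string(reinterpret_cast<const char*>(seq + offset), len);
}

int64_t interpret_integer(const uint8_t* seq, size_t offset) {
  // Interpret bytes [offset .. offset+7] as a 64-bit INT.
  int64_t value;
  std::memcpy(&value, seq + offset, sizeof(value));
  return value;
}
```

With a layout in which the first element's characters occupy bytes 1..N, its integer is read at offset N+1 and the second element's at offset M+N+10, matching equations (6) and (8).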
- each of the compute nodes 110 determines if the last element of the DDS partition 155 has been reached. If the last element has not been reached, application of the transformation at block 230 continues. If the last element has been reached, the process 200 continues to block 250 where the compute nodes 110 release the wrapper iterator object.
- the DDS manager processor 140 causes the wrapper / schema iterator module 160 to store the resulting distributed dataset in a format similar to the original DDS using the same schema or a different schema, depending on the transformation. When the resulting distributed dataset has been stored, the process 200 concludes.
- Figure 3 illustrates a block diagram of an example system with a computer-readable storage medium including instructions executable by a processor to form distributed datasets in a non-volatile virtual memory.
- the system 300 includes the processor 310 and the computer-readable storage medium 320.
- the computer-readable storage medium 320 includes example instructions 321-323 executable by the processor 310 to perform various functionalities described herein.
- the example instructions include "receive command to apply a transformation" instructions 321.
- the instructions 321 cause the processor 310 to receive a command to apply a transformation to a partitioned distributed dataset stored in non-volatile memory.
- Each partition may be coupled to one of a plurality of compute nodes coupled to the non-volatile memory.
- the command may be in a first language native to the compute nodes.
- the example instructions 322 cause the processor 310 to construct a wrapper iterator object within the non-volatile memory using a second language different from the native language and based on a schema stored in association with the distributed dataset.
- the example instructions further include "apply the transformation to the wrapper iterator object" instructions 323.
- the example instructions 323 cause the processor 310 to apply the transformation to the wrapper iterator object to form a resulting distributed dataset in the non-volatile memory.
- Various examples described herein are described in the general context of method steps or processes, which may be implemented in one example by a software program product or component, embodied in a machine-readable medium, including executable instructions, such as program code, executed by entities in networked environments.
- program modules may include routines, programs, objects, components, data structures, etc. which may be designed to perform particular tasks or implement particular abstract data types.
- Executable instructions, associated data structures, and program modules represent examples of program code for executing steps of the methods disclosed herein.
- the particular sequence of such executable instructions or associated data structures represents examples of corresponding acts for implementing the functions described in such steps or processes.
Landscapes
- Engineering & Computer Science (AREA)
- Databases & Information Systems (AREA)
- Theoretical Computer Science (AREA)
- Computing Systems (AREA)
- Data Mining & Analysis (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
An example device includes a plurality of compute nodes; a non-volatile memory coupled to the plurality of compute nodes and storing a partitioned distributed dataset, each partition coupled to one of the plurality of compute nodes; and a processor to construct a wrapper iterator object within the non-volatile memory based on a schema stored in association with the distributed dataset. The compute nodes apply a transformation to the wrapper iterator object to form a resulting distributed dataset in the non-volatile memory.
Description
DISTRIBUTED DATASETS IN SHARED NON-VOLATILE MEMORY
BACKGROUND
[0001] Recent work in distributed dataset computing includes a dataflow engine known as Spark (see "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing," NSDI 2012, by Matei Zaharia et al.). Spark allows data transformations directly in memory (i.e., DRAM) of multiple Spark nodes. Resilient Distributed Datasets (RDDs) are collections of data partitioned across the memory of each node of a cluster.
BRIEF DESCRIPTION OF THE DRAWINGS
[0002] For a more complete understanding of various examples, reference is now made to the following description taken in connection with the accompanying drawings in which:
[0003] Figure 1 illustrates an example computing device that may utilize shared non-volatile memory for cluster computing with distributed datasets;
[0004] Figure 2 illustrates an example flow diagram for an example process for cluster computing with distributed datasets using the computing device of Figure 1; and
[0005] Figure 3 illustrates a block diagram of an example system with a computer-readable storage medium including instructions executable by a processor to form distributed datasets in a non-volatile virtual memory.
DETAILED DESCRIPTION
[0006] Spark introduced a new distributed memory abstraction called Resilient Distributed Datasets (RDDs). RDDs are collections of data partitioned across the memory of each node of a cluster. RDDs enable programmers to perform in-memory computations in the aggregated memory of the cluster in a fault-tolerant manner. In contrast to distributed shared memory that supports fine-grain updates to shared state, RDDs support updates through coarse grain transformations such as map, reduce, and filter. Each RDD tracks its lineage, which represents the sequence of transformations used to derive the RDD. The deterministic nature of these transformations enables scalable fault tolerance. If a partition of an RDD is lost then the partition can be recovered by re-computing its parent RDD partitions based on the captured RDD transformation lineage.
[0007] Spark implements RDDs as a distributed collection of Scala/Java objects, referred to herein as Scala objects. The Scala objects are partitioned across multiple partitions and each partition comprises multiple Scala objects. The Scala objects live in memory managed by a Java Virtual Machine (JVM) on the compute nodes in the cluster, also known as a Java Heap. RDDs are constructed by applying transformations on other RDDs (e.g., map, filter, reduceByKey, etc.). What that means is that Spark nodes iterate through each Scala object of the RDD and apply the transformations to construct the new Scala objects of the new RDD.
[0008] There are two problems with this approach that the systems and methods described herein solve. Scala objects comprising RDDs live in the Java Heap, so they have to be garbage collected (e.g., when the RDDs are out of the computing scope in the user-defined application program, or purged out of the cache managed by the Spark runtime), which can cause high pressure on the garbage collector of the Spark nodes. Since the RDDs live in volatile DRAM memory, when RDDs are lost in the event of a crash, they have to be recomputed upon recovery, which can cause long execution times for recovery.
[0009] Systems and methods described herein provide for storing, processing, and analyzing large datasets. The systems and methods described herein may utilize in-situ distributed datasets (e.g., RDDs as used in Spark) that remain in a common shared non-volatile memory coupled to each of a plurality of compute nodes that implement a distributed dataset system such as, for example, Spark. The use of in-situ distributed datasets enables direct in-place data manipulation of the distributed datasets stored in the non-volatile memory without first converting them into objects native to the managed language of the compute nodes (e.g., Java or Scala as used in Spark). This methodology helps avoid compute node overhead for serializing/deserializing distributed datasets, and it also reduces the number of objects managed by the compute nodes, thus reducing the pressure on the garbage collector component of the native language. It also enables the compute nodes to efficiently handle larger datasets stored in the non-volatile memory rather than only datasets that fit in the smaller aggregated DRAM of the compute node cluster. Since objects are stored in unserialized byte form, transformations may be written in a different, more efficient language like C++ or even be implemented in hardware.
[0010] Referring now to the figures, Figure 1 illustrates an example computing device 100 that may utilize shared non-volatile memory for cluster computing with distributed datasets. The computing device 100 includes a plurality of compute nodes 110, four compute nodes in this example, including first, second, third and fourth compute nodes 110-1, 110-2, 110-3 and 110-4, respectively. The compute nodes 110 may each include a set of processing cores, caches and various ancillary peripherals. The compute nodes 110 are each respectively coupled to local node memory 120-1, 120-2, 120-3 and 120-4. The local memories 120 may be DRAM, RAM, ROM, etc. The local memories may include API functions in a native language of the compute nodes, e.g., Scala and Java in the case of Spark nodes as described above.
[0011] The compute nodes 110 are communicatively coupled to a distributed dataset manager device 130. The distributed dataset manager 130 includes a distributed dataset (DDS) manager processor 140 such as a central processing unit (CPU). The distributed dataset manager 130 also includes a shared non-volatile memory 150 that stores partitions of distributed datasets or DDSs 155, e.g., RDDs as used in Spark. The DDS partitions 155 include first, second, third and fourth DDS partitions 155-1, 155-2, 155-3 and 155-4, each coupled respectively to the first, second, third and fourth compute nodes 110-1, 110-2, 110-3 and 110-4.
[0012] The shared non-volatile memory 150 may be a byte addressable persistent memory and may be accessed directly, by the compute nodes 110, via normal CPU load/store instructions in units as small as one byte. The shared non-volatile memory 150 may be a resistive type of memory. The shared non-volatile memory 150 may serve as both memory and persistent storage.
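Byte-addressable, load/store access of the sort described is what memory-mapping interfaces expose today. A rough POSIX sketch follows; the backing file standing in for a persistent-memory region is purely illustrative, as the patent does not tie the memory to any particular OS interface:

```cpp
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <cstdint>
#include <cstddef>

// Sketch: map a (hypothetically persistent-memory-backed) file into the
// address space so it can be accessed with ordinary CPU load/store
// instructions, in units as small as one byte.
uint8_t* map_shared_region(const char* path, size_t size) {
  int fd = open(path, O_RDWR | O_CREAT, 0600);
  if (fd < 0) return nullptr;
  if (ftruncate(fd, static_cast<off_t>(size)) != 0) {
    close(fd);
    return nullptr;
  }
  void* p = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  close(fd);  // the mapping outlives the descriptor
  return p == MAP_FAILED ? nullptr : static_cast<uint8_t*>(p);
}
```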
[0013] Storing the DDS partitions 155 in the shared non-volatile memory 150 provides many advantages in contrast to the RDDs of Spark, which are stored in the DRAM of individual Spark nodes when transformations are applied. The RDDs of Spark rely heavily on serialization when re-distributing partitions between cluster nodes to support operations that require shuffling data between nodes, operations such as reduceBy, groupBy, and join. In contrast to the RDDs of Spark, where serialization is necessary in a cluster of machines communicating using TCP/IP, serialization becomes unnecessary in the context where the compute nodes 110 can exchange the DDS partitions 155 through the shared non-volatile memory 150.
[0014] The in-situ DDS partitions 155 provide for direct in-place data manipulation by the compute nodes 110 without first converting them into objects native to the managed language of the compute nodes 110. This helps avoid the CPU overhead of serializing/deserializing the DDS partitions 155 and also reduces the number of objects managed in the JVM heap, thus reducing the pressure on the JVM garbage collector.
[0015] The in-situ DDS partitions 155 enable faster recovery of RDDs. Non-volatile memory has better recovery behavior than DRAM. Once a DDS including all the DDS partitions 155 is created and stored in the non-volatile memory 150, it is immutable. If one of the compute nodes fails with immutable DDS partitions 155 in the shared non-volatile memory 150, then the DDS partitions 155 do not need to be recomputed using the transformation lineage.
[0016] In response to receiving commands from the compute nodes 110 in the native language of the compute nodes (e.g., Scala / Java), the DDS manager processor 140 provides various stub functions using a wrapper / schema module 160 and a transform module 170. The wrapper / schema module 160 and the transform module 170 may be implemented in software, firmware and/or hardware. The in-situ DDS partitions 155 are stored in the shared non-volatile memory 150 following a schema provided by the wrapper / schema module 160. The schema enables representing the DDS partitions using a language-independent layout, thus enabling components written in different (managed and/or unmanaged) languages to create, access, and process the in-situ DDS partitions 155 shared between them. This enables an engine like Spark to offload computation from the Scala native language to more efficient languages like C++ for performing critical steps, including, for example, shuffling operations or entire transformations such as filtering, sampling, and indexing implemented entirely in C++.
[0017] The wrapper / schema module 160 and the transform module 170 may use unsafe Java APIs to access the objects stored in the in-situ DDS partitions 155 directly. These objects and their associated schema may be automatically generated using a stub compiler that takes as input the native-language objects of the compute nodes 110 (e.g., Scala objects of Spark nodes). The wrapper objects are then used by the wrapper iterators of the wrapper / schema module 160 to iterate over the elements of the in-situ DDS partitions 155 directly, without generating a native-language representation of the DDS in the JVM.
[0018] The DDS manager processor 140 manages DDSs stored in the shared non-volatile memory 150. An entire in-situ DDS stored in the shared non-volatile memory 150 includes the DDS partitions 155, dependencies on previous in-situ DDSs, and Scala function closures implementing transformations on the dependent DDSs. The Scala function closures may be in the form of stub functions provided by the transform module 170. The stub functions interpret the unserialized bytes of the DDS partitions 155 as native Java objects using a schema provided by the wrapper / schema module 160.
[0019] In addition, an in-situ DDS stores a schema, as provided by the wrapper / schema module 160, that defines the fields and layout of the in-situ DDS, and potentially the function closure expressed in different languages such as C++.
[0020] An iterator is an abstraction over a collection of objects that share the same object type and undergo the same transformation. In Spark, each processing step performs a transformation over an iterator exposed by the underlying RDD. In computing device 100, the compute nodes 110 access the in-situ DDS partitions 155 through wrapper iterators provided by the wrapper / schema module 160. Wrapper iterators provided by the wrapper / schema module 160 may implement RDD APIs and enable seamless integration of the in-situ DDS partitions 155 with all of the compute nodes 110. The wrapper iterators rely on the associated DDS schema to access the fields of the underlying in-situ DDS partitions 155 using, in one example, Java Unsafe APIs.
[0021] Compared to regular RDDs stored in DRAM, the in-situ DDSs stored in the non-volatile memory 150 enable direct in-place data manipulation without first converting them into objects native to the managed language (e.g., Scala, Java). In-place data manipulation has several benefits. First, it reduces the overhead of serialization/deserialization, and it also reduces the number of objects managed by the JVM runtime, thus reducing the pressure on the Java garbage collector. Second, it enables Spark to efficiently handle larger datasets stored in the non-volatile memory 150, rather than only datasets that fit in the smaller aggregated DRAM of the Spark nodes.
[0022] Figure 2 illustrates an example flow diagram for an example process 200 for cluster computing with distributed datasets using the computing device 100 of Figure 1. The process 200 is exemplary only and may be modified. The example process 200 of Figure 2 will now be described with further references to Figure 1.
[0023] The process 200 may begin with the DDS manager processor 140 receiving a command to apply a transformation to a partitioned distributed dataset stored in the shared non-volatile memory 150. Each partition of the distributed dataset is coupled to one of a plurality of the compute nodes 110 coupled to the shared non-volatile memory. The command is in a first language native to the compute nodes 110, block 210. The command may be received from a programmer terminal communicatively coupled to the computing device 100 and may, in various examples, be in the Scala language.
[0024] Upon receiving the command at block 210, the DDS manager processor 140 causes the wrapper / schema module 160 to construct a wrapper iterator object within the shared nonvolatile memory 150 using a second language different than the native language and based on a schema stored in association with the distributed dataset, block 220. In various examples, the native language of the compute nodes 110 is based on Java and a JVM. The second language or languages used by the wrapper / schema module may include C++.
[0025] Rather than being described in any particular programming language such as Java, Scala, or C++, the distributed datasets are defined with one of several selected schemas. In various examples, the selected schemas are described in a domain-specific language that is independent of programming languages such as Java, Scala, or C++ and that provides basic types such as integers, strings, floating-point numbers, etc.
[0026] The schema language utilized at block 220 allows definition of complex types comprising an ordered list of the basic types provided by the domain-specific language of the schema. In various examples, the schema may use constructs similar to C structs such as, for example, LIST[T1, T2]. For example, LIST[X: STRING, Y: INT] defines a new complex type as a list of tuples, each of which has two named variables X and Y, where X is defined as a string and Y is defined as an integer.
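As an illustrative sketch only (the class and record names are not part of the schema language described here), a complex type such as LIST[X: STRING, Y: INT] corresponds to a list of two-field tuples in a host language:

```java
import java.util.List;

// Hypothetical Java mirror of the schema complex type LIST[X: STRING, Y: INT]:
// a list of tuples with a string field X and a 64-bit integer field Y.
public class SchemaTypes {
    public record Tuple(String x, long y) {}   // one element of the list

    public static void main(String[] args) {
        List<Tuple> list = List.of(new Tuple("the", 1), new Tuple("fox", 2));
        System.out.println(list.get(0).x());   // prints "the"
    }
}
```

The point of the schema language is that this shape is declared once, independently of Java, Scala, or C++, and each language generates its own equivalent of the tuple above.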
[0027] For example, the schema language may provide for the definition of two kinds of in- situ DDSs of basic or complex types, as follows:
• VECTOR_IN_SITU_RDD[T], which is a vector of basic type T
• MAP_IN_SITU_RDD[(K,V)], which is an associative container storing key-value pairs
[0028] For example, the following would define an associative in-situ DDS of string-integer key-value pairs:
MAP_IN_SITU_RDD[(STRING,INT)] (1)
[0029] This language-independent declaration of types enables a schema compiler to generate a language-independent binary format to represent the DDSs contained within the DDS partitions 155. In various examples, the schema employed at block 220 defines a binary byte-sequence format for each basic and complex type and for the in-situ DDS partitions 155. The schema may represent basic types using a binary machine representation. For example, INT may be a 64-bit (8-byte) sequence. The schema may represent complex types by concatenating the byte sequences of the basic types. For example, a complex type may comprise the following schema:
BinaryFormat(LIST[X: STRING, Y: INT]) = ByteSequence(LIST[X: STRING, Y: INT])
= ByteSequence(X: STRING) + ByteSequence(Y: INT)
= X.LEN + X.CHAR[0] + X.CHAR[1] + ... + X.CHAR[N-1] + Y (2)
[0030] The complex type defined by the schema equation (2) can be depicted in table form by the following Tables 1 and 2:

Table 1

| Byte | 0 | 1 ... N |
|---|---|---|
| Content | X.LEN | X.CHAR[0] ... X.CHAR[N-1] |

Table 2

| Byte | 0 ... N | N+1 ... N+8 |
|---|---|---|
| Content | X (STRING) | Y (INT) |

Where N = X.LEN
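Assuming the layout above (a one-byte length field, N character bytes, then an 8-byte integer — the single length byte is an assumption for illustration), encoding one element of LIST[X: STRING, Y: INT] might be sketched in Java as follows; the class and method names are hypothetical:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder for one (STRING, INT) element of the schema
// LIST[X: STRING, Y: INT]: one length byte, N character bytes, 8-byte integer.
public class ElementCodec {
    public static byte[] encode(String x, long y) {
        byte[] chars = x.getBytes(StandardCharsets.US_ASCII);
        ByteBuffer buf = ByteBuffer.allocate(1 + chars.length + 8);
        buf.put((byte) chars.length);  // X.LEN at byte 0
        buf.put(chars);                // X.CHAR[0] .. X.CHAR[N-1]
        buf.putLong(y);                // Y as a 64-bit (8-byte) sequence
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] element = encode("the", 7);
        // 1 length byte + 3 character bytes + 8 integer bytes = 12 bytes
        System.out.println(element.length);   // prints 12
    }
}
```

Because the layout is fixed by the schema, a reader needs no per-element type metadata: offsets alone locate each field.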
[0031] The binary format of the in-situ DDS comprises all the elements of all the DDS partitions 155, with each partition constructed by concatenating the byte sequences of its elements and appending a byte sequence EOF, or other known byte sequence, at the end. The EOF byte sequence marks the termination of the DDS partition. For example, the binary representation of an associative in-situ DDS partition of string-integer key-value pairs given in equation (1) above (MAP_IN_SITU_RDD[(X: STRING, Y: INT)]) with two elements would be:
Table 7

| Byte | 0 ... N+8 | N+9 ... M+N+17 | M+N+18 |
|---|---|---|---|
| Content | first element (X1, Y1) | second element (X2, Y2) | EOF |

Where N and M are the string lengths of the first and second elements respectively
[0032] Returning to block 220, when the proper schema has been generated, a stub compiler in the wrapper / schema module 160 takes as input the generated schema of the in-situ DDS selected at block 210 and generates a wrapper iterator object in a second language (e.g., C++) that is different than the native language of the compute nodes 110 (e.g., Java or Scala) for iterating through all the elements of the in-situ DDS, including all the DDS partitions 155. The wrapper iterator object knows how to interpret the underlying language-independent binary format of the in-situ DDS. A single wrapper iterator object is used to access each element of the entire in-situ DDS, including all the DDS partitions 155, rather than the multiple Java or Scala objects that each represent one element in Spark. The wrapper iterator constructed at block 220 leverages the property that transformations on the in-situ DDSs stored in the shared non-volatile memory 150 are done on all elements of the in-situ DDS for all the DDS partitions 155 and that all elements of the in-situ DDS have the same type. For this reason, the wrapper / schema module 160 interprets the schema for the DDS selected at block 210 once, not each time it accesses a DDS element. This in effect reduces the CPU time spent interpreting types.
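A minimal sketch of such a wrapper iterator, written here in Java rather than C++ for brevity: it interprets the (STRING, INT) schema once at construction and then walks the byte sequence element by element. The layout (one length byte, N characters, 8-byte big-endian integer) and the one-byte 0xFF EOF marker are assumptions for illustration, not the format described above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical wrapper iterator over a MAP_IN_SITU_RDD[(STRING, INT)]
// partition in an assumed binary format: the schema is interpreted once
// at construction, then each element is read in place, no deserialization.
public class WrapperIterator implements Iterator<Object[]> {
    private static final int EOF = 0xFF;   // assumed partition terminator
    private final ByteBuffer buf;

    public WrapperIterator(byte[] partition) {
        this.buf = ByteBuffer.wrap(partition);   // schema interpreted once here
    }

    @Override
    public boolean hasNext() {                   // peek: last element reached?
        return (buf.get(buf.position()) & 0xFF) != EOF;
    }

    @Override
    public Object[] next() {                     // interpret one (STRING, INT) element
        int len = buf.get() & 0xFF;              // X.LEN
        byte[] chars = new byte[len];
        buf.get(chars);                          // X.CHAR[0] .. X.CHAR[N-1]
        long y = buf.getLong();                  // Y, 8 bytes
        return new Object[] { new String(chars, StandardCharsets.US_ASCII), y };
    }

    // Helper that builds a partition in the assumed binary format, EOF-terminated.
    public static byte[] partitionOf(String[] keys, long[] values) {
        int size = 1;
        for (String k : keys) size += 1 + k.length() + 8;
        ByteBuffer out = ByteBuffer.allocate(size);
        for (int i = 0; i < keys.length; i++) {
            byte[] chars = keys[i].getBytes(StandardCharsets.US_ASCII);
            out.put((byte) chars.length).put(chars).putLong(values[i]);
        }
        out.put((byte) EOF);
        return out.array();
    }

    public static void main(String[] args) {
        byte[] p = partitionOf(new String[] { "the", "fox" }, new long[] { 1, 2 });
        for (Iterator<Object[]> it = new WrapperIterator(p); it.hasNext(); ) {
            Object[] kv = it.next();
            System.out.println(kv[0] + " -> " + kv[1]);   // the -> 1, fox -> 2
        }
    }
}
```

Note that a single iterator object is reused across all elements; only the cursor position changes, which is the property the paragraph above relies on.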
[0033] Subsequent to the wrapper iterator object being constructed at block 220, the compute nodes 110 apply the transformation received at block 210 to the wrapper iterator object to form a resulting distributed dataset in the shared non-volatile memory 150, block 230. The resulting distributed dataset is formed and stored directly in the shared non-volatile memory 150. When the transformation has been applied to all elements of all the DDS partitions 155, the DDS manager processor 140 causes the wrapper / schema module 160 to store the resulting distributed dataset in a format similar to the original DDS, using the same schema or a different schema depending on the transformation.
[0034] The transformation applied at block 230 may be a transformation object in the native language of the compute nodes 110. In various examples, the transformation applied at block 230 may be a transformation object in a language other than the native language of the compute nodes 110, as described above. In these examples, the DDS manager processor 140 may cause the transform module 170 to construct the selected transformation object in at least one of the second language of the wrapper iterator object or a third language different than the native language. The compute nodes 110 would then apply the constructed transformation object to the wrapper iterator object to form the resulting distributed dataset in the shared non-volatile memory 150.
[0035] The transformation applied at block 230 may include one or more of a filter transformation, a flatMap transformation, a map transformation, a reduceByKey transformation, etc. A filter transformation selects elements for which a given function f evaluates to true. For example, a filter with f: x == "the" searches for a specified string of characters. A reduceByKey transformation shuffles key-value pairs to reducers such that pairs with the same key end up at the same reducer. For example, in reduceByKey with f: (x, y) -> x + y, each reducer sums up the values of pairs with the same word key. A flatMap transformation splits words, and a map transformation maps each word to a key-value pair such as (word, 1).
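The word-count pipeline these transformations describe can be sketched with plain Java collections standing in for DDS partitions; the class and method names here are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the pipeline above: flatMap splits words, map emits (word, 1)
// pairs, reduceByKey sums values with the same key, and a filter-style
// lookup keeps only a chosen key.
public class WordCountSketch {
    public static Map<String, Integer> wordCount(String text) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : text.split("\\s+")) {   // flatMap: split into words
            counts.merge(word, 1, Integer::sum);   // map + reduceByKey: (word, 1), summed
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = wordCount("the quick fox jumps over the lazy dog");
        System.out.println(counts.get("the"));     // filter(x == "the") -> prints 2
    }
}
```

In the device described here, the same logic would run over wrapper iterators instead of Java maps, but the transformation semantics are identical.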
[0036] For example, applying a filter transformation such as filter(X == "the") to the associative in-situ DDS of equation (1) above may be expressed as follows:

MAP_IN_SITU_RDD[(X: STRING, Y: INT)].filter(X == "the") (3)
[0037] The wrapper iterator object constructed at block 220 interprets the type of each element in the DDS partitions 155 as (STRING, INT) once at the beginning and then applies the transformation filter(X == "the") to each element. Since the selected schema defines a binary format, interpreting the in-situ DDS partitions 155 essentially amounts to determining the beginning and end of each field in the byte sequence. Iteration through the elements of the DDS partitions 155 is therefore iteration through the byte sequence of the DDS, interpreting each DDS field as a basic or complex type according to the schema until EOF is found. In this case, the elements would be interpreted by a wrapper iterator such as:
INTERPRET_RDD_ELEMENT(STRING, INT) (4)
[0038] The same wrapper iterator object constructed at block 220 is thus reused for interpreting each element of the DDS partitions 155 rather than reconstructed for each element. The wrapper iterator object, in this example, interprets the elements of the DDS partitions 155 as follows:
X1 = INTERPRET_STRING(LEN, RDD.BYTE_SEQUENCE[0],
     RDD.BYTE_SEQUENCE[1] ... RDD.BYTE_SEQUENCE[N]) (5)

Y1 = INTERPRET_INTEGER(RDD.BYTE_SEQUENCE[N+1] ...
     RDD.BYTE_SEQUENCE[N+8]) (6)

X2 = INTERPRET_STRING(LEN, RDD.BYTE_SEQUENCE[N+9],
     RDD.BYTE_SEQUENCE[N+10] ... RDD.BYTE_SEQUENCE[M+N+9]) (7)

Y2 = INTERPRET_INTEGER(RDD.BYTE_SEQUENCE[M+N+10] ...
     RDD.BYTE_SEQUENCE[M+N+17]) (8)
[0039] Where INTERPRET_STRING(LEN, CHARS) interprets a byte array CHARS of length LEN as a STRING and INTERPRET_INTEGER(BYTES) interprets a byte array BYTES as a 64-bit INT.
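Under the assumed byte layout, INTERPRET_STRING and INTERPRET_INTEGER reduce to offset arithmetic over the byte sequence. This Java sketch mirrors equations (5) through (8) with hypothetical helper names; the big-endian 8-byte integer representation is an assumption:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical counterparts of INTERPRET_STRING and INTERPRET_INTEGER:
// both read directly from the partition's byte sequence at a given offset.
public class Interpret {
    // Interprets LEN bytes starting at offset as a STRING.
    public static String interpretString(byte[] seq, int offset, int len) {
        return new String(seq, offset, len, StandardCharsets.US_ASCII);
    }

    // Interprets the 8 bytes starting at offset as a 64-bit INT.
    public static long interpretInteger(byte[] seq, int offset) {
        return ByteBuffer.wrap(seq, offset, 8).getLong();
    }

    public static void main(String[] args) {
        // Element ("the", 7): one length byte, three characters, 8-byte integer.
        ByteBuffer buf = ByteBuffer.allocate(12);
        buf.put((byte) 3).put("the".getBytes(StandardCharsets.US_ASCII)).putLong(7);
        byte[] seq = buf.array();
        int n = seq[0];                                   // X.LEN = 3
        System.out.println(interpretString(seq, 1, n));   // prints "the"
        System.out.println(interpretInteger(seq, n + 1)); // prints 7
    }
}
```

No object is allocated per field beyond the returned value itself, which is the in-place property the preceding paragraphs emphasize.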
[0040] Application of the transformation at block 230 proceeds one element at a time. At decision block 240, each of the compute nodes 110 determines whether the last element of the DDS partition 155 has been reached. If the last element has not been reached, application of the transformation at block 230 continues. If the last element has been reached, the process 200 continues to block 250, where the compute nodes 110 release the wrapper iterator object. As described above, when the transformation has been applied to all elements of all the DDS partitions 155, the DDS manager processor 140 causes the wrapper / schema module 160 to store the resulting distributed dataset in a format similar to the original DDS, using the same schema or a different schema depending on the transformation. When the resulting distributed dataset has been stored, the process 200 concludes.
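Blocks 210 through 250 can be strung together in a minimal end-to-end sketch: walk a source partition once, apply filter(X == "the") element by element, and re-encode surviving elements into a result partition. The names, the byte layout, and the one-byte 0xFF EOF terminator are all illustrative assumptions:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// End-to-end sketch of process 200 over a (STRING, INT) partition in an
// assumed binary format: iterate, apply filter(X == key), emit a result
// partition in the same format.
public class FilterPipeline {
    static final int EOF = 0xFF;   // assumed one-byte partition terminator

    public static byte[] filterByKey(byte[] partition, String key) {
        ByteBuffer in = ByteBuffer.wrap(partition);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((in.get(in.position()) & 0xFF) != EOF) {   // decision block 240
            int len = in.get() & 0xFF;                    // X.LEN
            byte[] chars = new byte[len];
            in.get(chars);                                // X.CHAR[0..N-1]
            long value = in.getLong();                    // Y
            if (new String(chars, StandardCharsets.US_ASCII).equals(key)) {
                out.write(len);                           // keep matching element
                out.write(chars, 0, chars.length);
                out.write(ByteBuffer.allocate(8).putLong(value).array(), 0, 8);
            }
        }
        out.write(EOF);                                   // terminate result partition
        return out.toByteArray();
    }

    public static void main(String[] args) {
        ByteBuffer src = ByteBuffer.allocate(25);
        src.put((byte) 3).put("the".getBytes(StandardCharsets.US_ASCII)).putLong(1);
        src.put((byte) 3).put("fox".getBytes(StandardCharsets.US_ASCII)).putLong(2);
        src.put((byte) EOF);
        byte[] result = filterByKey(src.array(), "the");
        // only ("the", 1) survives: 1 + 3 + 8 element bytes plus the EOF byte
        System.out.println(result.length);   // prints 13
    }
}
```

The result partition uses the same schema as the source, matching the store step described for blocks 230 and 250.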
[0041] Figure 3 illustrates a block diagram of an example system with a computer-readable storage medium including instructions executable by a processor to form distributed datasets in non-volatile memory. The system 300 includes the processor 310 and the computer-readable storage medium 320. The computer-readable storage medium 320 includes example instructions 321-323 executable by the processor 310 to perform various functionalities described herein.
[0042] The example instructions include instructions 321 for receiving a command to apply a transformation. The instructions 321 cause the processor 310 to receive a command to apply a transformation to a partitioned distributed dataset stored in non-volatile memory. Each partition may be coupled to one of a plurality of compute nodes coupled to the non-volatile memory. The command may be in a first language native to the compute nodes.
[0043] The example instructions also include instructions 322 for constructing a wrapper iterator object. The instructions 322 cause the processor 310 to construct a wrapper iterator object within the non-volatile memory using a second language different than the native language and based on a schema stored in association with the distributed dataset. The example instructions further include instructions 323 for applying the transformation to the wrapper iterator object. The instructions 323 cause the processor 310 to apply the transformation to the wrapper iterator object to form a resulting distributed dataset in the non-volatile memory.
[0044] Various examples described herein are described in the general context of method steps or processes, which may be implemented in one example by a software program product or component, embodied in a machine-readable medium, including executable instructions, such as program code, executed by entities in networked environments. Generally, program modules may include routines, programs, objects, components, data structures, etc. which may be designed to perform particular tasks or implement particular abstract data types. Executable instructions, associated data structures, and program modules represent examples of program code for executing steps of the methods disclosed herein. The particular sequence of such executable instructions or associated data structures represents examples of corresponding acts for implementing the functions described in such steps or processes.
[0045] Software implementations of various examples can be accomplished with standard programming techniques with rule-based logic and other logic to accomplish various database searching steps or processes, correlation steps or processes, comparison steps or processes and decision steps or processes.
[0046] The foregoing description of various examples has been presented for purposes of illustration and description. The foregoing description is not intended to be exhaustive or limiting to the examples disclosed, and modifications and variations are possible in light of the above teachings or may be acquired from practice of various examples. The examples discussed herein were chosen and described in order to explain the principles and the nature of various examples of the present disclosure and its practical application to enable one skilled in the art to utilize the present disclosure in various examples and with various modifications as are suited to the particular use contemplated. The features of the examples described herein may be combined in all possible combinations of methods, apparatus, modules, systems, and computer program products.
[0047] It is also noted herein that while the above describes examples, these descriptions should not be viewed in a limiting sense. Rather, there are several variations and modifications which may be made without departing from the scope as defined in the appended claims.
Claims
1. A device, comprising:
a plurality of compute nodes;
a non-volatile memory coupled to the plurality of compute nodes and storing a partitioned distributed dataset, each partition coupled to one of the plurality of compute nodes; and
a processor to construct a wrapper iterator object within the non-volatile memory based on a schema stored in association with the distributed dataset,
wherein the compute nodes apply a transformation to the wrapper iterator object to form a resulting distributed dataset in the non-volatile memory.
2. The device of claim 1, wherein the processor is to receive a command to apply the
transformation to the partitioned dataset in a first language native to the compute nodes, and wherein the wrapper iterator object is constructed using a second language different from the first language.
3. The device of claim 2, wherein the transformation in the received command requires a shuffle operation when performed in the native language of the compute nodes and the constructed transformation requires no data serialization.
4. The device of claim 1, wherein the processor is further to store the resulting distributed dataset in a format based on at least one of the schema stored in association with the distributed dataset or a different schema.
5. The device of claim 1, wherein the schema comprises a language-independent binary byte sequence format and the wrapper iterator object is constructed to interpret the language-independent binary byte sequence format of the schema.
6. A method, comprising:
receiving a command to apply a transformation to a partitioned distributed dataset stored in non-volatile memory, each partition coupled to one of a plurality of compute nodes coupled to the non-volatile memory, the command being in a first language native to the compute nodes;
constructing a wrapper iterator object within the non-volatile memory using a second language different than the native language and based on a schema stored in association with the distributed dataset; and
applying the transformation to the wrapper iterator object to form a resulting distributed dataset in the non-volatile memory.
7. The method of claim 6, wherein applying the transformation comprises constructing the transformation in at least one of the second language or a third language different than the native language and applying the constructed transformation to the wrapper iterator object to form the resulting distributed dataset in the non-volatile memory.
8. The method of claim 7, wherein the transformation in the received command requires a shuffle operation when performed in the native language of the compute nodes and the constructed transformation requires no data serialization.
9. The method of claim 6, further comprising storing the resulting distributed dataset in a format based on at least one of the schema stored in association with the distributed dataset or a different schema.
10. The method of claim 6, wherein the schema comprises a language-independent binary byte sequence format and the wrapper iterator object is constructed to interpret the language-independent binary byte sequence format of the schema.
11. A computer program product, embodied on a non-transitory computer-readable medium, comprising:
computer code for receiving a command to apply a transformation to a partitioned distributed dataset stored in non-volatile memory, each partition coupled to one of a plurality of compute nodes coupled to the non-volatile memory, the command being in a first language native to the compute nodes;
computer code for constructing a wrapper iterator object within the non-volatile memory using a second language different than the native language and based on a schema stored in association with the distributed dataset; and
computer code for applying the transformation to the wrapper iterator object to form a resulting distributed dataset in the non-volatile memory.
12. The computer program product of claim 11, wherein the computer code for applying the transformation comprises computer code for constructing the transformation in at least one of the second language or a third language different than the native language and computer code for applying the constructed transformation to the wrapper iterator object to form the resulting distributed dataset in the non-volatile memory.
13. The computer program product of claim 12, wherein the transformation in the received command would require a shuffle operation when performed in the native language of the compute nodes and the constructed transformation requires no data serialization.
14. The computer program product of claim 11, further comprising computer code for storing the resulting distributed dataset in a format based on at least one of the schema stored in association with the distributed dataset or a different schema.
15. The computer program product of claim 11, wherein the schema comprises a language-independent binary byte sequence format and the wrapper iterator object is constructed to interpret the language-independent binary byte sequence format of the schema.
Priority Applications (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| PCT/US2015/042134 WO2017019001A1 (en) | 2015-07-24 | 2015-07-24 | Distributed datasets in shared non-volatile memory |
Publications (1)
| Publication Number | Publication Date |
|---|---|
| WO2017019001A1 true WO2017019001A1 (en) | 2017-02-02 |
Family
ID=57884896
Family Applications (1)
| Application Number | Title | Priority Date | Filing Date |
|---|---|---|---|
| PCT/US2015/042134 Ceased WO2017019001A1 (en) | 2015-07-24 | 2015-07-24 | Distributed datasets in shared non-volatile memory |
Country Status (1)
| Country | Link |
|---|---|
| WO (1) | WO2017019001A1 (en) |
Cited By (1)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| US10467113B2 (en) | 2017-06-09 | 2019-11-05 | Hewlett Packard Enterprise Development Lp | Executing programs through a shared NVM pool |
Citations (5)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| WO2002035395A2 (en) * | 2000-10-27 | 2002-05-02 | Entigen Corporation | Integrating heterogeneous data and tools |
| US20100161678A1 (en) * | 2008-12-23 | 2010-06-24 | Honeywell International Inc. | Method and apparatus for utilizing matlab functionality in java-enabled environment |
| US20110289490A1 (en) * | 2010-05-20 | 2011-11-24 | Mcatamney James A | C-to-java programming language translator |
| US20120011500A1 (en) * | 2010-07-09 | 2012-01-12 | Paolo Faraboschi | Managing a memory segment using a memory virtual appliance |
| US20140055496A1 (en) * | 2012-08-24 | 2014-02-27 | International Business Machines Corporation | Transparent efficiency for in-memory execution of map reduce job sequences |
Legal Events
| Date | Code | Title | Description |
|---|---|---|---|
| | 121 | Ep: the epo has been informed by wipo that ep was designated in this application | Ref document number: 15899798; Country of ref document: EP; Kind code of ref document: A1 |
| | NENP | Non-entry into the national phase | Ref country code: DE |
| | 122 | Ep: pct application non-entry in european phase | Ref document number: 15899798; Country of ref document: EP; Kind code of ref document: A1 |