
WO2022164419A1 - Distributed semantic-aware cache - Google Patents

Distributed semantic-aware cache

Info

Publication number
WO2022164419A1
Authority
WO
WIPO (PCT)
Prior art keywords
dag
plan
semantic
data
cache
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Ceased
Application number
PCT/US2021/015022
Other languages
French (fr)
Inventor
Theodoros GKOUNTOUVAS
Hui Lei
Hongliang Tang
Zhihao Tang
Yong Wang
Ning Wu
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
FutureWei Technologies Inc
Original Assignee
FutureWei Technologies Inc
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by FutureWei Technologies Inc
Priority to CN202180091941.9A (published as CN116802618A)
Priority to PCT/US2021/015022 (published as WO2022164419A1)
Publication of WO2022164419A1


Classifications

    • G PHYSICS
    • G06 COMPUTING OR CALCULATING; COUNTING
    • G06F ELECTRIC DIGITAL DATA PROCESSING
    • G06F3/00 Input arrangements for transferring data to be processed into a form capable of being handled by the computer; Output arrangements for transferring data from processing unit to output unit, e.g. interface arrangements
    • G06F3/06 Digital input from, or digital output to, record carriers, e.g. RAID, emulated record carriers or networked record carriers
    • G06F3/0601 Interfaces specially adapted for storage systems
    • G06F3/0602 Interfaces specially adapted for storage systems specifically adapted to achieve a particular effect
    • G06F3/061 Improving I/O performance
    • G06F3/0613 Improving I/O performance in relation to throughput
    • G PHYSICS
    • G06 COMPUTING OR CALCULATING; COUNTING
    • G06F ELECTRIC DIGITAL DATA PROCESSING
    • G06F3/00 Input arrangements for transferring data to be processed into a form capable of being handled by the computer; Output arrangements for transferring data from processing unit to output unit, e.g. interface arrangements
    • G06F3/06 Digital input from, or digital output to, record carriers, e.g. RAID, emulated record carriers or networked record carriers
    • G06F3/0601 Interfaces specially adapted for storage systems
    • G06F3/0628 Interfaces specially adapted for storage systems making use of a particular technique
    • G06F3/0655 Vertical data movement, i.e. input-output transfer; data movement between one or more hosts and one or more storage devices
    • G06F3/0659 Command handling arrangements, e.g. command buffers, queues, command scheduling
    • G PHYSICS
    • G06 COMPUTING OR CALCULATING; COUNTING
    • G06F ELECTRIC DIGITAL DATA PROCESSING
    • G06F3/00 Input arrangements for transferring data to be processed into a form capable of being handled by the computer; Output arrangements for transferring data from processing unit to output unit, e.g. interface arrangements
    • G06F3/06 Digital input from, or digital output to, record carriers, e.g. RAID, emulated record carriers or networked record carriers
    • G06F3/0601 Interfaces specially adapted for storage systems
    • G06F3/0668 Interfaces specially adapted for storage systems adopting a particular infrastructure
    • G06F3/0671 In-line storage system
    • G06F3/0683 Plurality of storage devices
    • G06F3/0689 Disk arrays, e.g. RAID, JBOD

Definitions

  • This application is related to large-scale data processing systems and, in particular, to a distributed semantic-aware cache that replaces traditional cache store operations with corresponding semantic-aware commands that enable cache content to be utilized by multiple users in a distributed data processing system.
  • Distributed compute engines typically accept jobs from users in the form of Directed Acyclic Graphs (DAGs).
  • a sample distributed compute engine 100 is illustrated in FIG. 1.
  • a driver module 110 is responsible for dividing the jobs into multiple smaller tasks and assigning the tasks to worker modules 120 for execution.
  • in operation, at (1), a user 130 sends a job request to the driver module 110 in the form of a DAG.
  • the driver module 110 creates an optimized execution DAG from the input DAG and divides it into multiple tasks where each task can be executed in a single worker module 120. The tasks are sent to the worker modules 120 for execution in a specific order.
  • the worker modules 120 load the initial data set from storage 140 in response to a load command, process the initial data set (e.g., use a filter to extract records, count records, etc.), and map and store the output back to storage 140, if necessary.
  • the worker modules 120 send an acknowledgement or a result related to the execution of the corresponding task back to the driver module 110.
  • the driver module 110 sends the acknowledgement or the result related to the whole execution of the job back to the user 130 when all tasks are finished.
  • this job loads data at 150 from a group of comma separated values (CSV) files. Then, this job filters out all records at 160 that do not have a hot temperature (>100 F). Only records in areas with a warm climate during warm periods would affect the final count taken at 170.
  • a map transformation may be applied at 180 to facilitate storage of the data at 190.
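  • as a minimal sketch of the load, filter, count, map sequence described above, the following runs the job on a single machine. The column names `area` and `temperature_f`, the sample rows, and the function names are assumptions made for illustration; the source only states that CSV files are loaded and records hotter than 100 F are kept.

```python
import csv
import io

# Hypothetical sample data standing in for the CSV files; column names are assumptions.
SAMPLE_CSV = """area,temperature_f
Phoenix,112
Oslo,41
Cairo,104
Lima,68
"""


def load(text):
    """Load step: parse CSV text into a list of record dicts."""
    return list(csv.DictReader(io.StringIO(text)))


def filter_hot(records, threshold_f=100.0):
    """Filter step: keep only records strictly hotter than the threshold."""
    return [r for r in records if float(r["temperature_f"]) > threshold_f]


def run_job(text):
    """Run load -> filter -> count -> map and return the count and mapped rows."""
    records = load(text)
    hot = filter_hot(records)
    count = len(hot)
    # Map step: reshape each record into a tuple that is convenient to store.
    mapped = [(r["area"], float(r["temperature_f"])) for r in hot]
    return count, mapped


count, mapped = run_job(SAMPLE_CSV)
print(count, mapped)
```

  • in a distributed compute engine each of these steps would run as tasks on worker modules rather than in one process; the sketch only fixes the order of the operations.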
  • Distributed compute engines 100 for large data workloads adopt a naive approach for tackling workloads like the one mentioned above.
  • the user 130 provides a job in the form of a DAG (like the simple example above, load -> filter -> ...).
  • the distributed compute engine 100 creates an optimized execution plan from the user-provided DAG based on the available resources and workload characteristics.
  • a DAG execution plan (“DAG plan”) like the one shown in FIG. 2 is constructed where it is assumed that the distributed compute engine 100 divides the dataset into four different partitions 200 (P1, P2, P3, P4).
  • the DAG plan explains how the DAG is to be executed by the distributed compute engine.
  • a portion of the data inside the partitions 200 has records with hot temperatures 210, and the rest of the data belongs to other distinct temperature ranges (warm 220, cool 230, and cold 240).
  • a process loads the corresponding data, filters in only the hot temperature records 210 at 250, and continues the ensuing operations, including performing map transformations 260 that facilitate data storage.
  • HDDs hard disk drives
  • SSDs solid state drives
  • Usage of computation and memory resources (e.g., DRAM) for the compute engine cluster is analogous to the size of the initial dataset;
  • Prior art cache techniques do not incorporate processing efficiencies such as semantic awareness, adaptive partitioning, and data skipping metadata.
  • the distributed semantic-aware cache design described herein includes such processing efficiencies and may further be: Shared among users (i.e., content is accessible by many users); Distributed (utilizes cache space from multiple servers); Multi-Tiered (uses multiple tiers (e.g., memory and disk) to store data); Semantic-Aware (includes metadata about the content and automatically uses it to improve performance); Multi-Modal (provides interfaces for all three types of interactions, repartitioning, data-skipping metadata, and intermediate data caching); and Platform-Agnostic (does not depend on a specific platform).
  • the distributed semantic-aware cache works on top of baseline distributed multi-tiered shared caches with simple load/store application programming interfaces (APIs).
  • the distributed semantic-aware cache exposes a new semantic-aware API that allows the distributed semantic-aware cache to automatically use the content to optimize incoming jobs.
  • a translator module ensures that application users do not have to orchestrate complex and sophisticated computation operations for relatively simple to understand functions (like repartitioning data, creating data skipping metadata, and caching intermediate results) on their own.
  • the distributed semantic-aware cache also is able to efficiently share content among users since it maintains concrete knowledge of the cached content and can decide how and when to reuse it, if necessary.
  • an optimizer module ensures that the plan is fully optimized to take advantage of the content of the distributed semantic-aware cache.
  • a method of performing a data analytics process on a distributed data analytics platform that shares cached data amongst users.
  • the method includes receiving a semantic-aware store command from a user, translating a directed acyclic graph (DAG) plan for executing the semantic-aware store command from the user into a translated DAG plan to be executed by a distributed shared cache, and executing the translated DAG plan including at least a final store command and storing results of execution of the translated DAG plan to the distributed shared cache.
  • DAG directed acyclic graph
  • a distributed semantic-aware cache comprising a distributed shared cache that stores data that may be shared by at least two users, and a semantic-aware cache manager.
  • the semantic aware cache manager comprises a translator module that translates a directed acyclic graph (DAG) plan for executing a semantic-aware store command from a user into a translated DAG plan to be executed by the distributed shared cache.
  • the distributed shared cache receives and stores results of execution of the translated DAG plan and a final store command.
  • a distributed semantic-aware cache system comprising a distributed shared cache that stores data that may be shared by at least two users, a semantic-aware cache manager, and a distributed compute engine.
  • the semantic aware cache manager comprises a translator module that translates a directed acyclic graph (DAG) plan for executing a semantic-aware store command from a user into a translated DAG plan to be executed by the distributed shared cache.
  • the distributed compute engine executes the translated DAG plan and stores results of execution of the translated DAG plan and a final store command in the distributed shared cache.
  • the DAG plan is transformed into a semantic cache format prior to translating of the DAG plan, and the translated DAG plan is transformed into a format of a distributed compute engine that executes the translated DAG plan.
  • the semantic-aware store commands from the distributed shared cache are based on knowledge about data stored in the distributed shared cache, and execution of the translated DAG plan is optimized using the knowledge about data stored in the distributed shared cache.
  • a load command is executed that optimizes the DAG plan from the user by using data stored in the distributed shared cache to update the translated DAG plan with load commands from the distributed shared cache to form an optimized DAG plan.
  • a semantic-aware cache load command including the DAG plan is sent to the distributed shared cache whenever a semantic-aware store command is executed.
  • the optimized DAG plan is returned for execution, and the optimized DAG plan including the load commands is executed by a distributed compute engine. Results of execution of the optimized DAG plan are stored to the distributed shared cache.
  • the optimized DAG plan is transformed into a semantic cache format prior to translating, and the optimized DAG plan is transformed into a format of a distributed compute engine that executes the optimized DAG plan.
  • the translated DAG plan is updated by reviewing cached intermediate data to determine at least one of (1) whether there is a branch of the DAG plan that can be pruned by loading data directly from the distributed shared cache, (2) whether there is a repartition of the data in the distributed shared cache that can eliminate partitions or shuffle phases, or (3) whether, based on data-skipping metadata, there is an ensuing operation that can eliminate partitions.
  • the distributed shared cache comprises multiple tiers, and an evict command is received from the user specifying an output path and tier of the distributed shared cache used to store corresponding content.
  • a deletion command corresponding to the evict command is sent to the distributed shared cache, and semantic information in the distributed shared cache about the corresponding content is destroyed.
  • the DAG plan is translated by adhering to semantics of the semantic-aware store command when transforming the semantic-aware store command into the translated DAG plan.
  • the received semantic-aware store command comprises at least one of an adaptive partitioning command in which data is dynamically structured based on workload characteristics, a data skipping metadata command in which a data partition is pruned when eliminating the data partition does not affect the results of execution of the translated DAG plan, or an intermediate data command in which data is reused for another operation.
  • the distributed shared cache comprises multiple tiers, and when the received semantic-aware store command comprises the adaptive partitioning command, [...]
  • the distributed shared cache comprises multiple tiers, and when the received semantic-aware store command comprises the data skipping metadata command, the translated DAG plan is constructed that, when executed, takes input data, creates relevant metadata for an attribute specified in the data skipping metadata command, and stores the relevant metadata in a location and memory tier specified in the data skipping metadata command.
  • the distributed shared cache comprises multiple tiers, and when the received semantic-aware store command comprises the intermediate data command and the DAG plan from the user, the DAG plan from the user is executed and results are again stored to a specified location and memory tier of the distributed shared cache.
  • the method may be performed and the instructions on the computer-readable media may be processed by the apparatus, and further features of the method and instructions on the computer-readable media result from the functionality of the apparatus. Also, the explanations provided for each aspect and its implementation apply equally to the other aspects and the corresponding implementations. The different embodiments may be implemented in hardware, software, or any combination thereof. Also, any one of the foregoing examples may be combined with any one or more of the other foregoing examples to create a new embodiment within the scope of the present disclosure.
  • FIG. 1 illustrates a sample distributed compute engine.
  • FIG. 2 illustrates a sample execution plan for the distributed compute engine of FIG. 1 where it is assumed that the distributed compute engine divides the dataset into four different partitions.
  • FIG. 3 illustrates an example of adaptive partitioning.
  • FIG. 4 illustrates an example of data-skipping.
  • FIG. 5 illustrates an example of intermediate data caching.
  • FIG. 6 illustrates a Spark distributed multi-tiered cache.
  • FIG. 7 illustrates an Alluxio distributed multi-tiered shared cache.
  • FIG. 8 illustrates a distributed semantic-aware cache in a sample embodiment.
  • FIG. 9 is a flow chart illustrating the store protocol of the distributed semantic-aware cache of FIG. 8.
  • FIG. 10 illustrates loading data from the distributed semantic-aware cache of FIG. 8 for use by a second user in a sample embodiment.
  • FIG. 11 is a flow chart illustrating the load protocol of the distributed semantic-aware cache of FIG. 8.
  • FIG. 12 illustrates a sample implementation of a translator module in a sample embodiment.
  • FIG. 13 illustrates a sample implementation of an optimizer module in a sample embodiment.
  • FIG. 14 is a block diagram illustrating circuitry for performing the methods according to sample embodiments.
  • the functions or algorithms described herein may be implemented in software in one embodiment.
  • the software may include computer-executable instructions stored on computer-readable media or a computer-readable storage device such as one or more non-transitory memories or other types of hardware-based storage devices, either local or networked.
  • modules which may be software, hardware, firmware or any combination thereof. Multiple functions may be performed in one or more modules as desired, and the embodiments described are merely examples.
  • the software may be executed on a digital signal processor, application specific integrated circuit (ASIC), microprocessor, or other type of processor operating on a computer system, such as a personal computer, server or other computer system, turning such computer system into a specifically programmed machine.
  • ASIC application specific integrated circuit
  • a cache is a memory device that stores data for faster access (e.g., an on-chip CPU cache that reduces main memory accesses, a memory chip in hard drive that caches results to reduce disk accesses, etc.).
  • a multi-tiered cache may include different types of data storage devices such as a dynamic random access memory (DRAM), hard disk drive (HDD), and solid state drive (SSD), including multiple physical memories of different sizes and speeds to reduce slower data accesses.
  • DRAM dynamic random access memory
  • HDD hard disk drive
  • SSD solid state drive
  • a cache can be used to store this extra data since it does not need to be replicated or persisted for fault tolerance. If the contents are not found in the cache, the compute engines can revert to the baseline strategy, which is inefficient but correct.
  • Adaptive partitioning is the process in which data is dynamically restructured based on the workload characteristics. For example, if there are a lot of jobs that have a filter operation involving the temperature attribute value (like the one in the example of FIGS. 1-2), it might be beneficial to sort and split the data between partitions based on the temperature attribute value.
  • FIG. 3 shows re-sorting of the initial partitions 200 (P1, P2, P3, P4) into new partitions 310 (P1'), 320 (P2'), 330 (P3'), and 340 (P4') that correspond to distinct temperature ranges.
  • These new partitions 310-340 can be stored using extra memory or storage space depending on the size and the hardware.
  • the compute engine loads only the P1' partition 310, which is the only one that contains records 210 with hot temperature values, in a process known as partition-pruning.
  • the filter operation 350 and map operation 360 are still executed on the compute side, since the loaded P1' partition 310 can still contain records that are eventually filtered out. Processing is saved since filter operations 370 and map operations 380 do not need to be performed for partitions 320, 330, or 340. As shown in FIG.
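  • the re-sorting and partition-pruning described above can be sketched as follows. The range boundaries (40, 70, 100 F), the sample values, and all function names are assumptions chosen for illustration; the source only says the new partitions correspond to distinct temperature ranges.

```python
import math

# Assumed range boundaries in degrees F. Each range is (low, high], i.e. low < t <= high,
# so a value of exactly 100 is "warm" and only values above 100 are "hot".
BOUNDS = {
    "P1'": (100.0, math.inf),   # hot
    "P2'": (70.0, 100.0),       # warm
    "P3'": (40.0, 70.0),        # cool
    "P4'": (-math.inf, 40.0),   # cold
}


def repartition(partitions):
    """Redistribute temperature values from the initial partitions by range."""
    new_parts = {name: [] for name in BOUNDS}
    for part in partitions:
        for t in part:
            for name, (low, high) in BOUNDS.items():
                if low < t <= high:
                    new_parts[name].append(t)
                    break
    return new_parts


def prune(threshold):
    """Keep only partitions whose range can contain a value above the threshold."""
    return [name for name, (_, high) in BOUNDS.items() if high > threshold]


def count_hotter_than(new_parts, threshold):
    """Load only the unpruned partitions, then still run the filter on them."""
    loaded = prune(threshold)
    total = sum(1 for name in loaded for t in new_parts[name] if t > threshold)
    return loaded, total


initial = [[105, 72, 30], [45, 101, 88], [99, 20, 110], [60, 75, 35]]
new_parts = repartition(initial)
print(count_hotter_than(new_parts, 100))
print(count_hotter_than(new_parts, 90))
```

  • with a threshold of 90 the warm partition must also be loaded and the filter still removes most of its records, which is why the filter stays in the plan even after pruning.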
  • Another technique used to improve cache efficiency is data-skipping.
  • the storage side (or the compute side) maintains information about secondary attribute values (not main partition attributes) per partition.
  • Common data structures used for such secondary attribute values include:
  • a partition can be pruned if the data-skipping information ensures that eliminating the partition does not lead to a different result.
  • the filter operations 450 and map operations 460 are still executed for the P1 partition 410 and the P2 partition 420 on the compute side. Processing is saved since filter operations 470 and map operations 480 do not need to be performed for the pruned partitions 430 and 440.
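  • a minimal sketch of data-skipping follows, assuming per-partition minimum/maximum values as the metadata. Min/max summaries are one common choice for data-skipping metadata; which structures the source has in mind is not stated here, and the record layout and function names are hypothetical.

```python
# Hypothetical partitions keyed by a main attribute (area); temperature is secondary.
partitions = {
    "P1": [{"area": "A", "temp": 105}, {"area": "A", "temp": 60}],
    "P2": [{"area": "B", "temp": 101}, {"area": "B", "temp": 88}],
    "P3": [{"area": "C", "temp": 50}, {"area": "C", "temp": 45}],
    "P4": [{"area": "D", "temp": 20}, {"area": "D", "temp": 99}],
}


def build_min_max(parts, attribute):
    """Create the data-skipping metadata: (min, max) of the attribute per partition."""
    return {
        name: (min(r[attribute] for r in recs), max(r[attribute] for r in recs))
        for name, recs in parts.items()
    }


def partitions_to_scan(metadata, threshold):
    """A partition is skipped only when its max proves no record can pass the filter."""
    return [name for name, (_, high) in metadata.items() if high > threshold]


def count_hot(parts, metadata, threshold):
    """Scan the surviving partitions; the filter still runs on each of them."""
    scanned = partitions_to_scan(metadata, threshold)
    total = sum(1 for n in scanned for r in parts[n] if r["temp"] > threshold)
    return scanned, total


metadata = build_min_max(partitions, "temp")
scanned, hot_count = count_hot(partitions, metadata, 100)
# Reference result from a full scan, to check that skipping does not change the answer.
full_scan = sum(1 for recs in partitions.values() for r in recs if r["temp"] > 100)
print(scanned, hot_count, full_scan)
```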
  • Yet another technique used to improve cache efficiency is intermediate data caching where the compute engine stores the result of an operation. Then, if the same or a similar job that includes many of the same functions is executed, the stored results can be reused.
  • the first job with the specified filter operation 510 (filter-in hot temperature records)
  • the respective results 520 of the filter operation 510 are stored in memory or storage 530.
  • the rest of the job is then executed, including mapping operations 540.
  • the job can skip the filter operation 510 and perform the initial data loading from storage 530.
  • the cached results 520 may be directly loaded without executing the filter operation 510, thus saving the processing of the filter operation 510 for repeated filter operations.
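  • the reuse of cached filter results can be sketched with a small in-memory store. The class name `IntermediateCache`, the key made of a source name plus an operation description, and the run counter are all assumptions for illustration, not the patented mechanism.

```python
class IntermediateCache:
    """Toy stand-in for memory or storage holding results of earlier operations."""

    def __init__(self):
        self._store = {}

    def get_or_compute(self, key, compute):
        """Return (result, was_cache_hit); run compute() only on a miss."""
        if key in self._store:
            return self._store[key], True
        result = compute()
        self._store[key] = result
        return result, False


stats = {"filter_runs": 0}


def filter_hot(records):
    """The filter operation: keep temperatures above 100 F and count executions."""
    stats["filter_runs"] += 1
    return [t for t in records if t > 100]


records = [105, 72, 101, 30]
cache = IntermediateCache()
# Hypothetical key: the input source plus a description of the operation applied to it.
key = ("temps.csv", "filter temperature > 100")

first, first_hit = cache.get_or_compute(key, lambda: filter_hot(records))
second, second_hit = cache.get_or_compute(key, lambda: filter_hot(records))
print(first, first_hit, second, second_hit, stats["filter_runs"])
```

  • the second job receives the stored results without executing the filter, so the filter runs once for both jobs.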
  • Spark cache offers another cache solution including a distributed multi-tiered cache, but it has a few limitations.
  • FIG. 6 summarizes the Spark cache technique.
  • the distributed multi-tiered cache 610 is not shared between users 620 and 630. Instead, the distributed compute engine 640 performs a filter operation 650 for user 620 and stores the filter operation 650 and filter results 660 (e.g., key value and contents) in the distributed multi-tiered cache 610, while the distributed compute engine 670 performs a filter operation 680 for user 630 and either loads the filter results at 682 into the distributed multi-tiered cache 690 or executes a mapping operation 684.
  • users 620 and 630 cannot benefit from each other’s cache contents, and each user process must be rerun from the beginning.
  • Alluxio cache offers yet another cache solution including a distributed multi-tiered shared cache.
  • the distributed multi-tiered shared cache itself does not provide semantic information about the content, so the content cannot be utilized unless the users have knowledge about its origins (initially created by them) and share such information. For example, one user would send a file location and provide information about the file contents to a second user through an external channel. FIG. 7 summarizes the Alluxio cache technique. As illustrated in FIG. 7, the distributed compute engine 710 performs a filter operation 720 for user 730 and stores the filtered data and a pointer to the filtered data 740 in a distributed multi-tiered shared cache 750.
  • the distributed compute engine 760 performs a filter operation 770 for user 780 and either loads the filter results at 772 into the distributed multi-tiered shared cache 750 or executes a mapping operation 774.
  • although both users 730 and 780 may access data stored in the distributed multi-tiered shared cache 750, the pointer to the filtered data 740 must be shared manually by the users 730 and 780 in order for the users 730 and 780 to benefit from each other’s cache contents.
  • the distributed semantic-aware cache described herein incorporates features from the above-mentioned cache techniques while also providing a different application programming interface (API) compared to the standard load/store API.
  • the distributed semantic-aware cache includes a semantic-aware API that exposes the cache contents to respective users whereby the respective users may automatically use knowledge about the cached content to optimize incoming jobs.
  • a “semantic cache” thus includes a data interface layer above the multi-tiered cache that allows for improved query execution using metadata.
  • the distributed semantic-aware cache provides information to the user about the cache contents via the API so that the content may be efficiently searched and/or reproduced, as necessary.
  • the semantic-aware API exposes operations including: store -> (repartition!
  • the semantic-aware API thus replaces traditional cache store operations with corresponding semantic-aware commands that convey semantic information and implement techniques including at least one of the three techniques described above: adaptive partitioning, data skipping metadata, and intermediate data.
  • the application user 810 sends a store command via the distributed compute engine 820 to a stateless semantic cache client plugin module 830 that transforms a DAG plan from a format of the native distributed compute engine 820 to a semantic cache format of distributed semantic-aware cache 840 (e.g., in case an intermediateData command is used by the distributed semantic-aware cache 840).
  • the semantic cache client plugin module 830 transforms a DAG plan of the native distributed compute engine 820 (e.g., Spark) that describes particular jobs into the DAG plan of the distributed semantic aware cache 840, which may have a different data structure (“DAG”) for describing jobs.
  • the distributed semantic-aware cache 840 includes a semantic cache manager 850 that is registered with the semantic cache client plugin module 830 and includes a translator module 852 and an optimizer module 854 that is responsible for transforming the DAG plan to benefit from the content of the distributed semantic-aware cache 840 (see description of FIG. 13 below).
  • the semantic cache manager 850 produces a DAG plan that the distributed compute engine 820 can execute.
  • the translator module 852 is responsible for transforming the store command from the user 810 into the DAG plan (e.g., repartition) that the distributed compute engine 820 can execute.
  • the distributed compute engine 820 executes the resulting DAG and stores the results to a baseline distributed multi-tiered shared cache 842.
  • the optimizer module 854 may transform the original DAG into an optimized DAG that exploits potential helpful content from the distributed semantic-aware cache 840 that works on top of the distributed multi-tiered shared cache 842 to improve the loading process to distributed compute engine 860 of another user 870 via another semantic cache client plugin module 880 that transforms the optimized DAG plan from a format of the distributed semantic-aware cache 840 into a format of the native distributed compute engine 860. More details about the loading process will be described below with respect to FIG. 10 and FIG. 11.
  • FIG. 9 is a flow chart including steps 910, 920, 930, 940, and 950 illustrating the store protocol 900 of the distributed semantic-aware cache of FIG. 8.
  • the distributed compute engine 820 propagates the user-issued store command from the user 810 to the semantic cache client plugin module 830.
  • the semantic cache client plugin module 830 transforms the DAG plan from a native distributed compute engine format into a semantic cache format. Then, the command is propagated to the translator module 852 of the semantic cache manager 850.
  • the semantic cache translator module 852 translates the store request into a DAG plan (“translated DAG plan”) explaining how the store request is to be executed by the distributed compute engine 820.
  • the DAG plan includes store commands from the distributed multi-tiered shared cache 842 that are sent back to the semantic cache client plugin module 830. Simple examples are shown in FIG. 12.
  • the semantic cache client plugin module 830 performs the reverse transformation (from step 920) of the DAG plan from the format of the semantic cache manager 850 into the format of the native distributed compute engine 820.
  • the distributed compute engine 820 executes the DAG plan along with a final store command from the distributed semantic-aware cache 840 and stores the results in the distributed multi-tiered shared cache 842.
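  • the store protocol above can be sketched end to end as follows. Everything concrete here is an assumption for illustration: the tuple-based engine plan format, the dict-based semantic cache format, the operation names, and the restriction to an intermediateData command. The step numbers in the comments follow the order in which the steps are described above.

```python
def to_cache_format(engine_plan):
    """Plugin: engine format (tuples) -> semantic cache format (dicts)."""
    return [{"op": op, "arg": arg} for op, arg in engine_plan]


def to_engine_format(cache_plan):
    """Plugin: reverse transformation back into the engine format."""
    return [(node["op"], node["arg"]) for node in cache_plan]


def translate(command, cache_plan):
    """Translator: build a translated DAG plan that ends with a final store command."""
    if command["kind"] != "intermediateData":
        raise ValueError("only the intermediateData command is sketched here")
    final_store = {"op": "store", "arg": (command["path"], command["tier"])}
    return cache_plan + [final_store]


def execute(engine_plan, storage, shared_cache):
    """Compute engine: run the plan node by node; 'store' writes to the shared cache."""
    data = None
    for op, arg in engine_plan:
        if op == "load":
            data = list(storage[arg])
        elif op == "filter_gt":
            data = [x for x in data if x > arg]
        elif op == "store":
            shared_cache[arg] = data
    return data


def store_protocol(command, engine_plan, storage, shared_cache):
    # Step 910 corresponds to the engine handing the command to the plugin (this call).
    cache_plan = to_cache_format(engine_plan)        # step 920
    translated = translate(command, cache_plan)      # step 930
    runnable = to_engine_format(translated)          # step 940
    return execute(runnable, storage, shared_cache)  # step 950


storage = {"temps.csv": [105, 72, 101, 30]}
shared_cache = {}
command = {"kind": "intermediateData", "path": "/cache/hot", "tier": "MEM"}
user_plan = [("load", "temps.csv"), ("filter_gt", 100)]
result = store_protocol(command, user_plan, storage, shared_cache)
print(result, shared_cache)
```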
  • the distributed semantic-aware cache 840 may also replace traditional load semantics with an optimization command from the API of the distributed semantic-aware cache 840.
  • the load command (as opposed to the store commands) should not be triggered by the application user, but the load should be perceived as an optimization of the original DAG plan provided by the user 810.
  • FIG. 11 is a flow chart including steps 1110, 1120, 1130, 1140, and 1150 illustrating the load protocol 1100 of the distributed semantic-aware cache of FIG. 8.
  • FIG. 10 illustrates loading data from the distributed semantic-aware cache 840 for use by a second user 870 in a sample embodiment.
  • the distributed compute engine 860 of the second user 870 attempts to load data from the distributed semantic-aware cache 840
  • the initial DAG provided by user 810 may be optimized (e.g., use the same filter operation to eliminate the filter operation and initial data load by loading from cache) and used by the second user 870 to generate a comprehensive execution plan.
  • the distributed compute engine 860 asks the distributed semantic-aware cache 840 through the corresponding semantic cache client plugin module 880 if the DAG can be further optimized using the cache content.
  • the optimizer module 854 transforms the original DAG into an optimized DAG that exploits potential helpful content from the distributed semantic-aware cache 840.
  • Potential load commands from the distributed multi-tiered shared cache 842 are included and executed by the distributed compute engine 860 when the distributed compute engine 860 executes the optimized DAG plan (after it applies other potential optimizations on its own) by potentially loading useful data from the baseline distributed multi-tiered shared cache 842.
  • the initial DAG plan may be used instead of creating an optimized DAG plan.
  • the distributed compute engine 860 sends a semantic cache load-command (DAG) to the semantic cache client plugin module 880 whenever a new job is executed.
  • the semantic cache client plugin module 880 transforms the DAG plan from the format of the native distributed compute engine 860 into the format of the distributed semantic-aware cache 840. Then, the load command is propagated to the optimizer module 854 of the distributed semantic-aware cache 840.
  • the optimizer module 854 of the distributed semantic-aware cache 840 optimizes the DAG plan received from the distributed compute engine 860 by transforming it and returning the optimized DAG plan to the semantic cache client plugin module 880 for execution.
  • the optimized DAG plan may be modified to contain original load commands from the distributed multi-tiered shared cache 842.
  • the semantic cache client plugin module 880 does the reverse transformation (from step 1120) of the optimized DAG plan from the format of the semantic cache manager 850 into the format of the distributed compute engine 860.
  • the distributed compute engine 860 executes the optimized DAG plan along with potential load commands from the distributed semantic-aware cache 840 and loads the results into the distributed multi-tiered shared cache 842.
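  • the load protocol can be sketched in the same style. The plan formats, the `catalog` that maps a cached plan prefix to a cache location, and the longest-prefix matching rule are assumptions for illustration; the optimizer described in the source may use other strategies.

```python
def to_cache_format(engine_plan):
    """Plugin: engine format (tuples) -> semantic cache format (dicts)."""
    return [{"op": op, "arg": arg} for op, arg in engine_plan]


def to_engine_format(cache_plan):
    """Plugin: reverse transformation back into the engine format."""
    return [(node["op"], node["arg"]) for node in cache_plan]


def optimize(cache_plan, catalog):
    """Optimizer: replace the longest cached prefix of the plan with one cache load."""
    for end in range(len(cache_plan), 0, -1):
        key = tuple((n["op"], n["arg"]) for n in cache_plan[:end])
        if key in catalog:
            return [{"op": "cache_load", "arg": catalog[key]}] + cache_plan[end:]
    return cache_plan  # nothing useful cached: keep the initial plan


def execute(engine_plan, storage, shared_cache, stats):
    """Compute engine: run the plan and count loads from the original storage."""
    data = None
    for op, arg in engine_plan:
        if op == "load":
            stats["storage_loads"] += 1
            data = list(storage[arg])
        elif op == "cache_load":
            data = list(shared_cache[arg])
        elif op == "filter_gt":
            data = [x for x in data if x > arg]
        elif op == "count":
            data = len(data)
    return data


def load_protocol(engine_plan, catalog, storage, shared_cache, stats):
    # Step 1110 corresponds to the engine sending the plan to the plugin (this call).
    cache_plan = to_cache_format(engine_plan)   # step 1120
    optimized = optimize(cache_plan, catalog)   # step 1130
    runnable = to_engine_format(optimized)      # step 1140
    return runnable, execute(runnable, storage, shared_cache, stats)  # step 1150


storage = {"temps.csv": [105, 72, 101, 30]}
shared_cache = {("/cache/hot", "MEM"): [105, 101]}
catalog = {(("load", "temps.csv"), ("filter_gt", 100)): ("/cache/hot", "MEM")}
plan = [("load", "temps.csv"), ("filter_gt", 100), ("count", None)]

stats = {"storage_loads": 0}
optimized_plan, answer = load_protocol(plan, catalog, storage, shared_cache, stats)
print(optimized_plan, answer, stats)
```

  • when the catalog has no matching entry, `optimize` returns the plan unchanged and the job runs against the original storage, which mirrors the fallback to the initial DAG plan.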
  • the evict command is very simple and is similar to the store commands. For the evict command, an application user specifies the output path and the memory tier used to store the corresponding content.
  • the distributed compute engine 820 sends this information to the semantic cache manager 850 via the semantic cache client plugin module 830.
  • the semantic cache manager 850 sends a corresponding deletion command to the distributed multi-tiered shared cache 842 and destroys the internally maintained semantic information about the content.
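A minimal sketch of the evict handling described above, assuming the shared cache and the semantic information can each be modeled as a dictionary keyed by (tier, path). The class name `SemanticCacheManager` and its fields are hypothetical stand-ins, not the disclosed interfaces.

```python
class SemanticCacheManager:
    """Hypothetical stand-in for the semantic cache manager."""

    def __init__(self):
        self.shared_cache = {}    # (tier, path) -> cached content
        self.semantic_info = {}   # (tier, path) -> semantic metadata about the content

    def store(self, path, tier, content, info):
        self.shared_cache[(tier, path)] = content
        self.semantic_info[(tier, path)] = info

    def evict(self, path, tier):
        key = (tier, path)
        # Deletion command to the shared cache.
        self.shared_cache.pop(key, None)
        # Destroy the internally maintained semantic information.
        self.semantic_info.pop(key, None)


manager = SemanticCacheManager()
manager.store("/cache/hot", "memory", ["r1", "r2"], {"kind": "intermediate"})
manager.evict("/cache/hot", "memory")
```

Removing both entries together keeps the optimizer from planning a load against content that no longer exists.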
  • the translator module 852 accepts incoming application store commands via the distributed compute engine 820 and transforms the commands into a DAG plan that adheres to the command's semantics and that can be executed in the distributed compute engine. For a command specifying the input DAG, the output path, and the memory tier, the translator module 852 instructs the distributed compute engine 820 to run the input DAG plan and then store its results again to the specified location and tier of the distributed multi-tiered shared cache 842 at 1280.
  • the cached data may be used while skipping the initial loading of data.
  • the optimizer module 854 is responsible for transforming the DAG plan to benefit from the content of the distributed semantic-aware cache 840.
  • a sample implementation of the optimizer module 854 is shown in FIG. 13.
  • a DAG may be represented as a collection of interleaving (or noninterleaving) directed trees where the root is a node with no incoming edges. In a DAG, it is guaranteed that there is at least one node with no incoming edge. Thus, each tree may be split and analyzed separately (as a different sub-job). In the example of FIG. 13, the map is the root node and the load is the leaf node for optimization purposes. In FIG.
  • the arrows have opposite direction to determine the order of execution
  • the optimizer module 854 looks at the current node in the tree at 1304 and 1306 to see if the current node is a shuffle node participating in a shuffle operation or a filter node participating in a filter operation.
  • some operations like map, filter, and many more cause narrow dependencies between tasks (one-to-one communication patterns).
  • these operations are bundled together and executed by the same process.
  • this exchange of data is denoted as a Shuffle Phase that is implemented at one or more shuffle nodes.
  • the optimizer module 854 further checks if the current node can be replaced with intermediate data content at 1312 from the baseline distributed multi-tiered shared cache 842. A recursive call is executed at 1314, 1316, and 1318 to optimize the DAG for each one of the children of the current node.
  • the optimizer module 854 replaces the current node with a load from the baseline distributed multi-tiered shared cache 842 at 1324.
  • the optimizer module 854 looks for repartitions and data-skipping metadata at 1326.
  • the optimizer module 854 replaces the leaf node (datasource) with the new repartition from the baseline distributed multi-tiered shared cache 842 at 1332.
  • the optimizer module 854 replaces the leaf node (datasource) with a new load that skips partition reading according to the data-skipping metadata from the baseline distributed multi-tiered shared cache 842 at 1334.
  • the optimizer module 854 returns the modified optimized tree at 1336.
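The recursive optimization described above can be sketched as follows. This is a simplified illustration and does not reproduce the exact control flow of FIG. 13: the `Node` and `CacheCatalog` classes, the string key used to match sub-plans, and the `cache_load` and `load_skipping` operation names are all assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Node:
    op: str
    arg: str = ""
    children: List["Node"] = field(default_factory=list)

    def key(self) -> str:
        # Canonical string identifying this sub-plan (hypothetical matching scheme).
        inner = ",".join(c.key() for c in self.children)
        return f"{self.op}({self.arg})[{inner}]"


@dataclass
class CacheCatalog:
    """Hypothetical view of what the shared cache currently holds."""
    intermediate: Dict[str, str] = field(default_factory=dict)  # sub-plan key -> cache path
    repartitions: Dict[str, str] = field(default_factory=dict)  # datasource -> repartition path
    skipping: Dict[str, str] = field(default_factory=dict)      # datasource -> metadata path


def optimize(node: Node, catalog: CacheCatalog) -> Node:
    # Replace the whole sub-plan with a load if its result is cached as intermediate data.
    cached_path = catalog.intermediate.get(node.key())
    if cached_path is not None:
        return Node("cache_load", cached_path)
    # Leaf (datasource): look for a repartition, then for data-skipping metadata.
    if not node.children:
        if node.op == "load" and node.arg in catalog.repartitions:
            return Node("cache_load", catalog.repartitions[node.arg])
        if node.op == "load" and node.arg in catalog.skipping:
            return Node("load_skipping", f"{node.arg}::{catalog.skipping[node.arg]}")
        return node
    # Otherwise recurse into each child and return the modified tree.
    return Node(node.op, node.arg, [optimize(c, catalog) for c in node.children])


plan = Node("map", "f", [Node("filter", "hot", [Node("load", "data/*.csv")])])
catalog = CacheCatalog(intermediate={plan.children[0].key(): "/cache/hot"})
optimized = optimize(plan, catalog)
```

In this sketch the intermediate-data check runs before the recursion, so a cached sub-plan prunes its entire branch and the datasource below it is never visited.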
  • a customized eviction module can be used to enable a user to orchestrate a delete operation.
  • the user may decide what data to automatically delete (evict) when the cache is full so that users may directly delete data from the distributed semantic-aware cache 840.
  • an automatic procedure may be provided that relieves users from managing this aspect of caching, but it might make sub-optimal decisions as a result.
  • sample embodiments of the distributed semantic-aware cache 840 include multiple memory tiers.
  • the distributed semantic-aware cache 840 may be a two-tiered cache including memory (e.g., DRAM) and disk (e.g., HDD or SSD).
  • the distributed semantic-aware cache 840 also can be more sophisticated and complex.
  • the distributed semantic-aware cache 840 may include memory in a compute side, disk in a compute side, memory in a storage side, and disk in a storage side that are selectable by the user. This allows users to have more flexibility to fine-tune their decisions, but it makes the orchestration of the data more complex and difficult.
  • the distributed semantic-aware cache described herein has numerous benefits over conventional caches.
  • the distributed semantic-aware cache described herein loads substantially less data (saving storage I/O processing), transfers substantially less data between the storage and compute side (saving network I/O processing), and utilizes less processor and memory (e.g., DRAM) resources for implementing the distributed compute engine, which leads to less overall cost of the architecture.
  • the execution time is also faster.
  • the distributed semantic-aware cache is shared, has an improved API, and is semantic-aware.
  • FIG. 14 illustrates a general-purpose computer 1400 suitable for implementing one or more embodiments of the methods disclosed herein.
  • the computer 1400 may be implemented on a data analytics framework (e.g., Spark™, Hadoop™, TensorFlow™, and Dryad™), and the components described above may be implemented on any general-purpose network component, such as a computer 1400 with sufficient processing power, memory resources, and network throughput capability to handle the workload placed upon it.
  • the computer 1400 includes a processor 1410 (which may be referred to as a central processor unit or CPU) that is in communication with memory devices including secondary storage 1420, read only memory (ROM) 1430, random access memory (RAM) 1440, I/O devices 1450, and network connectivity devices 1460.
  • the network connectivity devices 1460 further connect the processor 1410 to a client side data analytics driver module 1470 that manages the data analytics process that uses the distributed semantic-aware cache described herein.
  • the processor 1410 may be implemented as one or more CPU chips, or may be part of one or more ASICs.
  • the secondary storage 1420 is typically comprised of one or more disk drives or tape drives and is used for non-volatile storage of data and as an over-flow data storage device if RAM 1440 is not large enough to hold all working data (e.g., the data received from the map tasks). Secondary storage 1420 may be used to store programs that are loaded into RAM 1440 when such programs are selected for execution.
  • the ROM 1430 is used to store instructions and perhaps data that are read during program execution.
  • ROM 1430 may be a non-volatile memory device that typically has a small memory capacity relative to the larger memory capacity of secondary storage 1420.
  • the RAM 1440 is used to store volatile data and perhaps to store instructions. Access to both ROM 1430 and RAM 1440 is typically faster than to secondary storage 1420.
  • computer 1400 may execute instructions from computer-readable non-transitory media storing computer-readable instructions and one or more processors coupled to the memory, and when executing the computer-readable instructions, the computer 1400 is configured to perform method steps and operations described in the disclosure with reference to FIG. 3 to FIG. 13.
  • the computer-readable non-transitory media includes all types of computer-readable media, including magnetic storage media, optical storage media, flash media and solid state storage media.
  • software including one or more computer-executable instructions that facilitate processing and operations as described above with reference to any one or all of steps of the disclosure may be installed in and sold with one or more servers or databases.
  • the software may be obtained and loaded into one or more servers or one or more databases in a manner consistent with the disclosure, including obtaining the software through physical medium or distribution system, including, for example, from a server owned by the software creator or from a server not owned but used by the software creator.
  • the software may be stored on a server for distribution over the Internet, for example.
  • the components of the illustrative devices, systems and methods employed in accordance with the illustrated embodiments may be implemented, at least in part, in digital electronic circuitry, analog electronic circuitry, or in computer hardware, firmware, software, or in combinations of them. These components also may be implemented, for example, as a computer program product such as a computer program, program code or computer instructions tangibly embodied in an information carrier, or in a machine-readable storage device, for execution by, or to control the operation of, data processing apparatus such as a programmable processor, a computer, or multiple computers.
  • a computer program may be written in any form of programming language, including compiled or interpreted languages, and it may be deployed in any form, including as a stand-alone program or as a module, component, subroutine, or other unit suitable for use in a computing environment.
  • a computer program may be deployed to be executed on one computer or on multiple computers at one site or distributed across multiple sites and interconnected by a communication network.
  • functional programs, codes, and code segments for accomplishing the systems and methods described herein may be easily construed as within the scope of the disclosure by programmers skilled in the art to which the present disclosure pertains.
  • Method steps associated with the illustrative embodiments may be performed by one or more programmable processors executing a computer program, code or instructions to perform functions (e.g., by operating on input data and generating an output). Method steps may also be performed by, and apparatus may be implemented as, special purpose logic circuitry, e.g., an FPGA (field programmable gate array) or an ASIC, for example.
  • FPGA field programmable gate array
  • DSP digital signal processor
  • a general purpose processor may be a microprocessor, but in the alternative, the processor may be any conventional processor, controller, microcontroller, or state machine.
  • a processor may also be implemented as a combination of computing devices, e.g., a combination of a DSP and a microprocessor, a plurality of microprocessors, one or more microprocessors in conjunction with a DSP core, or any other such configuration.
  • processors suitable for the execution of a computer program include, by way of example, both general and special purpose microprocessors, and any one or more processors of any kind of digital computer.
  • a processor will receive instructions and data from a read-only memory or a random access memory or both.
  • the elements of a computer are a processor for executing instructions and one or more memory devices for storing instructions and data.
  • a computer will also include, or be operatively coupled to receive data from or transfer data to, or both, one or more mass storage devices for storing data, e.g., magnetic, magneto-optical disks, or optical disks.
  • Information carriers suitable for embodying computer program instructions and data include all forms of non-volatile memory, including by way of example, semiconductor memory devices, e.g., electrically programmable read-only memory or ROM (EPROM), electrically erasable programmable ROM (EEPROM), flash memory devices, and data storage disks e.g., magnetic disks, internal hard disks, or removable disks, magneto-optical disks, compact disc ROM (CD-ROM), or digital versatile disc ROM (DVD-ROM).
  • EPROM electrically programmable read-only memory
  • EEPROM electrically erasable programmable ROM
  • the processor and the memory may be supplemented by,
  • a software module may reside in random access memory (RAM), flash memory, ROM, EPROM, EEPROM, registers, hard disk, a removable disk, a CD-ROM, or any other form of storage medium known in the art.
  • RAM random access memory
  • ROM read-only memory
  • EPROM erasable programmable read-only memory
  • EEPROM electrically erasable programmable read-only memory
  • the storage medium may be integral to the processor.
  • the processor and the storage medium may reside in an integrated circuit or be implemented as discrete components.
  • machine-readable medium means a device able to store instructions and data temporarily or permanently and may include, but is not limited to, random-access memory (RAM), read-only memory (ROM), buffer memory, flash memory, optical media, magnetic media, cache memory, other types of storage (e.g., EEPROM), and any suitable combination thereof.
  • RAM random-access memory
  • ROM read-only memory
  • EEPROM electrically erasable programmable read-only memory
  • machine-readable medium shall also be taken to include any medium, or combination of multiple media, that is capable of storing instructions for execution by one or more processors, such that the instructions, when executed by one or more processors, cause the one or more processors to perform any one or more of the methodologies described herein. Accordingly, a “machine-readable medium” refers to a single storage apparatus or device, as well as “cloud-based” storage systems or storage networks that include multiple storage apparatus or devices. The term “machine-readable medium” as used herein excludes signals per se.

Landscapes

  • Engineering & Computer Science (AREA)
  • Theoretical Computer Science (AREA)
  • Human Computer Interaction (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • Information Retrieval, Db Structures And Fs Structures Therefor (AREA)

Abstract

A data analytics system implements a data analytics process on a distributed data analytics platform that shares cached data amongst users. The system receives a semantic-aware store command from a user, translates the DAG plan for executing the semantic-aware store command from the user into a translated DAG plan to be executed by a distributed multi-tiered shared cache, and executes the translated DAG plan including at least a final store command. The results of execution of the translated DAG plan are stored to the distributed multi-tiered shared cache. The execution of the translated DAG plan may be optimized by reviewing cached intermediate data to determine whether a branch of the DAG plan can be pruned by loading data directly, whether a repartition of the data can eliminate partitions or shuffle phases, or whether an ensuing operation can eliminate partitions using data-skipping indices.

Description

DISTRIBUTED SEMANTIC-AWARE CACHE
TECHNICAL FIELD
[0001] This application is related to large-scale data processing systems and, in particular, to a distributed semantic-aware cache that replaces traditional cache store operations with corresponding semantic-aware commands that enable cache content to be utilized by multiple users in a distributed data processing system.
BACKGROUND
[0002] Distributed compute engines typically accept jobs from users in the form of Directed Acyclic Graphs (DAGs). A sample distributed compute engine 100 is illustrated in FIG. 1. As illustrated in FIG. 1, a driver module 110 is responsible for dividing the jobs into multiple smaller tasks and assigning the tasks to worker modules 120 for execution. In operation, at (1), a user 130 sends a job request to the driver module 110 in the form of a DAG. At (2), the driver module 110 creates an optimized execution DAG from the input DAG and divides it into multiple tasks where each task can be executed in a single worker module 120. The tasks are sent to the worker modules 120 for execution in a specific order. At (3), the worker modules 120 load the initial data set from storage 140 in response to a load command, process the initial data set (e.g., use a filter to extract records, count records, etc.), and map and store the output back to storage 140, if necessary. At (4), the worker modules 120 send an acknowledgement or a result related to the execution of the corresponding task back to the driver module 110. At (5), the driver module 110 sends the acknowledgement or the result related to the whole execution of the job back to the user 130 when all tasks are finished.
[0003] Data analysis jobs typically utilize large data sets with millions or even billions of records. However, the results may depend on a very small fraction of the original records. For example, a dataset formed of records taken at a frequent rate from sensors deployed in houses over a large area can be extremely large. However, a particular job’s result might be impacted only by a small portion of these records. A typical example of such a job would be the following: load("data/*.csv").filter(x => x.temp == 'hot').map(...)...
As illustrated in FIG. 1, this job loads data at 150 from a group of comma separated values (CSV) files. Then, this job filters out all records at 160 that do not have a hot temperature (>100 F). Only records in areas with a warm climate during warm periods would affect the final count taken at 170. A map transformation may be applied at 180 to facilitate storage of the data at 190.
[0004] Distributed compute engines 100 for large data workloads (e.g., Spark) adopt a naive approach for tackling workloads like the one mentioned above. The user 130 provides a job in the form of a DAG (like the simple example above, load → filter → ...). Then, the distributed compute engine 100 creates an optimized execution plan from the user-provided DAG based on the available resources and workload characteristics. Using the above example again, a DAG execution plan (“DAG plan”) like the one shown in FIG. 2 is constructed where it is assumed that the distributed compute engine 100 divides the dataset into four different partitions 200 (P1, P2, P3, P4). The DAG plan explains how the DAG is to be executed by the distributed compute engine. A portion of the data inside the partitions 200 has records with hot temperatures 210, and the rest of the data belongs to different temperature ranges (warm 220, cool 230, and cold 240). For each partition 200, a process loads the corresponding data, filters in only the hot temperature records 210 at 250, and continues the ensuing operations, including performing map transformations 260 that facilitate data storage. Although only a small portion of the original data (i.e., hot temperature records 210) affects the result, this strategy leads to:
Storage input/output (I/O) overhead analogous to the size of the initial dataset;
Network I/O overhead between the compute side (where the data is processed) and the storage side (where the data is stored, typically in hard disk drives (HDDs) and/or solid state drives (SSDs)) analogous to the size of the original dataset;
Usage of computation and memory resources (e.g., DRAM) for the compute engine cluster analogous to the size of the initial dataset; and
Large execution times.
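The cost of this baseline strategy can be illustrated with a small sketch. It is an assumption-laden toy, not the behavior of any particular engine: the partition contents, the `run_baseline` function, and the record layout are hypothetical, and the partitions are processed in a simple loop rather than in parallel.

```python
def run_baseline(partitions, transform):
    """Load every partition, keep only 'hot' records, then apply the map step."""
    records_loaded = 0
    results = []
    for records in partitions.values():
        records_loaded += len(records)  # every record is read from storage
        hot = [r for r in records if r["temp"] == "hot"]
        results.extend(transform(r) for r in hot)
    return results, records_loaded


# Four hypothetical partitions in which only a few records are 'hot'.
partitions = {
    "P1": [{"id": 1, "temp": "hot"}, {"id": 2, "temp": "cold"}, {"id": 3, "temp": "cool"}],
    "P2": [{"id": 4, "temp": "warm"}, {"id": 5, "temp": "cold"}],
    "P3": [{"id": 6, "temp": "cool"}, {"id": 7, "temp": "hot"}],
    "P4": [{"id": 8, "temp": "warm"}, {"id": 9, "temp": "cold"}],
}
results, records_loaded = run_baseline(partitions, lambda r: r["id"])
```

Here all nine records are loaded although only two contribute to the result, which is the storage and network overhead listed above.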
[0005] A few different techniques addressing this issue have been proposed in the prior art. Such prior art techniques include individual features such as sharing cache data between users, a distributed multi-tiered cache, intermediate data, and platform agnostic designs.
SUMMARY
[0006] Various examples are now described to introduce a selection of concepts in a simplified form that are further described below in the Detailed Description. The Summary is not intended to be used to limit the scope of the claimed subject matter.
[0007] Prior art cache techniques do not incorporate processing efficiencies such as semantic awareness, adaptive partitioning, and data skipping metadata. The distributed semantic-aware cache design described herein includes such processing efficiencies and may further be: Shared among users (i.e., content is accessible by many users); Distributed (utilizes cache space from multiple servers); Multi-Tiered (uses multiple tiers (e.g., memory and disk) to store data); Semantic-Aware (includes metadata about the content and automatically uses it to improve performance); Multi-Modal (provides interfaces for all three types of interactions: repartitioning, data-skipping metadata, and intermediate data caching); and Platform-Agnostic (does not depend on a specific platform).
[0008] In sample embodiments, the distributed semantic-aware cache works on top of baseline distributed multi-tiered shared caches with simple load/store application programming interfaces (APIs). The distributed semantic-aware cache exposes a new semantic-aware API that allows the distributed semantic-aware cache to automatically use the content to optimize incoming jobs. A translator module ensures that application users do not have to orchestrate complex and sophisticated computation operations for relatively simple to understand functions (like repartitioning data, creating data skipping metadata, and caching intermediate results) on their own. The distributed semantic-aware cache also is able to efficiently share content among users since it maintains concrete knowledge of the cached content and can decide how and when to reuse it, if necessary. In addition, an optimizer module ensures that the plan is fully optimized to take advantage of the content of the distributed semantic-aware cache.
[0009] According to a first aspect of the present disclosure, there is provided a method of performing a data analytics process on a distributed data analytics platform that shares cached data amongst users. The method includes receiving a semantic-aware store command from a user, translating a directed acyclic graph (DAG) plan for executing the semantic-aware store command from the user into a translated DAG plan to be executed by a distributed shared cache, and executing the translated DAG plan including at least a final store command and storing results of execution of the translated DAG plan to the distributed shared cache.
[0010] According to a second aspect of the present disclosure, there is provided a distributed semantic-aware cache comprising a distributed shared cache that stores data that may be shared by at least two users, and a semantic-aware cache manager. The semantic aware cache manager comprises a translator module that translates a directed acyclic graph (DAG) plan for executing a semantic-aware store command from a user into a translated DAG plan to be executed by the distributed shared cache. The distributed shared cache receives and stores results of execution of the translated DAG plan and a final store command.
[0011] According to a third aspect of the present disclosure, there is provided a distributed semantic-aware cache system comprising a distributed shared cache that stores data that may be shared by at least two users, a semantic-aware cache manager, and a distributed compute engine. The semantic aware cache manager comprises a translator module that translates a directed acyclic graph (DAG) plan for executing a semantic-aware store command from a user into a translated DAG plan to be executed by the distributed shared cache. The distributed compute engine executes the translated DAG plan and stores results of execution of the translated DAG plan and a final store command in the distributed shared cache.
[0012] In a first implementation of any of the preceding aspects, the DAG plan is transformed into a semantic cache format prior to translating of the DAG plan, and the translated DAG plan is transformed into a format of a distributed compute engine that executes the translated DAG plan.
[0013] In a second implementation of any of the preceding aspects, the semantic-aware store commands from the distributed shared cache are based on knowledge about data stored in the distributed shared cache, and execution of the translated DAG plan is optimized using the knowledge about data stored in the distributed shared cache.
[0014] In a third implementation of any of the preceding aspects, a load command is executed that optimizes the DAG plan from the user by using data stored in the distributed shared cache to update the translated DAG plan with load commands from the distributed shared cache to form an optimized DAG plan.
[0015] In a fourth implementation of any of the preceding aspects, a semantic-aware cache load command including the DAG plan is sent to the distributed shared cache whenever a semantic-aware store command is executed. After forming the optimized DAG plan, the optimized DAG plan is returned for execution, and the optimized DAG plan including the load commands is executed by a distributed compute engine. Results of execution of the optimized DAG plan are stored to the distributed shared cache.
[0016] In a fifth implementation of any of the preceding aspects, the optimized DAG plan is transformed into a semantic cache format prior to translating, and the optimized DAG plan is transformed into a format of a distributed compute engine that executes the optimized DAG plan.
[0017] In a sixth implementation of any of the preceding aspects, the translated DAG plan is updated by reviewing cached intermediate data to determine at least one of (1) whether there is a branch of the DAG plan that can be pruned by loading data directly from the distributed shared cache, (2) whether there is a repartition of the data in the distributed shared cache that can eliminate partitions or shuffle phases, or (3) whether, based on data-skipping metadata, there is an ensuing operation that can eliminate partitions.
[0018] In a seventh implementation of any of the preceding aspects, the distributed shared cache comprises multiple tiers, and an evict command is received from the user specifying an output path and tier of the distributed shared cache used to store corresponding content. In response, a deletion command corresponding to the evict command is sent to the distributed shared cache, and semantic information in the distributed shared cache about the corresponding content is destroyed.
[0019] In an eighth implementation of any of the preceding aspects, the DAG plan is translated by adhering to semantics of the semantic-aware store command when transforming the semantic-aware store command into the translated DAG plan.
[0020] In a ninth implementation of any of the preceding aspects, the received semantic-aware store command comprises at least one of an adaptive partitioning command in which data is dynamically structured based on workload characteristics, a data skipping metadata command in which a data partition is pruned when eliminating the data partition does not affect the results of execution of the translated DAG plan, or an intermediate data command in which data is reused for another operation.
[0021] In a tenth implementation of any of the preceding aspects, the distributed shared cache comprises multiple tiers, and when the received semantic-aware store command comprises the adaptive partitioning command, Error! Reference source not found.
[0022] In an eleventh implementation of any of the preceding aspects, the distributed shared cache comprises multiple tiers, and when the received semantic-aware store command comprises the data skipping metadata command, the translated DAG plan is constructed that, when executed, takes input data, creates relevant metadata for an attribute specified in the data skipping metadata command, and stores the relevant metadata in a location and memory tier specified in the data skipping metadata command.
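As a rough illustration of the data skipping metadata command, the sketch below builds per-partition metadata for one attribute and stores it at a given location and tier. The choice of minimum/maximum values as the metadata is an assumption made for the example (the disclosure only refers to "relevant metadata"), and the function names, the numeric `temp` attribute, and the dictionary-based cache are hypothetical.

```python
from typing import Dict, List, Tuple

Record = Dict[str, float]


def build_skipping_metadata(
    partitions: Dict[str, List[Record]], attribute: str
) -> Dict[str, Tuple[float, float]]:
    """Compute (min, max) of the attribute for each non-empty partition."""
    metadata = {}
    for name, records in partitions.items():
        values = [r[attribute] for r in records]
        if values:
            metadata[name] = (min(values), max(values))
    return metadata


def store_metadata(cache: dict, tier: str, location: str, metadata: dict) -> None:
    # Store the metadata at the location and memory tier named in the command.
    cache[(tier, location)] = metadata


def partitions_to_read(metadata: Dict[str, Tuple[float, float]], threshold: float) -> List[str]:
    """Keep only partitions that can contain records with attribute > threshold."""
    return [name for name, (_low, high) in metadata.items() if high > threshold]


partitions = {
    "P1": [{"temp": 50.0}, {"temp": 105.0}],
    "P2": [{"temp": 30.0}, {"temp": 60.0}],
}
cache = {}
meta = build_skipping_metadata(partitions, "temp")
store_metadata(cache, "memory", "/cache/meta/temp", meta)
needed = partitions_to_read(cache[("memory", "/cache/meta/temp")], 100.0)
```

A later job filtering for temperatures above 100 would then read only partition P1, since P2's maximum rules it out without loading it.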
[0023] In a twelfth implementation of any of the preceding aspects, the distributed shared cache comprises multiple tiers, and when the received semantic-aware store command comprises the intermediate data command and the DAG plan from the user, the DAG plan from the user is executed and results are again stored to a specified location and memory tier of the distributed shared cache.
[0024] The method may be performed and the instructions on the computer-readable media may be processed by the apparatus, and further features of the method and instructions on the computer-readable media result from the functionality of the apparatus. Also, the explanations provided for each aspect and its implementation apply equally to the other aspects and the corresponding implementations. The different embodiments may be implemented in hardware, software, or any combination thereof. Also, any one of the foregoing examples may be combined with any one or more of the other foregoing examples to create a new embodiment within the scope of the present disclosure.
BRIEF DESCRIPTION OF THE DRAWINGS
[0025] In the drawings, which are not necessarily drawn to scale, like numerals may describe similar components in different views. The drawings illustrate generally, by way of example, but not by way of limitation, various embodiments discussed in the present document.
[0026] FIG. 1 illustrates a sample distributed compute engine.
[0027] FIG. 2 illustrates a sample execution plan for the distributed compute engine of FIG. 1 where it is assumed that the distributed compute engine divides the dataset into four different partitions.
[0028] FIG. 3 illustrates an example of adaptive partitioning.
[0029] FIG. 4 illustrates an example of data-skipping.
[0030] FIG. 5 illustrates an example of intermediate data caching.
[0031] FIG. 6 illustrates a Spark distributed multi-tiered cache.
[0032] FIG. 7 illustrates an Alluxio distributed multi-tiered shared cache.
[0033] FIG. 8 illustrates a distributed semantic-aware cache in a sample embodiment.
[0034] FIG. 9 is a flow chart illustrating the store protocol of the distributed semantic-aware cache of FIG. 8.
[0035] FIG. 10 illustrates loading data from the distributed semantic-aware cache of FIG. 8 for use by a second user in a sample embodiment.
[0036] FIG. 11 is a flow chart illustrating the load protocol of the distributed semantic-aware cache of FIG. 8.
[0037] FIG. 12 illustrates a sample implementation of a translator module in a sample embodiment.
[0038] FIG. 13 illustrates a sample implementation of an optimizer module in a sample embodiment.
[0039] FIG. 14 is a block diagram illustrating circuitry for performing the methods according to sample embodiments.
DETAILED DESCRIPTION
[0040] It should be understood at the outset that although an illustrative implementation of one or more embodiments is provided below, the disclosed systems and/or methods described with respect to FIGS. 3-14 may be implemented using any number of techniques, whether currently known or in existence. The disclosure should in no way be limited to the illustrative implementations, drawings, and techniques illustrated below, including the exemplary designs and implementations illustrated and described herein, but may be modified within the scope of the appended claims along with their full scope of equivalents.
[0041] The functions or algorithms described herein may be implemented in software in one embodiment. The software may include computer-executable instructions stored on computer-readable media or computer-readable storage device such as one or more non-transitory memories or other type of hardwarebased storage devices, either local or networked. Further, such functions correspond to modules, which may be software, hardware, firmware or any combination thereof. Multiple functions may be performed in one or more modules as desired, and the embodiments described are merely examples. The software may be executed on a digital signal processor, application specific integrated circuit (ASIC), microprocessor, or other type of processor operating on a computer system, such as a personal computer, server or other computer system, turning such computer system into a specifically programmed machine.
[0042] Existing large data analytics platforms like Spark™, Hadoop™, TensorFlow™, and Dryad™ commonly use Directed Acyclic Graph (DAG) models to define user jobs and incorporate caches that implement individual features such as sharing cache data between users, a distributed multi-tiered cache, intermediate data, and platform-agnostic designs. The common theme amongst such systems is that they sacrifice some additional memory (DRAM, primary) or disk (HDD/SSD, secondary) space for better overall performance. As known by those skilled in the art, a cache is a memory device that stores data for faster access (e.g., an on-chip CPU cache that reduces main memory accesses, a memory chip in a hard drive that caches results to reduce disk accesses, etc.). A multi-tiered cache may include different types of data storage devices such as a dynamic random access memory (DRAM), hard disk drive (HDD), and solid state drive (SSD), including multiple physical memories of different sizes and speeds to reduce slower data accesses. A cache can be used to store this extra data since it does not need to be replicated or persisted for fault tolerance. If the contents are not found in the cache, the compute engines can revert to the baseline strategy, which is inefficient but correct.
[0043] One approach to improving cache efficiency is adaptive partitioning. Adaptive partitioning is the process in which data is dynamically restructured based on the workload characteristics. For example, if there are a lot of jobs that have a filter operation involving the temperature attribute value (like the one in the example of FIGS. 1-2), it might be beneficial to sort and split the data between partitions based on the temperature attribute value. Such an approach can be seen in FIG. 3, which shows resorting of the initial partitions 200 (P1, P2, P3, P4) into new partitions 310 (P1'), 320 (P2'), 330 (P3'), and 340 (P4') that correspond to distinct temperature ranges. These new partitions 310-340 can be stored using extra memory or storage space depending on the size and the hardware. When a job like the one in the example above is executed, the compute engine loads only the P1' partition 310, which is the only one that contains records 210 with hot temperature values, in a process known as partition-pruning. In the example, the filter operation 350 and map operation 360 are still executed on the compute side since the loaded P1' partition 310 can still contain records that are eventually filtered out. Processing is saved since filter operations 370 and map operations 380 do not need to be performed for partitions 320, 330, or 340. As shown in FIG. 3, a smaller portion of the data is loaded from storage and transferred through the network (one fourth in this example), the compute engine utilizes fewer tasks (one instead of four in the example) with fewer total compute and memory resources, and the execution in most cases is considerably faster. However, the extra overhead due to repartitioning the data is considerable in terms of extra space and computation.
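By way of illustration only, the adaptive partitioning and partition-pruning of FIG. 3 may be sketched as follows. The record layout (a city and a temperature), the range boundaries, the threshold of 30, and all function names are assumptions made for this sketch and are not part of the disclosure.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RangePartition:
    name: str
    low: float    # inclusive lower bound of the attribute range
    high: float   # exclusive upper bound of the attribute range
    records: tuple


def repartition_by_range(records, attribute, boundaries):
    """Sort records on `attribute` and split them into range partitions.

    Partitions are numbered from the hottest range down, so the first
    partition (P1') holds the highest values, as in FIG. 3.
    """
    ordered = sorted(records, key=lambda r: r[attribute])
    edges = [float("inf"), *sorted(boundaries, reverse=True), float("-inf")]
    partitions = []
    for i, (high, low) in enumerate(zip(edges, edges[1:]), start=1):
        rows = tuple(r for r in ordered if low <= r[attribute] < high)
        partitions.append(RangePartition(f"P{i}'", low, high, rows))
    return partitions


def prune(partitions, threshold):
    """Keep only partitions whose range can hold values >= threshold."""
    return [p for p in partitions if p.high > threshold]


def hot_cities(partitions, attribute, threshold):
    """Run the filter and map only on the partitions that survive pruning."""
    loaded = prune(partitions, threshold)
    cities = [
        r["city"]
        for p in loaded
        for r in p.records
        if r[attribute] >= threshold   # the filter still runs on loaded data
    ]
    return cities, [p.name for p in loaded]
```

In this sketch the repartitioning cost (a full sort and copy) is paid once, while every later job with a filter on the same attribute loads only the partitions whose range overlaps the predicate.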
[0044] Another technique used to improve cache efficiency is data-skipping. In data-skipping, the storage side (or the compute side) maintains information about secondary attribute values (not main partition attributes) per partition. Common data structures used for such secondary attribute values include:
1. Min/Max value of a numeric attribute for each partition;
2. Lists with all the values of a category attribute inside a single partition; and
3. Bloom filters for values of an attribute inside a single partition.
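By way of illustration only, the three data-skipping structures listed above may be sketched as follows. The class and function names, the Bloom filter sizing, and the use of SHA-256 for hashing are assumptions made for this sketch; the disclosure does not prescribe a particular implementation.

```python
import hashlib
from dataclasses import dataclass


class BloomFilter:
    """Tiny Bloom filter: may report false positives, never false negatives."""

    def __init__(self, size_bits=256, num_hashes=3):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, value):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value):
        return all((self.bits >> pos) & 1 for pos in self._positions(value))


@dataclass
class SkippingMetadata:
    min_value: float        # 1. min/max of a numeric attribute
    max_value: float
    categories: frozenset   # 2. all values of a category attribute
    bloom: BloomFilter      # 3. Bloom filter over the category values


def build_metadata(partition, numeric_attr, category_attr):
    """Collect metadata for one non-empty partition (a list of dict records)."""
    values = [r[numeric_attr] for r in partition]
    bloom = BloomFilter()
    for r in partition:
        bloom.add(r[category_attr])
    categories = frozenset(r[category_attr] for r in partition)
    return SkippingMetadata(min(values), max(values), categories, bloom)


def select_partitions(metadata_by_partition, may_match):
    """Return the names of partitions that cannot safely be pruned."""
    return [name for name, meta in metadata_by_partition.items() if may_match(meta)]
```

A partition is skipped only when its metadata proves that no record can match, which is why the filter operation is still applied to the partitions that remain.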
A partition can be pruned if the data-skipping information ensures that eliminating the partition does not lead to a different result. In the example shown in FIG. 4, it is assumed that all the records for the hot temperature values (400) are in the P1 partition 410 and the P2 partition 420. Then, using a list for data-skipping metadata, a compute engine can safely detect that there are no hot temperature values in the P3 partition 430 or the P4 partition 440 and thus, it can prune them. In this example, the filter operations 450 and map operations 460 are still executed for the P1 partition 410 and the P2 partition 420 on the compute side. Processing is saved since filter operations 470 and map operations 480 do not need to be performed for the pruned partitions 430 and 440.
[0045] Yet another technique used to improve cache efficiency is intermediate data caching where the compute engine stores the result of an operation. Then, if the same or a similar job that includes many of the same functions is executed, the stored results can be reused. In the example shown in FIG. 5, when the first job with the specified filter operation 510 (filter-in hot temperature records) is executed, the respective results 520 of the filter operation 510 are stored in memory or storage 530. The rest of the job is then executed, including mapping operations 540. When another job with the same filter operation 510 is executed, the job can skip the filter operation 510 and perform the initial data loading from storage 530. The cached results 520 may be directly loaded without executing the filter operation 510, thus saving the processing of the filter operation 510 for repeated filter operations.
[0046] Spark cache offers another cache solution including a distributed multi-tiered cache, but it has a few limitations. FIG. 6 summarizes the Spark cache technique. As illustrated in FIG. 6, in Spark cache, the distributed multi-tiered cache 610 is not shared between users 620 and 630. Instead, the distributed compute engine 640 performs a filter operation 650 for user 620 and stores the filter operation 650 and filter results 660 (e.g., key value and contents) in the distributed multi-tiered cache 610, while the distributed compute engine 670 performs a filter operation 680 for user 630 and either loads the filter results at 682 into the distributed multi-tiered cache 690 or executes a mapping operation 684. Thus, in the Spark cache, users 620 and 630 cannot benefit from each other’s cache contents, and each user process must be rerun from the beginning.
[0047] Alluxio cache offers yet another cache solution including a distributed multi-tiered shared cache. However, the distributed multi-tiered shared cache itself does not provide semantic information about the content, so the content cannot be utilized unless the users have knowledge about its origins (initially created by them) and share such information. For example, one user would send a file location and provide information about the file contents to a second user through an external channel. FIG. 7 summarizes the Alluxio cache technique. As illustrated in FIG. 7, the distributed compute engine 710 performs a filter operation 720 for user 730 and stores the filtered data and a pointer to the filtered data 740 in a distributed multi-tiered shared cache 750. Similarly, the distributed compute engine 760 performs a filter operation 770 for user 780 and either loads the filter results at 772 into the distributed multi-tiered shared cache 750 or executes a mapping operation 774. Though both users 730 and 780 may access data stored in the distributed multi-tiered shared cache 750, the pointer to the filtered data 740 must be shared manually by the users 730 and 780 in order for the users 730 and 780 to benefit from each other’s cache contents.
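By way of illustration only, the intermediate data caching of FIG. 5 may be sketched as follows. The key layout (source identifier plus a description of the filter operation), the hit and miss counters, and the function names are assumptions made for this sketch.

```python
class IntermediateDataCache:
    """Stores the result of an operation under a key describing that operation."""

    def __init__(self):
        self._results = {}
        self.hits = 0
        self.misses = 0

    def get_or_compute(self, key, compute):
        if key in self._results:
            self.hits += 1
            return self._results[key]
        self.misses += 1
        result = compute()            # run the operation only on a miss
        self._results[key] = result
        return result


def run_job(cache, source_id, records, threshold, map_fn):
    """Filter hot records (reusing cached results if present), then map them."""
    key = (source_id, "filter", "temperature>=", threshold)
    hot = cache.get_or_compute(
        key, lambda: [r for r in records if r["temperature"] >= threshold]
    )
    return [map_fn(r) for r in hot]
```

A second job with the same filter but a different map operation reuses the cached filter results, so neither the initial load nor the filter is repeated.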
[0048] The distributed semantic-aware cache described herein incorporates features from the above-mentioned cache techniques while also providing a different application programming interface (API) compared to the standard load/store API. In particular, the distributed semantic-aware cache includes a semantic-aware API that exposes the cache contents to respective users whereby the respective users may automatically use knowledge about the cached content to optimize incoming jobs. A “semantic cache” thus includes a data interface layer above the multi-tiered cache that allows for improved query execution using metadata. The distributed semantic-aware cache provides information to the user about the cache contents via the API so that the content may be efficiently searched and/or reproduced, as necessary. In sample embodiments, the semantic-aware API exposes operations including:
store -> (repartition(source path, attribute, output path, tier), dataSkippingMetadata(source path, attribute, output path, tier), intermediateData(input DAG, output path, tier))
load -> optimize(inputDAG)
evict -> delete(path, tier)
The semantic-aware API thus replaces traditional cache store operations with corresponding semantic-aware commands that convey semantic information and implement techniques including at least one of the three techniques described above: adaptive partitioning, data skipping metadata, and intermediate data.
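By way of illustration only, the semantic-aware API may be sketched as an in-memory catalog. The sketch assumes a DAG can be written as a linear sequence of operation strings, which is a simplification, and the names `Tier`, `CacheEntry`, `SemanticCacheCatalog`, and the `load_cached:` operation are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple


class Tier(Enum):
    MEMORY = "memory"
    DISK = "disk"


@dataclass(frozen=True)
class CacheEntry:
    kind: str                       # which store command produced the entry
    output_path: str
    tier: Tier
    source_path: Optional[str] = None
    attribute: Optional[str] = None
    input_dag: Optional[Tuple[str, ...]] = None


class SemanticCacheCatalog:
    """Keeps semantic information about what each cached path contains."""

    def __init__(self):
        self._entries = {}

    def _store(self, entry):
        self._entries[(entry.output_path, entry.tier)] = entry
        return entry

    # store -> repartition / dataSkippingMetadata / intermediateData
    def repartition(self, source_path, attribute, output_path, tier):
        return self._store(CacheEntry("repartition", output_path, tier, source_path, attribute))

    def data_skipping_metadata(self, source_path, attribute, output_path, tier):
        return self._store(
            CacheEntry("dataSkippingMetadata", output_path, tier, source_path, attribute)
        )

    def intermediate_data(self, input_dag, output_path, tier):
        dag = tuple(input_dag)
        if not dag:
            raise ValueError("input DAG must contain at least one operation")
        return self._store(CacheEntry("intermediateData", output_path, tier, input_dag=dag))

    # load -> optimize(inputDAG)
    def optimize(self, input_dag):
        """Replace the longest cached prefix of the plan with a cache load."""
        plan = tuple(input_dag)
        best = None
        for entry in self._entries.values():
            if entry.kind != "intermediateData":
                continue
            dag = entry.input_dag
            if plan[: len(dag)] == dag and (best is None or len(dag) > len(best.input_dag)):
                best = entry
        if best is None:
            return plan
        return (f"load_cached:{best.output_path}",) + plan[len(best.input_dag):]

    # evict -> delete(path, tier)
    def delete(self, path, tier):
        return self._entries.pop((path, tier), None) is not None

    def contents(self):
        return list(self._entries.values())
```

Unlike a plain load/store interface, each entry records how the cached content was derived, which is what allows a later job from a different user to be matched against it.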
[0049] FIG. 8 illustrates a distributed semantic-aware cache in a sample embodiment. In this embodiment, the application user 810 sends a store command via the distributed compute engine 820 to a stateless semantic cache client plugin module 830 that transforms a DAG plan from a format of the native distributed compute engine 820 to a semantic cache format of distributed semantic-aware cache 840 (e.g., in case an intermediateData command is used by the distributed semantic-aware cache 840). In other words, the semantic cache client plugin module 830 transforms a DAG plan of the native distributed compute engine 820 (e.g., Spark) that describes particular jobs into the DAG plan of the distributed semantic-aware cache 840, which may have a different data structure (“DAG”) for describing jobs. The distributed semantic-aware cache 840 includes a semantic cache manager 850 that is registered with the semantic cache client plugin module 830 and includes a translator module 852 and an optimizer module 854 that is responsible for transforming the DAG plan to benefit from the content of the distributed semantic-aware cache 840 (see description of FIG. 13 below). The semantic cache manager 850 produces a DAG plan that the distributed compute engine 820 can execute. The translator module 852 is responsible for transforming the store command from the user 810 into the DAG plan (e.g., repartition) that the distributed compute engine 820 can execute. The distributed compute engine 820 executes the resulting DAG and stores the results to a baseline distributed multi-tiered shared cache 842.
On the other hand, the optimizer module 854 may transform the original DAG into an optimized DAG that exploits potentially helpful content from the distributed semantic-aware cache 840, which works on top of the distributed multi-tiered shared cache 842. This improves the loading process to the distributed compute engine 860 of another user 870 via another semantic cache client plugin module 880, which transforms the optimized DAG plan from a format of the distributed semantic-aware cache 840 into a format of the native distributed compute engine 860. More details about the loading process will be described below with respect to FIG. 10 and FIG. 11.
[0050] FIG. 9 is a flow chart including steps 910, 920, 930, 940, and 950 illustrating the store protocol 900 of the distributed semantic-aware cache of FIG. 8.
[0051] At 910, the distributed compute engine 820 propagates the user-issued store command from the user 810 to the semantic cache client plugin module 830.
[0052] At 920, when the store command includes a DAG plan (as in the case where an intermediateData command is used), the semantic cache client plugin module 830 transforms the DAG plan from a native distributed compute engine format into a semantic cache format. Then, the command is propagated to the translator module 852 of the semantic cache manager 850.
[0053] At 930, the semantic cache translator module 852 translates the store request into a DAG plan (“translated DAG plan”) explaining how the store request is to be executed by the distributed compute engine 820. The DAG plan includes store commands from the distributed multi-tiered shared cache 842 that are sent back to the semantic cache client plugin module 830. Simple examples are shown in FIG. 12.
[0054] At 940, the semantic cache client plugin module 830 performs the reverse transformation (from step 920) of the DAG plan from the format of the semantic cache manager 850 into the format of the native distributed compute engine 820.
[0055] At 950, the distributed compute engine 820 executes the DAG plan along with a final store command from the distributed semantic-aware cache 840 and stores the results in the distributed multi-tiered shared cache 842.
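By way of illustration only, steps 910 through 950 of the store protocol may be sketched end to end as follows. The two plan formats (a list of dictionaries for the native engine and a tuple of pairs for the semantic cache), the operation names `load`, `filter_ge`, and `store`, and the dictionary standing in for the distributed multi-tiered shared cache are all assumptions made for this sketch.

```python
def to_semantic_format(native_plan):
    """Step 920: plugin converts the native plan into the cache's format."""
    return tuple((step["op"], step["arg"]) for step in native_plan)


def to_native_format(semantic_plan):
    """Step 940: plugin performs the reverse transformation."""
    return [{"op": op, "arg": arg} for op, arg in semantic_plan]


def translate_store(command):
    """Step 930: translator appends the shared-cache store to the DAG plan."""
    store_step = ("store", (command["output_path"], command["tier"]))
    return command["dag"] + (store_step,)


def execute(native_plan, sources, shared_cache):
    """Step 950: the compute engine runs the translated plan."""
    data = None
    for step in native_plan:
        op, arg = step["op"], step["arg"]
        if op == "load":
            data = list(sources[arg])
        elif op == "filter_ge":
            attribute, threshold = arg
            data = [r for r in data if r[attribute] >= threshold]
        elif op == "store":
            path, tier = arg
            shared_cache[(path, tier)] = data
        else:
            raise ValueError(f"unsupported operation: {op}")
    return data


def store_protocol(native_dag, output_path, tier, sources, shared_cache):
    # Steps 910 and 920: the engine forwards the user's intermediateData
    # command and the plugin converts its DAG plan.
    command = {
        "kind": "intermediateData",
        "dag": to_semantic_format(native_dag),
        "output_path": output_path,
        "tier": tier,
    }
    translated = translate_store(command)                 # step 930
    native_plan = to_native_format(translated)            # step 940
    return execute(native_plan, sources, shared_cache)    # step 950
```

In this sketch the plugin holds no state between calls, consistent with the stateless client plugin module described above; only the shared cache is modified.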
[0056] The distributed semantic-aware cache 840 may also replace traditional load semantics with an optimization command from the API of the distributed semantic-aware cache 840. The load command (as opposed to the store commands) should not be triggered by the application user, but the load should be perceived as an optimization of the original DAG plan provided by the user 810. FIG. 11 is a flow chart including steps 1110, 1120, 1130, 1140, and 1150 illustrating the load protocol 1100 of the distributed semantic-aware cache of FIG. 8.
[0057] FIG. 10 illustrates loading data from the distributed semantic-aware cache 840 for use by a second user 870 in a sample embodiment. When the distributed compute engine 860 of the second user 870 attempts to load data from the distributed semantic-aware cache 840, the initial DAG provided by user 810 may be optimized (e.g., use the same filter operation to eliminate the filter operation and initial data load by loading from cache) and used by the second user 870 to generate a comprehensive execution plan. In such a case, the distributed compute engine 860 asks the distributed semantic-aware cache 840 through the corresponding semantic cache client plugin module 880 if the DAG can be further optimized using the cache content. The optimizer module 854 transforms the original DAG into an optimized DAG that exploits potentially helpful content from the distributed semantic-aware cache 840. Potential load commands from the distributed multi-tiered shared cache 842 are included and executed by the distributed compute engine 860 when the distributed compute engine 860 executes the optimized DAG plan (after it applies other potential optimizations on its own) by potentially loading useful data from the baseline distributed multi-tiered shared cache 842. In the event of failure or inability to optimize the data, the initial DAG plan may be used instead of creating an optimized DAG plan.
[0058] FIG. 11 is a flow chart including steps 1110, 1120, 1130, 1140, and 1150 illustrating the load protocol 1100 of the distributed semantic-aware cache of FIG. 8.
[0059] At 1110, the distributed compute engine 860 sends a semantic cache load-command (DAG) to the semantic cache client plugin module 880 whenever a new job is executed.
[0060] At 1120, the semantic cache client plugin module 880 transforms the DAG plan from the format of the native distributed compute engine 860 into the format of the distributed semantic-aware cache 840. Then, the load command is propagated to the optimizer module 854 of the distributed semantic-aware cache 840.
[0061] At 1130, the optimizer module 854 of the distributed semantic-aware cache 840 optimizes the DAG plan received from the distributed compute engine 860 by transforming it and returning the optimized DAG plan to the semantic cache client plugin module 880 for execution. The optimized DAG plan may be modified to contain original load commands from the distributed multi-tiered shared cache 842.
[0062] At 1140, the semantic cache client plugin module 880 does the reverse transformation (from step 1120) of the optimized DAG plan from the format of the semantic cache manager 850 into the format of the distributed compute engine 860.
[0063] At 1150, the distributed compute engine 860 executes the optimized DAG plan along with potential load commands from the distributed semantic-aware cache 840 and loads the results into the distributed multi-tiered shared cache 842.
[0064] The evict command is very simple and is similar to the store commands. For the evict command, an application user specifies the output path and the memory tier used to store the corresponding content. The distributed compute engine 820 sends this information to the semantic cache manager 850 via the semantic cache client plugin module 830. In response, the semantic cache manager 850 sends a corresponding deletion command to the distributed multi-tiered shared cache 842 and destroys the internally maintained semantic information about the content.
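By way of illustration only, the load protocol of steps 1110 through 1150, including the fallback to the initial DAG plan when the optimized plan cannot be used, may be sketched as follows. The plan formats, the operation names, the catalog that maps a cached plan prefix to a cache path, and the choice of which errors trigger the fallback are all assumptions made for this sketch.

```python
def to_semantic_format(native_plan):
    """Step 1120: plugin converts the native plan into the cache's format."""
    return tuple((step["op"], step["arg"]) for step in native_plan)


def to_native_format(semantic_plan):
    """Step 1140: plugin performs the reverse transformation."""
    return [{"op": op, "arg": arg} for op, arg in semantic_plan]


def optimize(plan, catalog):
    """Step 1130: replace the longest cached prefix with a cache load."""
    for length in range(len(plan), 0, -1):
        path = catalog.get(plan[:length])
        if path is not None:
            return (("load_cached", path),) + plan[length:]
    return plan


def execute(native_plan, sources, shared_cache):
    """Step 1150: the compute engine runs the plan it was handed."""
    data = None
    for step in native_plan:
        op, arg = step["op"], step["arg"]
        if op == "load":
            data = list(sources[arg])
        elif op == "load_cached":
            data = list(shared_cache[arg])
        elif op == "filter_ge":
            attribute, threshold = arg
            data = [r for r in data if r[attribute] >= threshold]
        elif op == "map_field":
            data = [r[arg] for r in data]
        else:
            raise ValueError(f"unsupported operation: {op}")
    return data


def load_protocol(native_dag, catalog, sources, shared_cache):
    """Step 1110 onward; returns the job result and the plan actually run."""
    try:
        optimized = optimize(to_semantic_format(native_dag), catalog)
        plan = to_native_format(optimized)
        return execute(plan, sources, shared_cache), plan
    except (KeyError, ValueError):
        # Cached content missing or plan unusable: fall back to the initial DAG.
        return execute(native_dag, sources, shared_cache), native_dag
```

The application user never issues the load explicitly in this sketch; the job is simply submitted, and the cache lookup appears only as a rewrite of the submitted plan.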
[0065] In sample embodiments of the semantic cache manager 850, the translator module 852 accepts incoming application store commands via the distributed compute engine 820 and transforms the commands into a DAG plan that adheres to the command's semantics and that can be executed in the distributed compute engine. A sample implementation of the translator module 852 is shown in FIG. 12. For an intermediateData command specifying the input DAG, the output path, and the memory tier, the translator module 852 instructs the distributed compute engine 820 to run the input DAG plan and then store its results to the specified location and tier of the distributed multi-tiered shared cache 842 at 1280. The cached data may be used while skipping the initial loading of data.
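By way of illustration only, a translator that maps each of the three store commands to a DAG plan may be sketched as follows. Only the intermediateData translation (run the input DAG, then store the result) follows the description above; the plans produced for repartition and dataSkippingMetadata, and the operation names `repartition_by` and `collect_skipping_metadata`, are assumptions made for this sketch.

```python
def translate(command):
    """Turn a store command (a dict) into a DAG plan ending in a cache store."""
    kind = command["kind"]
    store = ("store", command["output_path"], command["tier"])

    if kind == "intermediateData":
        # Run the user's input DAG as-is, then store its result.
        return tuple(command["input_dag"]) + (store,)

    if kind == "repartition":
        # Assumed plan: load the source, re-split it on the attribute, store.
        return (
            ("load", command["source_path"]),
            ("repartition_by", command["attribute"]),
            store,
        )

    if kind == "dataSkippingMetadata":
        # Assumed plan: load the source, summarize the attribute per partition, store.
        return (
            ("load", command["source_path"]),
            ("collect_skipping_metadata", command["attribute"]),
            store,
        )

    raise ValueError(f"unknown store command: {kind}")
```

In each case the final step is a store to the requested path and tier, so the compute engine does the work while the translator only decides what work is needed.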
[0066] In sample embodiments of the semantic cache manager 850, the optimizer module 854 is responsible for transforming the DAG plan to benefit from the content of the distributed semantic-aware cache 840. A sample implementation of the optimizer module 854 is shown in FIG. 13.
[0067] A DAG may be represented as a collection of interleaving (or non-interleaving) directed trees where the root is a node with no incoming edges. In a DAG, it is guaranteed that there is at least one node with no incoming edge. Thus, each tree may be split and analyzed separately (as a different sub-job). In the example of FIG. 13, the map is the root node and the load is the leaf node for optimization purposes. In FIG. 13, the arrows point in the opposite direction to indicate the order of execution. As illustrated in FIG. 13, the optimizer module 854 recursively examines a tree, starting from the root 1300 at 1302 (node := plan.root), to see if there is any branch (a.k.a., directed tree) of the tree (operations) that can be pruned by loading data directly from the baseline distributed multi-tiered shared cache 842.
[0068] Then, the optimizer module 854 looks at the current node in the tree at 1304 and 1306 to see if the current node is a shuffle node participating in a shuffle operation or a filter node participating in a filter operation. As known to those skilled in the art, some operations like map, filter, and many more cause narrow dependencies between tasks (one-to-one communication patterns). Typically, these operations are bundled together and executed by the same process. Operations like join, reduceByKey, groupByKey, and many more, cause wide dependencies (many-to-many communication patterns) between different tasks. Traditionally, this exchange of data is denoted as a Shuffle Phase that is implemented at one or more shuffle nodes. At 1310, an attribute of the shuffle operation is maintained for repartitioning and data-skipping metadata optimizations, and the filter attributes are reset since they are no longer relevant due to shuffling. The filter node may maintain filter attributes at 1308.
[0069] The optimizer module 854 further checks if the current node can be replaced with intermediate data content at 1312 from the baseline distributed multi-tiered shared cache 842. A recursive call is executed at 1314, 1316, and 1318 to optimize the DAG for each one of the children of the current node.
[0070] If intermediate data is found at 1320, then the optimizer module 854 replaces the current node with a load from the baseline distributed multi-tiered shared cache 842 at 1324.
[0071] If the current node is not a leaf node (load from datasource) at 1322, the current node is returned at 1326. Then, the optimizer module 854 looks for repartitions and data-skipping metadata at 1326.
[0072] If a repartition for the current node with a filter or shuffle attribute (listed at 1308 and 1310, respectively) is found at 1328, the optimizer module 854 replaces the leaf node (datasource) with the new repartition from the baseline distributed multi-tiered shared cache 842 at 1332.
[0073] If no repartition is found and data-skipping metadata for the current node with a filter attribute (listed at 1308) is found at 1330, the optimizer module 854 replaces the leaf node (datasource) with a new load that skips partition reading according to the data-skipping metadata from the baseline distributed multi-tiered shared cache 842 at 1334.
[0074] Finally, the optimizer module 854 returns the modified optimized tree at 1336.
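By way of illustration only, the recursive optimization described with respect to FIG. 13 may be sketched as follows. The `Node` and `Catalog` structures, the operation names `load_cached`, `load_repartition`, and `load_with_skipping`, and the exact ordering of the checks are assumptions made for this sketch; in particular, the sketch returns as soon as intermediate data is found rather than first recursing into the children.

```python
from dataclasses import dataclass, field
from typing import Tuple


@dataclass(frozen=True)
class Node:
    op: str                  # e.g. "load", "filter", "map", "shuffle"
    arg: object = None       # attribute name, or source path for "load"
    children: Tuple["Node", ...] = ()


@dataclass
class Catalog:
    intermediate: dict = field(default_factory=dict)  # subtree -> cache path
    repartitions: dict = field(default_factory=dict)  # (source, attribute) -> cache path
    skipping: dict = field(default_factory=dict)      # (source, attribute) -> cache path


def optimize(node, catalog, filter_attrs=(), shuffle_attr=None):
    """Walk the tree from the root toward the leaves, rewriting where possible."""
    if node.op == "shuffle":
        # Keep the shuffle attribute; earlier filter attributes no longer apply.
        shuffle_attr, filter_attrs = node.arg, ()
    elif node.op == "filter":
        filter_attrs = filter_attrs + (node.arg,)

    # Intermediate data: the whole subtree is replaced by one cache load.
    cached = catalog.intermediate.get(node)
    if cached is not None:
        return Node("load_cached", cached)

    # Not a leaf: optimize every child and rebuild the current node.
    if node.children:
        children = tuple(
            optimize(child, catalog, filter_attrs, shuffle_attr)
            for child in node.children
        )
        return Node(node.op, node.arg, children)

    if node.op != "load":
        return node

    # Leaf (load from datasource): prefer a repartition, then skipping metadata.
    candidates = filter_attrs + ((shuffle_attr,) if shuffle_attr is not None else ())
    for attribute in candidates:
        path = catalog.repartitions.get((node.arg, attribute))
        if path is not None:
            return Node("load_repartition", path)
    for attribute in filter_attrs:
        path = catalog.skipping.get((node.arg, attribute))
        if path is not None:
            return Node("load_with_skipping", (node.arg, path))
    return node
```

Because the nodes are immutable in this sketch, the original plan is left untouched and remains available if the optimized plan cannot be executed.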
[0075] In alternative embodiments, a customized eviction module can be used to enable a user to orchestrate a delete operation. The user may decide what data to automatically delete (evict) when the cache is full so that users may directly delete data from the distributed semantic-aware cache 840. In such an embodiment, an automatic procedure may be provided that relieves users from managing this aspect of caching, but it might make sub-optimal decisions as a result.
[0076] As noted herein, sample embodiments of the distributed semantic-aware cache 840 include multiple memory tiers. For example, the distributed semantic-aware cache 840 may be a two-tiered cache including memory (e.g., DRAM) and disk (e.g., SSD). However, the distributed semantic-aware cache 840 also can be more sophisticated and complex. For example, the distributed semantic-aware cache 840 may include memory in a compute side, disk in a compute side, memory in a storage side, and disk in a storage side that are selectable by the user. This allows users to have more flexibility to fine-tune their decisions, but it makes the orchestration of the data more complex and difficult.
[0077] Those skilled in the art will appreciate that the distributed semantic-aware cache described herein has numerous benefits over conventional caches. For example, the distributed semantic-aware cache described herein loads substantially less data (saving storage I/O processing), transfers substantially less data between the storage and compute side (saving network I/O processing), and utilizes less processor and memory (e.g., DRAM) resources for implementing the distributed compute engine, which leads to less overall cost of the architecture. The execution time is also faster. Also, unlike conventional systems, the distributed semantic-aware cache is shared, has an improved API, and is semantic-aware.
[0078] More generally, the distributed semantic-aware cache described herein has a design and API that allows for effective sharing of content between different compute clusters. A write (store) protocol ensures that users write less code and avoid compute orchestration overheads, while a read (load) protocol ensures that the compute engine’s code is optimized and has benefits in storage I/O, network I/O, CPU resources utilization, and memory space utilization.
[0079] FIG. 14 illustrates a general-purpose computer 1400 suitable for implementing one or more embodiments of the methods disclosed herein. For example, the computer 1400 in FIG. 14 may be implemented on a data analytics framework (e.g., Spark™, Hadoop™, TensorFlow™, and Dryad™), and the components described above may be implemented on any general-purpose network component, such as a computer 1400 with sufficient processing power, memory resources, and network throughput capability to handle the workload placed upon it. The computer 1400 includes a processor 1410 (which may be referred to as a central processor unit or CPU) that is in communication with memory devices including secondary storage 1420, read only memory (ROM) 1430, random access memory (RAM) 1440, I/O devices 1450, and network connectivity devices 1460. In sample embodiments, the network connectivity devices 1460 further connect the processor 1410 to a client side data analytics driver module 1470 that manages the data analytics process that uses the distributed semantic-aware cache described herein. The processor 1410 may be implemented as one or more CPU chips, or may be part of one or more ASICs.
[0080] The secondary storage 1420 is typically comprised of one or more disk drives or tape drives and is used for non-volatile storage of data and as an over-flow data storage device if RAM 1440 is not large enough to hold all working data (e.g., the data received from the map tasks). Secondary storage 1420 may be used to store programs that are loaded into RAM 1440 when such programs are selected for execution. The ROM 1430 is used to store instructions and perhaps data that are read during program execution. ROM 1430 may be a non-volatile memory device that typically has a small memory capacity relative to the larger memory capacity of secondary storage 1420. The RAM 1440 is used to store volatile data and perhaps to store instructions. Access to both ROM 1430 and RAM 1440 is typically faster than to secondary storage 1420.
[0081] It should be understood that computer 1400 may execute instructions from computer-readable non-transitory media storing computer-readable instructions and one or more processors coupled to the memory, and when executing the computer-readable instructions, the computer 1400 is configured to perform method steps and operations described in the disclosure with reference to FIG. 3 to FIG. 13. The computer-readable non-transitory media includes all types of computer-readable media, including magnetic storage media, optical storage media, flash media and solid state storage media.
[0082] It should be further understood that software including one or more computer-executable instructions that facilitate processing and operations as described above with reference to any one or all of steps of the disclosure may be installed in and sold with one or more servers or databases. Alternatively, the software may be obtained and loaded into one or more servers or one or more databases in a manner consistent with the disclosure, including obtaining the software through physical medium or distribution system, including, for example, from a server owned by the software creator or from a server not owned but used by the software creator. The software may be stored on a server for distribution over the Internet, for example.
[0083] Also, it will be understood by one skilled in the art that this disclosure is not limited in its application to the details of construction and the arrangement of components set forth in the following description or illustrated in the drawings. The embodiments herein are capable of other embodiments, and capable of being practiced or carried out in various ways. Also, it will be understood that the phraseology and terminology used herein is for the purpose of description and should not be regarded as limiting. The use of "including," "comprising," or "having" and variations thereof herein is meant to encompass the items listed thereafter and equivalents thereof as well as additional items.
[0084] The components of the illustrative devices, systems and methods employed in accordance with the illustrated embodiments may be implemented, at least in part, in digital electronic circuitry, analog electronic circuitry, or in computer hardware, firmware, software, or in combinations of them. These components also may be implemented, for example, as a computer program product such as a computer program, program code or computer instructions tangibly embodied in an information carrier, or in a machine-readable storage device, for execution by, or to control the operation of, data processing apparatus such as a programmable processor, a computer, or multiple computers.
[0085] A computer program may be written in any form of programming language, including compiled or interpreted languages, and it may be deployed in any form, including as a stand-alone program or as a module, component, subroutine, or other unit suitable for use in a computing environment. A computer program may be deployed to be executed on one computer or on multiple computers at one site or distributed across multiple sites and interconnected by a communication network. Also, functional programs, codes, and code segments for accomplishing the systems and methods described herein may be easily construed as within the scope of the disclosure by programmers skilled in the art to which the present disclosure pertains. Method steps associated with the illustrative embodiments may be performed by one or more programmable processors executing a computer program, code or instructions to perform functions (e.g., by operating on input data and generating an output). Method steps may also be performed by, and apparatus may be implemented as, special purpose logic circuitry, e.g., an FPGA (field programmable gate array) or an ASIC, for example.
[0086] The various illustrative logical blocks, modules, and circuits described in connection with the embodiments disclosed herein may be implemented or performed with a general purpose processor, a digital signal processor (DSP), an ASIC, an FPGA or other programmable logic device, discrete gate or transistor logic, discrete hardware components, or any combination thereof designed to perform the functions described herein. A general purpose processor may be a microprocessor, but in the alternative, the processor may be any conventional processor, controller, microcontroller, or state machine. A processor may also be implemented as a combination of computing devices, e.g., a combination of a DSP and a microprocessor, a plurality of microprocessors, one or more microprocessors in conjunction with a DSP core, or any other such configuration.
[0087] Processors suitable for the execution of a computer program include, by way of example, both general and special purpose microprocessors, and any one or more processors of any kind of digital computer. Generally, a processor will receive instructions and data from a read-only memory or a random access memory or both. The elements of a computer are a processor for executing instructions and one or more memory devices for storing instructions and data. Generally, a computer will also include, or be operatively coupled to receive data from or transfer data to, or both, one or more mass storage devices for storing data, e.g., magnetic, magneto-optical disks, or optical disks. Information carriers suitable for embodying computer program instructions and data include all forms of non-volatile memory, including by way of example, semiconductor memory devices, e.g., electrically programmable read-only memory or ROM (EPROM), electrically erasable programmable ROM (EEPROM), flash memory devices, and data storage disks e.g., magnetic disks, internal hard disks, or removable disks, magneto-optical disks, compact disc ROM (CD-ROM), or digital versatile disc ROM (DVD-ROM). The processor and the memory may be supplemented by, or incorporated in, special purpose logic circuitry.
[0088] Those of skill in the art understand that information and signals may be represented using any of a variety of different technologies and techniques. For example, data, instructions, commands, information, signals, bits, symbols, and chips that may be referenced throughout the above description may be represented by voltages, currents, electromagnetic waves, magnetic fields or particles, optical fields or particles, or any combination thereof.
[0089] Those skilled in the art may further appreciate that the various illustrative logical blocks, modules, circuits, and algorithm steps described in connection with the embodiments disclosed herein may be implemented as electronic hardware, computer software, or combinations of both. To clearly illustrate this interchangeability of hardware and software, various illustrative components, blocks, modules, circuits, and steps have been described above generally in terms of their functionality. Whether such functionality is implemented as hardware or software depends upon the particular application and design constraints imposed on the overall system. Skilled artisans may implement the described functionality in varying ways for each particular application, but such implementation decisions should not be interpreted as causing a departure from the scope of the present disclosure. A software module may reside in random access memory (RAM), flash memory, ROM, EPROM, EEPROM, registers, hard disk, a removable disk, a CD-ROM, or any other form of storage medium known in the art. A sample storage medium is coupled to the processor such that the processor may read information from, and write information to, the storage medium. In the alternative, the storage medium may be integral to the processor. In other words, the processor and the storage medium may reside in an integrated circuit or be implemented as discrete components.
[0090] As used herein, “machine-readable medium” means a device able to store instructions and data temporarily or permanently and may include, but is not limited to, random-access memory (RAM), read-only memory (ROM), buffer memory, flash memory, optical media, magnetic media, cache memory, other types of storage (e.g., EEPROM), and any suitable combination thereof. The term “machine-readable medium” should be taken to include a single medium or multiple media (e.g., a centralized or distributed database, or associated caches and servers) able to store processor instructions. The term “machine-readable medium” shall also be taken to include any medium, or combination of multiple media, that is capable of storing instructions for execution by one or more processors, such that the instructions, when executed by one or more processors, cause the one or more processors to perform any one or more of the methodologies described herein. Accordingly, a “machine-readable medium” refers to a single storage apparatus or device, as well as “cloud-based” storage systems or storage networks that include multiple storage apparatus or devices. The term “machine-readable medium” as used herein excludes signals per se.
[0091] Although a few embodiments have been described in detail above, other modifications are possible. For example, the logic flows depicted in the figures do not require the particular order shown, or sequential order, to achieve desirable results. Other steps may be provided, or steps may be eliminated, from the described flows, and other components may be added to, or removed from, the described systems. Other embodiments may be within the scope of the following claims.

Claims

What is claimed is:
1. A method of performing a data analytics process on a distributed data analytics platform that shares cached data amongst users, the method comprising: receiving a semantic-aware store command from a user; translating, by one or more processors, a directed acyclic graph (DAG) plan for executing the semantic-aware store command from the user into a translated DAG plan to be executed by a distributed shared cache; and executing, by the one or more processors, the translated DAG plan including at least a final store command and storing results of execution of the translated DAG plan to the distributed shared cache.
2. The method of claim 1, further comprising: transforming the DAG plan into a semantic cache format prior to the translating of the DAG plan; and transforming the translated DAG plan into a format of a distributed compute engine that executes the translated DAG plan.
3. The method of claim 1 or claim 2, wherein the semantic-aware store commands from the distributed shared cache are based on knowledge about data stored in the distributed shared cache, further comprising optimizing the execution of the translated DAG plan using the knowledge about data stored in the distributed shared cache.
4. The method of any one of claims 1 to 3, further comprising: executing a load command that optimizes the DAG plan from the user by: using data stored in the distributed shared cache to update the translated DAG plan with load commands from the distributed shared cache to form an optimized DAG plan.
5. The method of claim 4, further comprising: sending a semantic-aware cache load command including the DAG plan to the distributed shared cache whenever a semantic-aware store command is executed; after forming the optimized DAG plan, returning the optimized DAG plan for execution; and executing the optimized DAG plan including the load commands and storing results of execution of the optimized DAG plan to the distributed shared cache.
6. The method of claim 5, further comprising transforming the optimized DAG plan into a semantic cache format prior to translating, and transforming the optimized DAG plan into a format of a distributed compute engine that executes the optimized DAG plan.
7. The method of claim 5 or claim 6, wherein the updating of the translated DAG plan includes reviewing cached intermediate data to determine at least one of (1) whether there is a branch of the DAG plan that can be pruned by loading data directly from the distributed shared cache, (2) whether there is a repartition of the data in the distributed shared cache that can eliminate partitions or shuffle phases, or (3) whether, based on data-skipping metadata, there is an ensuing operation that can eliminate partitions.
8. The method of any one of claims 1 to 7, wherein the distributed shared cache comprises multiple tiers, further comprising: receiving an evict command from the user specifying an output path and tier of the distributed shared cache used to store corresponding content; sending a deletion command corresponding to the evict command to the distributed shared cache; and destroying semantic information in the distributed shared cache about the corresponding content.
9. The method of any one of claims 1 to 8, wherein the translating of the DAG plan comprises transforming the semantic-aware store command into the translated DAG plan by adhering to semantics of the semantic-aware store command.
10. The method of any one of claims 1 to 9, wherein the received semantic-aware store command comprises at least one of an adaptive partitioning command in which data is dynamically structured based on workload characteristics, a data skipping metadata command in which a data partition is pruned when eliminating the data partition does not affect the results of execution of the translated DAG plan, or an intermediate data command in which data is reused for another operation.
11. The method of claim 10, wherein the distributed shared cache comprises multiple tiers and wherein when the received semantic-aware store command comprises the adaptive partitioning command, constructing as the translated DAG plan a DAG plan that, when executed, takes input data, sorts the input data according to an attribute specified in the adaptive partitioning command, and stores the sorted input data in a location and memory tier specified in the adaptive partitioning command.
12. The method of claim 10, wherein the distributed shared cache comprises multiple tiers and wherein when the received semantic-aware store command comprises the data skipping metadata command, constructing as the translated DAG plan a DAG plan that, when executed, takes input data, creates relevant metadata for an attribute specified in the data skipping metadata command, and stores the relevant metadata in a location and memory tier specified in the data skipping metadata command.
13. The method of claim 10, wherein the distributed shared cache comprises multiple tiers and wherein when the received semantic-aware store command comprises the intermediate data command and the DAG plan from the user, executing the DAG plan from the user and storing results again to a specified location and memory tier of the distributed shared cache.
14. A distributed semantic-aware cache, comprising: a distributed shared cache that stores data that may be shared by at least two users; and a semantic-aware cache manager comprising a translator module that translates a directed acyclic graph (DAG) plan for executing a semantic-aware store command from a user into a translated DAG plan to be executed by the distributed shared cache, wherein the distributed shared cache receives and stores results of execution of the translated DAG plan and a final store command.
15. The distributed semantic-aware cache of claim 14, wherein the semantic-aware cache manager further comprises an optimizer module that optimizes execution of the translated DAG plan by providing semantic-aware store commands from the distributed shared cache that are based on knowledge about data stored in the distributed shared cache to form an optimized DAG plan for execution.
16. The distributed semantic-aware cache of claim 15, wherein the optimizer module updates the DAG plan with a load command from the distributed shared cache to form the optimized DAG plan for execution.
17. The distributed semantic-aware cache of claim 16, wherein the distributed shared cache receives a semantic-aware cache load command including the optimized DAG plan whenever a semantic-aware store command is executed and stores results of execution of the optimized DAG plan.
18. The distributed semantic-aware cache of claim 16 or claim 17, wherein the optimizer module optimizes the DAG plan by reviewing cached intermediate data to determine at least one of (1) whether there is a branch of the DAG plan that can be pruned by loading data directly from the distributed shared cache, (2) whether there is a repartition of the data in the distributed shared cache that can eliminate partitions or shuffle phases, or (3) whether, based on data-skipping metadata, there is an ensuing operation that can eliminate partitions.
19. The distributed semantic-aware cache of any one of claims 14 to 18, wherein the semantic-aware cache includes an application programming interface that receives a semantic-aware store command comprising at least one of an adaptive partitioning command in which data is dynamically structured based on workload characteristics, a data skipping metadata command in which a data partition is pruned when eliminating the data partition does not affect the results of execution of the translated DAG plan, or an intermediate data command in which data is reused for another operation.
20. A distributed semantic-aware cache system, comprising: a distributed shared cache that stores data that may be shared by at least two users; a semantic-aware cache manager comprising a translator module that translates a directed acyclic graph (DAG) plan for executing a semantic-aware store command from a user into a translated DAG plan to be executed by the distributed shared cache; and a distributed compute engine that executes the translated DAG plan and stores results of execution of the translated DAG plan and a final store command in the distributed shared cache.
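By way of a non-limiting illustration only, two of the optimizations recited in claims 7 and 18 may be sketched as follows: pruning a branch of a DAG plan by loading previously cached data, and eliminating partitions based on data-skipping metadata. The sketch is not taken from the disclosure. The names `Node`, `signature`, `optimize`, and `prune_partitions`, the use of a string signature to match cached intermediate data, the per-partition minimum/maximum metadata, and the cache path shown are all hypothetical assumptions chosen for clarity.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    """One operator in a DAG plan (hypothetical representation)."""
    op: str
    args: tuple = ()
    inputs: tuple = ()  # upstream Node objects


def signature(node):
    """Build a canonical string for the computation rooted at `node`."""
    inner = ",".join(signature(i) for i in node.inputs)
    return f"{node.op}{node.args}[{inner}]"


def optimize(node, cache_index):
    """Replace any subtree whose result is already cached with a load node.

    `cache_index` maps a subtree signature to the cache location that holds
    its stored result (an assumed lookup structure).
    """
    sig = signature(node)
    if sig in cache_index:
        return Node("load", (cache_index[sig],))
    new_inputs = tuple(optimize(i, cache_index) for i in node.inputs)
    return Node(node.op, node.args, new_inputs)


def prune_partitions(partitions, lo, hi):
    """Keep only partitions whose (min, max) range overlaps [lo, hi]."""
    return [
        name
        for name, (pmin, pmax) in partitions.items()
        if pmax >= lo and pmin <= hi
    ]


# A three-step plan: scan -> filter -> aggregate.
scan = Node("scan", ("events",))
filt = Node("filter", ("year >= 2020",), (scan,))
agg = Node("aggregate", ("count by user",), (filt,))

# Assume the filtered intermediate data was stored earlier at this path.
cache_index = {signature(filt): "/cache/tier0/filtered_events"}
optimized = optimize(agg, cache_index)

# Assumed metadata: partition name -> (min, max) of the filtered attribute.
metadata = {"p0": (0, 9), "p1": (10, 19), "p2": (20, 29)}
kept = prune_partitions(metadata, 12, 22)
```

In this sketch the scan and filter branch is replaced by a single load node because its signature matches an entry in the assumed cache index, and partition `p0` is skipped because its value range cannot overlap the requested range. An actual embodiment may match plans and record metadata in other ways.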
PCT/US2021/015022 2021-01-26 2021-01-26 Distributed semantic-aware cache Ceased WO2022164419A1 (en)

Priority Applications (2)

Application Number Priority Date Filing Date Title
CN202180091941.9A CN116802618A (en) 2021-01-26 2021-01-26 Distributed semantic-aware cache
PCT/US2021/015022 WO2022164419A1 (en) 2021-01-26 2021-01-26 Distributed semantic-aware cache

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
PCT/US2021/015022 WO2022164419A1 (en) 2021-01-26 2021-01-26 Distributed semantic-aware cache

Publications (1)

Publication Number Publication Date
WO2022164419A1 true WO2022164419A1 (en) 2022-08-04

Family

ID=74669540

Family Applications (1)

Application Number Title Priority Date Filing Date
PCT/US2021/015022 Ceased WO2022164419A1 (en) 2021-01-26 2021-01-26 Distributed semantic-aware cache

Country Status (2)

Country Link
CN (1) CN116802618A (en)
WO (1) WO2022164419A1 (en)

Non-Patent Citations (1)

* Cited by examiner, † Cited by third party
Title
CHAO-QIANG HUANG ET AL: "RDDShare: Reusing Results of Spark RDD", 2016 IEEE FIRST INTERNATIONAL CONFERENCE ON DATA SCIENCE IN CYBERSPACE (DSC), IEEE, 13 June 2016 (2016-06-13), pages 370 - 375, XP033072314, DOI: 10.1109/DSC.2016.80 *

Also Published As

Publication number Publication date
CN116802618A (en) 2023-09-22

Similar Documents

Publication Publication Date Title
Stonebraker et al. MapReduce and parallel DBMSs: friends or foes?
Batarfi et al. Large scale graph processing systems: survey and an experimental evaluation
Fan et al. The Case Against Specialized Graph Analytics Engines.
US8620903B2 (en) Database distribution system and methods for scale-out applications
US8364751B2 (en) Automated client/server operation partitioning
Chen et al. A study of SQL-on-Hadoop systems
US9971820B2 (en) Distributed system with accelerator-created containers
US11657069B1 (en) Dynamic compilation of machine learning models based on hardware configurations
Humbetov Data-intensive computing with map-reduce and hadoop
CN104679898A (en) Big data access method
US11636124B1 (en) Integrating query optimization with machine learning model prediction
US20120158805A1 (en) Non-disruptive data movement and node rebalancing in extreme oltp environments
EP2981908A1 (en) Query integration across databases and file systems
US20220374404A1 (en) Data Storage Using Roaring Binary-Tree Format
US20250328527A1 (en) Multi-cluster query result caching
CN117787432A (en) Machine learning method and device based on lake-warehouse integration
Tian et al. DiNoDB: Efficient large-scale raw data analytics
CN115756520A (en) FlinkSQL deployment method and device in distributed cluster
Vaidya Parallel processing of cluster by map reduce
WO2022164419A1 (en) Distributed semantic-aware cache
WO2023097270A1 (en) Detecting idle periods at network endpoints for management actions at processing clusters for managed databases
Qu et al. HBelt: Integrating an incremental ETL pipeline with a big data store for real-time analytics
von Bültzingsloewen et al. Design and implementation of KARDAMOM—A set-oriented data flow database machine
WO2017027015A1 (en) Distribute execution of user-defined function
Sakr Large-scale graph processing systems

Legal Events

Date Code Title Description
121 Ep: the epo has been informed by wipo that ep was designated in this application

Ref document number: 21706772

Country of ref document: EP

Kind code of ref document: A1

WWE Wipo information: entry into national phase

Ref document number: 202180091941.9

Country of ref document: CN

NENP Non-entry into the national phase

Ref country code: DE

122 Ep: pct application non-entry in european phase

Ref document number: 21706772

Country of ref document: EP

Kind code of ref document: A1