swh.export.luigi module
Luigi tasks
This module contains Luigi tasks, as an alternative to the CLI that can be composed with other tasks, such as swh-graph’s.
File layout
Tasks in this module work on “export directories”, which have this layout:
swh_<date>[_<flavor>]/
    edges/
        origin/
        snapshot/
        ...
        stamps/
            origin
            snapshot
            ...
    orc/
        origin/
        snapshot/
        ...
        stamps/
            origin
            snapshot
            ...
    meta/
        export.json
Stamp files are written once the corresponding directories have been written.
Their presence indicates that the corresponding directory was fully generated/copied.
This allows skipping work that was already done, while ignoring interrupted jobs.
They are omitted after the initial export (i.e. when downloading to/from other machines).
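The stamp mechanism described above can be illustrated with a short sketch. This is not the module's implementation: the helpers `export_table` and `write_placeholder` are hypothetical, and the sketch assumes one empty stamp file per table at `<format>/stamps/<object_type>`, as in the layout shown earlier.

```python
import tempfile
from pathlib import Path
from typing import Callable


def export_table(
    export_dir: Path,
    fmt: str,
    object_type: str,
    write_table: Callable[[Path], None],
) -> bool:
    """Hypothetical helper: export one table unless its stamp file exists.

    Returns True if the table was (re)written, False if it was skipped.
    """
    table_dir = export_dir / fmt / object_type
    stamp = export_dir / fmt / "stamps" / object_type
    if stamp.exists():
        # A previous run finished this table: nothing to do.
        return False
    # A directory without a stamp may be left over from an interrupted run;
    # it is written again rather than trusted (a real implementation might
    # clear it first).
    table_dir.mkdir(parents=True, exist_ok=True)
    write_table(table_dir)
    # The stamp is created only after the table directory is complete.
    stamp.parent.mkdir(parents=True, exist_ok=True)
    stamp.touch()
    return True


def write_placeholder(table_dir: Path) -> None:
    """Stand-in for the real export work: writes one placeholder file."""
    (table_dir / "placeholder.txt").write_text("data\n")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp) / "swh_2022-11-08"
        print(export_table(root, "edges", "origin", write_placeholder))  # True
        print(export_table(root, "edges", "origin", write_placeholder))  # False
```

Writing the stamp last is what makes a crash safe: an interrupted job leaves a directory but no stamp, so the next run redoes that table instead of skipping it.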
meta/export.json contains information about the dataset, for provenance tracking.
For example:
{
    "flavor": "full",
    "export_start": "2022-11-08T11:00:54.998799+00:00",
    "export_end": "2022-11-08T11:05:53.105519+00:00",
    "brokers": [
        "broker1.journal.staging.swh.network:9093"
    ],
    "prefix": "swh.journal.objects",
    "formats": [
        "edges",
        "orc"
    ],
    "object_types": [
        "revision",
        "release",
        "snapshot",
        "origin_visit_status",
        "origin_visit",
        "origin"
    ],
    "privileged": false,
    "hostname": "desktop5",
    "tool": {
        "name": "swh.export",
        "version": "0.3.2"
    }
}
object_types lists the “main tables” that were exported; it excludes relational
tables such as directory_entry.
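As an illustration, a consumer of a downloaded export could read meta/export.json with the standard json module to check which tables it contains. This is a minimal sketch: `load_export_meta` and `has_table` are hypothetical helpers, and they rely only on keys that appear in the example above.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict

# Keys that appear in the example export.json above.
REQUIRED_KEYS = ("flavor", "formats", "object_types", "tool")


def load_export_meta(export_dir: Path) -> Dict[str, Any]:
    """Hypothetical helper: read meta/export.json and check expected keys."""
    meta = json.loads((export_dir / "meta" / "export.json").read_text())
    missing = [key for key in REQUIRED_KEYS if key not in meta]
    if missing:
        raise KeyError(f"export.json is missing {missing}")
    return meta


def has_table(meta: Dict[str, Any], fmt: str, object_type: str) -> bool:
    """True if the export lists both this format and this main table."""
    return fmt in meta["formats"] and object_type in meta["object_types"]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        export_dir = Path(tmp)
        (export_dir / "meta").mkdir()
        (export_dir / "meta" / "export.json").write_text(json.dumps({
            "flavor": "full",
            "formats": ["edges", "orc"],
            "object_types": ["snapshot", "origin"],
            "tool": {"name": "swh.export", "version": "0.3.2"},
        }))
        meta = load_export_meta(export_dir)
        print(has_table(meta, "orc", "origin"))           # True
        print(has_table(meta, "orc", "directory_entry"))  # False
```

Because relational tables are never listed in object_types, a check like `has_table` only makes sense for main tables.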
Running all on staging
An easy way to run it (e.g. on the staging database) is to have these config files:
Then run a command such as:
luigi --log-level INFO --local-scheduler --module swh.export.luigi RunExportAll \
    --UploadExportToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \
    --s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/ \
    --athena-db-name=vlorentz_20221109_staging
Note that this example splits config options between luigi.cfg and the CLI
arbitrarily, for readability; they can be used interchangeably.
- class swh.export.luigi.ObjectType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
  Bases: Enum
  - origin = 1
  - origin_visit = 2
  - origin_visit_status = 3
  - snapshot = 4
  - release = 5
  - revision = 6
  - directory = 7
  - content = 8
  - skipped_content = 9
 
- class swh.export.luigi.Format(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
  Bases: Enum
  - edges = 1
  - orc = 2
 
- swh.export.luigi.merge_lists(lists: Iterator[List[T]]) → List[T]
  Returns a list made of all items of the arguments, with no duplicates.
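The documented behaviour of merge_lists can be sketched as follows. This is an illustration under two assumptions that the docstring does not state: items are hashable, and the first occurrence of each item keeps its position. The name `merge_lists_sketch` is hypothetical, and the real function may order its result differently.

```python
from typing import Hashable, Iterator, List, Set, TypeVar

T = TypeVar("T", bound=Hashable)


def merge_lists_sketch(lists: Iterator[List[T]]) -> List[T]:
    """Concatenate the lists, dropping duplicates (first occurrence wins)."""
    seen: Set[T] = set()
    result: List[T] = []
    for lst in lists:
        for item in lst:
            if item not in seen:
                seen.add(item)
                result.append(item)
    return result


if __name__ == "__main__":
    print(merge_lists_sketch(iter([[1, 2], [2, 3], [3, 1]])))  # [1, 2, 3]
```

An equivalent one-liner is `list(dict.fromkeys(itertools.chain.from_iterable(lists)))`, since dicts preserve insertion order.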
- class swh.export.luigi.PathParameter(is_dir: bool = False, is_file: bool = False, exists: bool = False, create: bool = False, **kwargs)
  Bases: PathParameter
  A parameter that is a local filesystem path.
  If is_dir, is_file, or exists is True, then the existence of the path (and optionally its type) is checked.
  If create is set, then is_dir must be True, and the directory is created if it does not already exist.
  Parameters:
  - is_dir – whether the path should be to a directory
  - is_file – whether the path should be to a file
  - exists – whether the path should already exist
  - create – whether the path should be created if it does not exist
  is_dir and is_file are mutually exclusive. exists and create are mutually exclusive.
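The rules above can be restated as a stand-alone check. This sketch does not subclass luigi and is not the module's code: `check_path` is a hypothetical name, and creating missing parent directories (`parents=True`) is an assumption.

```python
import tempfile
from pathlib import Path


def check_path(
    path: Path,
    is_dir: bool = False,
    is_file: bool = False,
    exists: bool = False,
    create: bool = False,
) -> Path:
    """Hypothetical stand-alone version of the PathParameter checks."""
    if is_dir and is_file:
        raise ValueError("is_dir and is_file are mutually exclusive")
    if exists and create:
        raise ValueError("exists and create are mutually exclusive")
    if create:
        if not is_dir:
            raise ValueError("create requires is_dir=True")
        # Assumption: missing parent directories are created too.
        path.mkdir(parents=True, exist_ok=True)
    if is_dir or is_file or exists:
        # Existence is checked, and the type too when is_dir/is_file is set.
        if not path.exists():
            raise ValueError(f"{path} does not exist")
        if is_dir and not path.is_dir():
            raise ValueError(f"{path} is not a directory")
        if is_file and not path.is_file():
            raise ValueError(f"{path} is not a file")
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        out = check_path(Path(tmp) / "export", is_dir=True, create=True)
        print(out.is_dir())  # True
```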
- class swh.export.luigi.S3PathParameter(*args, **kwargs)
  Bases: Parameter
  A parameter that strips trailing slashes.
  - normalize(s)
    Given a parsed parameter value, normalizes it.
    The value can either be the result of parse(), the default value, or arguments passed into the task’s constructor by instantiation.
    This is very implementation-defined, but can be used to validate/clamp valid values. For example, if you wanted to only accept even integers, and “correct” odd values to the next lower even integer, you could implement normalize as x // 2 * 2.
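The point of stripping trailing slashes is that paths built by string concatenation come out the same whether or not the user typed a final "/". A minimal sketch, assuming all trailing slashes are removed; both function names are hypothetical, and the `<prefix>/<format>/<object_type>` layout used by `table_prefix` is an assumption mirroring the local layout.

```python
def strip_trailing_slashes(s: str) -> str:
    """Hypothetical equivalent of S3PathParameter's normalization."""
    return s.rstrip("/")


def table_prefix(s3_export_path: str, fmt: str, object_type: str) -> str:
    """Join path components without producing a double slash."""
    return f"{strip_trailing_slashes(s3_export_path)}/{fmt}/{object_type}"


if __name__ == "__main__":
    for path in (
        "s3://softwareheritage/graph/swh_2022-11-08",
        "s3://softwareheritage/graph/swh_2022-11-08/",
    ):
        # Both iterations print s3://softwareheritage/graph/swh_2022-11-08/orc/origin
        print(table_prefix(path, "orc", "origin"))
```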
 
- class swh.export.luigi.FractionalFloatParameter(*args, **kwargs)
  Bases: FloatParameter
  A float parameter that must be between 0 and 1.
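The range constraint can be sketched as a plain parsing function. The name `parse_fraction` is hypothetical, and treating both bounds as inclusive is an assumption; the docstring only says "between 0 and 1".

```python
def parse_fraction(s: str) -> float:
    """Parse a float and reject values outside [0, 1] (bounds assumed inclusive)."""
    value = float(s)
    # NaN fails both comparisons, so it is rejected as well.
    if not 0.0 <= value <= 1.0:
        raise ValueError(f"{value} is not between 0 and 1")
    return value


if __name__ == "__main__":
    print(parse_fraction("0.25"))  # 0.25
```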
- swh.export.luigi.stamps_paths(formats: List[Format], object_types: List[ObjectType]) → List[str]
  Returns a list of (local FS or S3) paths used to mark tables as successfully exported.
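Based on the file layout described at the top of this page, the stamp paths can be sketched as one `<format>/stamps/<object_type>` entry per (format, object type) pair. This is an assumption drawn from that layout, not the function's source: `stamps_paths_sketch` is hypothetical, the enums are trimmed copies of those above, and the paths are assumed relative to the export root (a local directory or an S3 prefix).

```python
from enum import Enum
from typing import List


class Format(Enum):
    edges = 1
    orc = 2


class ObjectType(Enum):
    # Trimmed copy of swh.export.luigi.ObjectType, for illustration.
    origin = 1
    origin_visit = 2
    snapshot = 4


def stamps_paths_sketch(
    formats: List[Format], object_types: List[ObjectType]
) -> List[str]:
    """One relative stamp path per (format, object type) pair."""
    return [
        f"{fmt.name}/stamps/{obj_type.name}"
        for fmt in formats
        for obj_type in object_types
    ]


if __name__ == "__main__":
    print(stamps_paths_sketch([Format.edges, Format.orc], [ObjectType.origin]))
    # ['edges/stamps/origin', 'orc/stamps/origin']
```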
- swh.export.luigi.get_masked_swhids(logger, config: Dict[str, Any]) → Set[ExtendedSWHID]
  Queries the masking database and returns the set of all non-visible SWHIDs.
- class swh.export.luigi.StartExport(*args, **kwargs)
  Bases: Task
  Pseudo-task that computes the journal offsets from and to which objects should be exported.
  - export_id = <luigi.parameter.OptionalParameter object>
  - object_types = <luigi.parameter.EnumListParameter object>
  - output() → Dict[str | ObjectType, LocalTarget]
    Returns a stamp file for each step, in self.local_export_path/tmp/stamps/
 
- class swh.export.luigi.ExportTopic(*args, **kwargs)
  Bases: Task
  Exports a single topic, given journal offsets that have already been computed.
  - export_id = <luigi.parameter.OptionalParameter object>
  - formats = <luigi.parameter.EnumListParameter object>
  - processes = <luigi.parameter.IntParameter object>
  - object_types = <luigi.parameter.EnumListParameter object>
  - requires() → Dict[str, Task]
    Returns an instance of StartExport.
 
- class swh.export.luigi.ExportPersonsTable(*args, **kwargs)
  Bases: Task
  Aggregates lists of persons exported by ExportTopic into a single table with no duplicates.
  - export_id = <luigi.parameter.OptionalParameter object>
  - formats = <luigi.parameter.EnumListParameter object>
  - processes = <luigi.parameter.IntParameter object>
  - object_types = <luigi.parameter.EnumListParameter object>
  - requires() → Dict[str, Task]
    Returns an instance of StartExport, and an instance of ExportTopic for each value in self.object_types.
  - run()
    Aggregates lists of persons exported by ExportTopic into a single table with no duplicates.
 
- class swh.export.luigi.ExportGraph(*args, **kwargs)
  Bases: Task
  Exports the entire graph to the local filesystem.
  Example invocation:
    luigi --local-scheduler --module swh.export.luigi ExportGraph --config=graph.prod.yml --local-export-path=export/ --formats=edges
  which is equivalent to this CLI call:
    swh export --config-file graph.prod.yml graph export export/ --formats=edges
  - export_id = <luigi.parameter.OptionalParameter object>
  - formats = <luigi.parameter.EnumListParameter object>
  - processes = <luigi.parameter.IntParameter object>
  - object_types = <luigi.parameter.EnumListParameter object>
  - export_name = <luigi.parameter.Parameter object>
  - complete() → bool
    If the task has any outputs, return True if all outputs exist. Otherwise, return False.
    However, you may freely override this method with custom logic.
  - requires() → Dict[str, Task]
    Returns an instance of StartExport, and an instance of ExportTopic for each value in self.object_types.
 
- class swh.export.luigi.UploadExportToS3(*args, **kwargs)
  Bases: Task
  Uploads a local dataset export to S3, generating the local export first if it does not exist.
  Example invocation:
    luigi --local-scheduler --module swh.export.luigi UploadExportToS3 --local-export-path=export/ --formats=edges --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
  - formats = <luigi.parameter.EnumListParameter object>
  - object_types = <luigi.parameter.EnumListParameter object>
  - requires() → List[Task]
    Returns an ExportGraph task that writes local files at the expected location.
 
- class swh.export.luigi.DownloadExportFromS3(*args, **kwargs)
  Bases: Task
  Downloads a dataset export from S3 to the local filesystem.
  This performs the inverse operation of UploadExportToS3.
  Example invocation:
    luigi --local-scheduler --module swh.export.luigi DownloadExportFromS3 --local-export-path=export/ --formats=edges --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
  - formats = <luigi.parameter.EnumListParameter object>
  - object_types = <luigi.parameter.EnumListParameter object>
  - parallelism = <luigi.parameter.IntParameter object>
  - requires() → List[Task]
    Returns an ExportGraph task that writes local files at the expected location.
 
- class swh.export.luigi.LocalExport(*args, **kwargs)
  Bases: Task
  Task that depends on a local dataset being present, either directly from ExportGraph or via DownloadExportFromS3.
  - formats = <luigi.parameter.EnumListParameter object>
  - object_types = <luigi.parameter.EnumListParameter object>
  - export_task_type = <luigi.parameter.TaskParameter object>
  - requires() → List[Task]
    Returns an instance of either ExportGraph or DownloadExportFromS3, depending on the value of export_task_type.
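The dispatch done by requires() can be illustrated without luigi: export_task_type holds a task class, and the dependency is an instance of that class built from shared parameters. In this sketch the stub classes and `local_export_requires` are hypothetical, only local_export_path is forwarded, and rejecting other classes is an assumption rather than documented behaviour.

```python
from typing import List


class ExportGraphStub:
    """Hypothetical stand-in for ExportGraph."""

    def __init__(self, local_export_path: str) -> None:
        self.local_export_path = local_export_path


class DownloadExportFromS3Stub:
    """Hypothetical stand-in for DownloadExportFromS3."""

    def __init__(self, local_export_path: str) -> None:
        self.local_export_path = local_export_path


def local_export_requires(export_task_type: type, local_export_path: str) -> List[object]:
    """Instantiate whichever producer class was chosen for the local dataset."""
    # Assumption: only the two producers named in the docstring are accepted.
    if export_task_type not in (ExportGraphStub, DownloadExportFromS3Stub):
        raise ValueError(f"unsupported export_task_type: {export_task_type!r}")
    return [export_task_type(local_export_path=local_export_path)]


if __name__ == "__main__":
    deps = local_export_requires(DownloadExportFromS3Stub, "export/")
    print(type(deps[0]).__name__)  # DownloadExportFromS3Stub
```

Downstream tasks can then depend on LocalExport alone, whether the dataset is produced on this machine or fetched from S3.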
 
- class swh.export.luigi.AthenaDatabaseTarget(name: str, table_names: Set[str])
  Bases: Target
  Target for the existence of a database on Athena.
- class swh.export.luigi.CreateAthena(*args, **kwargs)
  Bases: Task
  Creates tables on AWS Athena pointing to a given graph dataset on S3.
  Example invocation:
    luigi --local-scheduler --module swh.export.luigi CreateAthena --ExportGraph-config=graph.staging.yml --athena-db-name=swh_20221108 --object-types=origin,origin_visit --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena
  which is equivalent to this CLI call:
    swh export athena create --database-name swh_20221108 --location-prefix s3://softwareheritage/graph/swh_2022-11-08 --output-location s3://softwareheritage/graph/tmp/athena --replace-tables
  - object_types = <luigi.parameter.EnumListParameter object>
  - s3_export_path = <swh.export.luigi.S3PathParameter object>
  - s3_athena_output_location = <swh.export.luigi.S3PathParameter object>
  - athena_db_name = <luigi.parameter.Parameter object>
  - requires() → List[Task]
    Returns the corresponding UploadExportToS3 instance, with ORC as the only format.
  - output() → List[Target]
    Returns an instance of AthenaDatabaseTarget.
 
- class swh.export.luigi.RunExportAll(*args, **kwargs)
  Bases: WrapperTask
  Runs both the S3 and Athena exports.
  Example invocation:
    luigi --local-scheduler --module swh.export.luigi RunExportAll --ExportGraph-config=graph.staging.yml --ExportGraph-processes=12 --UploadExportToS3-local-export-path=/tmp/export_2022-11-08_staging/ --formats=edges --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 --athena-db-name=swh_20221108 --object-types=origin,origin_visit --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena
  - formats = <luigi.parameter.EnumListParameter object>
  - object_types = <luigi.parameter.EnumListParameter object>
  - s3_export_path = <swh.export.luigi.S3PathParameter object>
  - s3_athena_output_location = <swh.export.luigi.S3PathParameter object>
  - athena_db_name = <luigi.parameter.Parameter object>
  - requires() → List[Task]
    Returns instances of CreateAthena and UploadExportToS3.