Skip to content

dagster

Utilities for working with the dagster instance

PartitionDefinition3DBagDistribution

Bases: StaticPartitionsDefinition

Distribution tiles

Source code in bag3d/common/utils/dagster.py
62
63
64
65
66
67
68
69
70
71
72
class PartitionDefinition3DBagDistribution(StaticPartitionsDefinition):
    """Static partitions definition keyed by the distribution tile IDs.

    The partition keys are the sorted tile IDs returned by
    ``get_export_tile_ids()``. If retrieving the tile IDs raises an exception,
    the exception is logged and the definition is created without partitions.
    """

    def __init__(self):
        logger = get_dagster_logger("PartitionDefinition3DBagDistribution")
        try:
            tile_ids = get_export_tile_ids()
        except Exception as e:
            # Best effort: log the failure and fall back to an empty partition
            # set. Only ``Exception`` is caught so that KeyboardInterrupt and
            # SystemExit still propagate instead of being silently swallowed.
            logger.exception(e)
            tile_ids = []
        super().__init__(partition_keys=sorted(tile_ids))

format_date(input_date, version=True)

Formats a date for using it in versions, filenames, attributes etc.

Parameters:

Name Type Description Default
input_date date

The date to format.

required
version bool

If True, format the input_date as '9999.99.99', else as '9999-99-99'

True
Source code in bag3d/common/utils/dagster.py
39
40
41
42
43
44
45
46
47
48
49
def format_date(input_date: date, version: bool = True) -> str:
    """Render a date as a year/month/day string for versions, filenames, attributes etc.

    Args:
        input_date: The date to format.
        version: When True, separate the parts with dots ('9999.99.99');
            otherwise separate them with hyphens ('9999-99-99').

    Returns:
        The formatted date string.
    """
    pattern = "%Y.%m.%d" if version else "%Y-%m-%d"
    return input_date.strftime(pattern)

get_run_id(context, short=True)

Return the Run ID from the execution context.

Parameters:

Name Type Description Default
context

A dagster context object.

required
short bool

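If True, return only the first segment of the ID (the part before the first hyphen, i.e. the first 8 characters of a UUID-style ID); if False, return the complete ID.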

True
Source code in bag3d/common/utils/dagster.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def get_run_id(context, short: bool = True):
    """Return the Run ID from the execution context.

    Args:
        context: A dagster context object. May be None.
        short: If True, return only the part of the ID before the first
            hyphen (the first 8 characters of a UUID-style ID); otherwise
            return the complete ID.

    Returns:
        The (short or complete) run ID as a string, or None if ``context`` is
        None or ``context.dagster_run`` is None.
    """
    if context is None or context.dagster_run is None:
        return None
    run_id = context.dagster_run.run_id
    if short:
        return run_id.split("-")[0]
    # The complete ID is the unmodified string, not its hyphen-separated parts.
    return run_id

get_upstream_data_version(context, asset_key)

Get the data version of an upstream asset. The upstream asset must be a dependency of the current asset that passes its execution context into this function.

Source code in bag3d/common/utils/dagster.py
52
53
54
55
56
57
58
59
def get_upstream_data_version(
    context: AssetExecutionContext, asset_key: AssetKey
) -> str:
    """Return the data version of an upstream asset as a string.

    The upstream asset must be a dependency of the asset whose execution
    context is passed in, because the version is looked up among the input
    asset records of that context's step execution context.

    Args:
        context: Execution context of the asset that depends on the upstream
            asset.
        asset_key: Key of the upstream asset whose data version is wanted.

    Returns:
        The ``data_version.value`` of the upstream asset's input record,
        converted with ``str``.
    """
    input_records = context.get_step_execution_context().input_asset_records
    upstream_record = input_records[asset_key]
    return str(upstream_record.data_version.value)