Skip to content

executables

AppImage

An application, either as paths of executables, or as a docker image.

Source code in bag3d/common/resources/executables.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
class AppImage:
    """An application, either as paths of executables, or as a docker image.

    Commands are run through ``execute``, which dispatches either to a local
    shell command or to a ``docker run`` of the configured image.
    """

    def __init__(
        self,
        exes: dict,
        docker_cfg: DockerConfig = None,
        with_docker: bool = False,
        kwargs: dict = None,
    ):
        """Store the configuration and, for docker, make sure the image exists.

        Args:
            exes: Mapping of an executable name to the executable itself. The
                names are the ones passed to ``execute`` as ``exe_name``.
            docker_cfg: Docker configuration. Its ``image`` and ``mount_point``
                are read only when ``with_docker`` is True.
            with_docker: If True, commands run in a container of the configured
                image, otherwise they run with the local executables.
            kwargs: Extra values, stored unchanged as ``self.kwargs``.

        Raises:
            ValueError: If ``with_docker`` is True but ``docker_cfg`` is missing.
        """
        if with_docker and docker_cfg is None:
            # Fail early with a clear message instead of an AttributeError on
            # 'None.image' further down.
            raise ValueError("A 'docker_cfg' is required when 'with_docker' is True.")
        self.logger = get_dagster_logger()
        self.exes = exes
        self.with_docker = with_docker
        self.kwargs = kwargs
        if self.with_docker:
            self.docker_client = docker.from_env()
            try:
                self.docker_image = self.docker_client.images.get(docker_cfg.image)
            except ImageNotFound:
                # The image is not available locally, so pull it.
                self.docker_image = self.docker_client.images.pull(docker_cfg.image)
            self.container_mount_point = Path(docker_cfg.mount_point)
        else:
            self.docker_client = None
            self.docker_image = None
            self.container_mount_point = None

    def execute(
        self,
        exe_name: str,
        command: str,
        kwargs: dict = None,
        local_path: Path = None,
        silent=False,
        cwd: str = None,
    ) -> Tuple[int, str]:
        """Execute a command in a docker container if the instance was set up with
        docker, otherwise execute it with the local executable.

        Since ``execute`` selects the correct executable, you always need to pass
        an ``exe`` placeholder in the ``command`` string. The ``exe`` is
        substituted internally from ``self.exes[exe_name]``.
        For instance ``"{exe} --version"``, to get the version of an executable.

        If the command needs access to a path, the second placeholder that must be
        in the ``command`` string is ``local_path``.
        If the executable is local, the placeholder takes the value of the
        ``local_path`` parameter.
        If the command runs in docker, the placeholder takes the value of the
        ``container_path`` and ``local_path`` is mounted on the ``container_path``.

        The ``container_path`` is the ``mount_point`` itself if ``local_path`` is a
        directory, otherwise it is ``mount_point / local_path.name``.

        Args:
            exe_name: Name of the executable to run (as defined in the resource
                config { "exes": { <name>: ...} }). This is because an AppImage can
                use multiple executables, and you need to select the one that needs
                to be run.
            command: The command to be executed. It is formatted with the
                ``kwargs``, the executable and the path. Therefore, it must contain
                a placeholder for ``exe``.
            kwargs: Keyword arguments to pass as command parameters. Must not
                contain the keys ``exe`` or ``local_path``.
            local_path: The path that the command needs access to.
            silent: If False, send execution messages to the logger, else do not
                log.
            cwd: Working directory for the local shell command. It is not used
                when the command runs in docker.

        Returns:
            A tuple of the return code and the output of the command, in this
            order. In docker, the return code is derived from the output: it is 1
            if the output contains "error" (case-insensitive), otherwise 0.

        Raises:
            ValueError: If ``kwargs`` contains ``exe`` or ``local_path``.
            Failure: If the return code is not 0.

        Examples:

            .. code-block:: python

                # Pass the name of the exe first, then the command, including the
                # 'exe' placeholder.
                self.execute("ogrinfo", "{exe} --version")

                self.execute("ogrinfo", "{exe} -so -al {local_path}",
                             local_path=Path("/tmp/myfile.gml"))
        """
        if kwargs:
            if "exe" in kwargs:
                raise ValueError(
                    "Cannot include 'exe' in the kwargs. Pass the exe in "
                    "the 'exe_name' parameter."
                )
            if "local_path" in kwargs:
                raise ValueError(
                    "Cannot include 'local_path' in the kwargs. Pass the "
                    "path in the 'local_path' parameter."
                )
        # Work on a copy, so that the caller's kwargs are never modified.
        kwargs_with_exe = deepcopy(kwargs) if kwargs else dict()
        kwargs_with_exe["exe"] = self.exes[exe_name]
        if self.with_docker:
            volumes = None
            if local_path:
                if local_path.is_dir():
                    container_path = self.container_mount_point
                else:
                    container_path = self.container_mount_point / local_path.name
                kwargs_with_exe["local_path"] = container_path
                volumes = [
                    f"{local_path}:{container_path}",
                ]
            # Forward 'silent', so that a silent call does not log in docker either.
            output = self._docker_run(
                command.format(**kwargs_with_exe), volumes=volumes, silent=silent
            )
            # 'docker run' only gives back the output, thus the return code is
            # derived from the output.
            return_code = 1 if "error" in output.lower() else 0
        else:
            kwargs_with_exe["local_path"] = local_path
            shell_command = command.format(**kwargs_with_exe)
            if silent:
                output, return_code = execute_shell_command_silent(
                    shell_command=shell_command, cwd=cwd
                )
            else:
                output, return_code = execute_shell_command(
                    shell_command=shell_command,
                    log=self.logger,
                    output_logging="STREAM",
                    cwd=cwd,
                )
        if return_code != 0:
            raise Failure(f"{kwargs_with_exe['exe']} failed with output:\n{output}")
        if "error" in output.lower():
            # The command succeeded, but it reported an error in its output.
            self.logger.error(f"Error in subprocess output: {output}")
        return return_code, output

    def _docker_run(self, command, volumes=None, silent=False) -> str:
        """Executes a `command` with 'docker run'.

        The `volumes` are passed on to the container as they are.
        The network mode is set to `host`.
        Removes the container when finished.

        Args:
            command: The command to run in the container.
            volumes: Volumes to mount into the container, or None.
            silent: If False, log the command and the image tags before running.

        Returns the STDOUT and STDERR from the container, decoded as UTF-8.

        Raises:
            RuntimeError: If the instance was not initialized with docker.
        """
        if not self.with_docker:
            raise RuntimeError(
                "executable resource was not initialized with a docker image"
            )
        if not silent:
            logger = get_dagster_logger()
            logger.info(f"Executing `{command}` in {self.docker_image.tags}")
        stdout = self.docker_client.containers.run(
            self.docker_image,
            command=command,
            volumes=volumes,
            network_mode="host",
            remove=True,
            detach=False,
            stdout=True,
            stderr=True,
        )
        return stdout.decode("utf-8")

    def version(self, exe: str):
        """Run ``<exe> --version`` silently in the local shell and return the
        output after formatting it with ``format_version_stdout``."""
        stdout, _ = execute_shell_command_silent(f"{exe} --version")
        return format_version_stdout(stdout)

execute(exe_name, command, kwargs=None, local_path=None, silent=False, cwd=None)

Execute a command in a docker container if an image is available, otherwise execute with the local executable.

Since execute selects the correct executable, based on the availability of a docker image, you always need to pass an exe placeholder in the command string. The exe will be substituted internally by execute. For instance "{exe} --version", to get the version of an executable.

If the command needs access to a path, the second placeholder that must be in the command string is local_path. The local_path in the command string will be substituted with the local_path parameter value, if the executable is local. If the command runs in docker, the local_path in the command string takes the value of the container_path.

The container_path is computed from the local_path parameter and the mount_point configuration value, such that container_path = mount_point / local_path.name.

Parameters:

Name Type Description Default
exe_name str

Name of the executable to run (as defined in the resource config { "exes": { <name>: ...} }). This is because an AppImage can use multiple executables, and you need to select the one that needs to be run.

required
command str

The command to be executed. It is formatted with the kwargs and the executable. Therefore, it must contain a placeholder for exe, which is substituted from self.exes.

required
kwargs dict

Keyword arguments to pass as command parameters.

None
local_path Path

If local_path is a directory, then it is mounted on the mount_point directly. If local_path is a file, then it is mounted on the mount_point as local_path : mount_point/local_path.name.

None
silent

If False, send execution messages to the logger, else do not log.

False

Returns:

Type Description
Tuple[int, str]

The return code and the STDOUT from the command execution, in that order.

Examples:

.. code-block:: python

    # Pass the name of the exe first, then the command, including the
    # 'exe' placeholder.
    self.execute("ogrinfo", "{exe} --version")

    self.execute("ogrinfo", "{exe} -so -al {local_path}",
                 local_path=Path("/tmp/myfile.gml"))
Source code in bag3d/common/resources/executables.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def execute(
    self,
    exe_name: str,
    command: str,
    kwargs: dict = None,
    local_path: Path = None,
    silent=False,
    cwd: str = None,
) -> Tuple[int, str]:
    """Execute a command in a docker container if the instance was set up with
    docker, otherwise execute it with the local executable.

    Since ``execute`` selects the correct executable, you always need to pass
    an ``exe`` placeholder in the ``command`` string. The ``exe`` is
    substituted internally from ``self.exes[exe_name]``.
    For instance ``"{exe} --version"``, to get the version of an executable.

    If the command needs access to a path, the second placeholder that must be
    in the ``command`` string is ``local_path``.
    If the executable is local, the placeholder takes the value of the
    ``local_path`` parameter.
    If the command runs in docker, the placeholder takes the value of the
    ``container_path`` and ``local_path`` is mounted on the ``container_path``.

    The ``container_path`` is the ``mount_point`` itself if ``local_path`` is a
    directory, otherwise it is ``mount_point / local_path.name``.

    Args:
        exe_name: Name of the executable to run (as defined in the resource
            config { "exes": { <name>: ...} }). This is because an AppImage can
            use multiple executables, and you need to select the one that needs
            to be run.
        command: The command to be executed. It is formatted with the
            ``kwargs``, the executable and the path. Therefore, it must contain
            a placeholder for ``exe``.
        kwargs: Keyword arguments to pass as command parameters. Must not
            contain the keys ``exe`` or ``local_path``.
        local_path: The path that the command needs access to.
        silent: If False, send execution messages to the logger, else do not
            log.
        cwd: Working directory for the local shell command. It is not used
            when the command runs in docker.

    Returns:
        A tuple of the return code and the output of the command, in this
        order. In docker, the return code is derived from the output: it is 1
        if the output contains "error" (case-insensitive), otherwise 0.

    Raises:
        ValueError: If ``kwargs`` contains ``exe`` or ``local_path``.
        Failure: If the return code is not 0.

    Examples:

        .. code-block:: python

            # Pass the name of the exe first, then the command, including the
            # 'exe' placeholder.
            self.execute("ogrinfo", "{exe} --version")

            self.execute("ogrinfo", "{exe} -so -al {local_path}",
                         local_path=Path("/tmp/myfile.gml"))
    """
    if kwargs:
        if "exe" in kwargs:
            raise ValueError(
                "Cannot include 'exe' in the kwargs. Pass the exe in "
                "the 'exe_name' parameter."
            )
        if "local_path" in kwargs:
            raise ValueError(
                "Cannot include 'local_path' in the kwargs. Pass the "
                "path in the 'local_path' parameter."
            )
    # Work on a copy, so that the caller's kwargs are never modified.
    kwargs_with_exe = deepcopy(kwargs) if kwargs else dict()
    kwargs_with_exe["exe"] = self.exes[exe_name]
    if self.with_docker:
        volumes = None
        if local_path:
            if local_path.is_dir():
                container_path = self.container_mount_point
            else:
                container_path = self.container_mount_point / local_path.name
            kwargs_with_exe["local_path"] = container_path
            volumes = [
                f"{local_path}:{container_path}",
            ]
        # Forward 'silent', so that a silent call does not log in docker either.
        output = self._docker_run(
            command.format(**kwargs_with_exe), volumes=volumes, silent=silent
        )
        # 'docker run' only gives back the output, thus the return code is
        # derived from the output.
        return_code = 1 if "error" in output.lower() else 0
    else:
        kwargs_with_exe["local_path"] = local_path
        shell_command = command.format(**kwargs_with_exe)
        if silent:
            output, return_code = execute_shell_command_silent(
                shell_command=shell_command, cwd=cwd
            )
        else:
            output, return_code = execute_shell_command(
                shell_command=shell_command,
                log=self.logger,
                output_logging="STREAM",
                cwd=cwd,
            )
    if return_code != 0:
        raise Failure(f"{kwargs_with_exe['exe']} failed with output:\n{output}")
    if "error" in output.lower():
        # The command succeeded, but it reported an error in its output.
        self.logger.error(f"Error in subprocess output: {output}")
    return return_code, output

GDALResource

Bases: ConfigurableResource

A GDAL Resource can be configured by either the local EXE paths for ogr2ogr, ogrinfo and sozip, or by providing the DockerConfig for the GDAL image.

For the local exes you can use:

gdal_resource = GDALResource(exe_ogr2ogr=os.getenv("EXE_PATH_OGR2OGR"),
                             exe_ogrinfo=os.getenv("EXE_PATH_OGRINFO"),
                             exe_sozip=os.getenv("EXE_PATH_SOZIP"))

For the docker image you can use:

gdal_resource = GDALResource(docker_cfg=DockerConfig(
                        image=DOCKER_GDAL_IMAGE,
                        mount_point="/tmp"))

If instantiated with GDALResource() then the Docker image is used by default. After the resource has been instantiated, gdal (AppImage) can be acquired with the app property:

gdal_resource.app
Source code in bag3d/common/resources/executables.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
class GDALResource(ConfigurableResource):
    """
    Resource for the GDAL tools `ogr2ogr`, `ogrinfo` and `sozip`.

    It is configured either with the paths to the local executables, or with
    the DockerConfig of the GDAL image.

    With local executables:

        gdal_resource = GDALResource(exe_ogr2ogr=os.getenv("EXE_PATH_OGR2OGR"),
                                     exe_ogrinfo=os.getenv("EXE_PATH_OGRINFO"),
                                     exe_sozip=os.getenv("EXE_PATH_SOZIP"))

    With a docker image:

        gdal_resource = GDALResource(docker_cfg=DockerConfig(
                                        image=DOCKER_GDAL_IMAGE,
                                        mount_point="/tmp"))

    A plain `GDALResource()` falls back to the bare executable names and the
    default GDAL image, in which case docker is used. The GDAL application
    (AppImage) is available through the `app` property:

        gdal_resource.app
    """

    exe_ogrinfo: str
    exe_ogr2ogr: str
    exe_sozip: str
    docker_cfg: DockerConfig

    def __init__(
        self,
        exe_ogrinfo: Optional[str] = None,
        exe_ogr2ogr: Optional[str] = None,
        exe_sozip: Optional[str] = None,
        docker_cfg: Optional[DockerConfig] = None,
    ):
        # Anything that is not provided falls back to its default.
        if not docker_cfg:
            docker_cfg = DockerConfig(image=DOCKER_GDAL_IMAGE, mount_point="/tmp")
        super().__init__(
            exe_ogrinfo=exe_ogrinfo if exe_ogrinfo else "ogrinfo",
            exe_ogr2ogr=exe_ogr2ogr if exe_ogr2ogr else "ogr2ogr",
            exe_sozip=exe_sozip if exe_sozip else "sozip",
            docker_cfg=docker_cfg,
        )

    @property
    def exes(self) -> Dict[str, str]:
        """The executables by the name that they are referred to in `execute`."""
        return dict(
            ogrinfo=self.exe_ogrinfo,
            ogr2ogr=self.exe_ogr2ogr,
            sozip=self.exe_sozip,
        )

    @property
    def with_docker(self) -> bool:
        """True only if all three executables are set to their default name."""
        configured = (self.exe_ogrinfo, self.exe_ogr2ogr, self.exe_sozip)
        return configured == ("ogrinfo", "ogr2ogr", "sozip")

    @property
    def app(self) -> AppImage:
        """A new AppImage, created from the configuration of this resource."""
        gdal = AppImage(
            exes=self.exes,
            docker_cfg=self.docker_cfg,
            with_docker=self.with_docker,
        )
        return gdal

GeoflowResource

Bases: ConfigurableResource

A GeoflowResource can be configured by providing the paths to Geoflow exe_geoflow executable on the local system and the path to the reconstruction flowchart.

Example:

geoflow_resource = GeoflowResource(exe_geoflow = os.getenv("EXE_PATH_ROOFER_RECONSTRUCT"),
                                   flowchart=os.getenv("FLOWCHART_PATH_RECONSTRUCT"))

After the resource has been instantiated, geoflow (AppImage) can be acquired with the app property:

geoflow = geoflow_resource.app
Source code in bag3d/common/resources/executables.py
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
class GeoflowResource(ConfigurableResource):
    """
    Resource for Geoflow, which is configured with the path to the local
    Geoflow executable (`exe_geoflow`) and the path to the reconstruction
    flowchart (`flowchart`).

    Example:

        geoflow_resource = GeoflowResource(exe_geoflow = os.getenv("EXE_PATH_ROOFER_RECONSTRUCT"),
                                           flowchart=os.getenv("FLOWCHART_PATH_RECONSTRUCT"))

    The Geoflow application (AppImage) is available through the `app`
    property:

        geoflow = geoflow_resource.app
    """

    exe_geoflow: str
    flowchart: str

    @property
    def exes(self) -> Dict[str, str]:
        """The executable by the name that it is referred to in `execute`."""
        return dict(geof=self.exe_geoflow)

    @property
    def with_docker(self) -> bool:
        """Always False, since only the local executable is used."""
        return False

    @property
    def app(self) -> AppImage:
        """A new AppImage that carries the flowchart path in its `kwargs`."""
        flowcharts = {"reconstruct": self.flowchart}
        geoflow = AppImage(
            exes=self.exes,
            with_docker=self.with_docker,
            kwargs={"flowcharts": flowcharts},
        )
        return geoflow

LASToolsResource

Bases: ConfigurableResource

A LASTools Resource can be configured by providing the paths to LASTools executables "lasindex" and "las2las" on the local system.

Example:

lastools_resource = LASToolsResource(exe_lasindex=os.getenv("EXE_PATH_LASINDEX"),
                                     exe_las2las=os.getenv("EXE_PATH_LAS2LAS"))

After the resource has been instantiated, lastools (AppImage) can be acquired with the app property:

lastools_resource.app
Source code in bag3d/common/resources/executables.py
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
class LASToolsResource(ConfigurableResource):
    """
    Resource for LASTools, which is configured with the paths to the local
    executables "lasindex" and "las2las".

    Example:

        lastools_resource = LASToolsResource(exe_lasindex=os.getenv("EXE_PATH_LASINDEX"),
                                             exe_las2las=os.getenv("EXE_PATH_LAS2LAS"))

    The LASTools application (AppImage) is available through the `app`
    property:

        lastools_resource.app
    """

    exe_lasindex: str
    exe_las2las: str

    @property
    def exes(self) -> Dict[str, str]:
        """The executables by the name that they are referred to in `execute`."""
        return dict(lasindex=self.exe_lasindex, las2las=self.exe_las2las)

    @property
    def with_docker(self) -> bool:
        """Always False, since only the local executables are used."""
        return False

    @property
    def app(self) -> AppImage:
        """A new AppImage, created from the configuration of this resource."""
        lastools = AppImage(exes=self.exes, with_docker=self.with_docker)
        return lastools

PDALResource

Bases: ConfigurableResource

A PDAL Resource can be configured by either the local EXE path for pdal or by providing the DockerConfig for the PDAL image.

For the local exe you can use:

pdal_resource = PDALResource(exe_pdal=os.getenv("EXE_PATH_PDAL"))

For the docker image you can use:

pdal_resource = PDALResource(docker_cfg=DockerConfig(
                                image=DOCKER_PDAL_IMAGE,
                                mount_point="/tmp"))

If instantiated with PDALResource() then the Docker image is used by default. After the resource has been instantiated, pdal (AppImage) can be acquired with the app property:

pdal_resource.app
Source code in bag3d/common/resources/executables.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
class PDALResource(ConfigurableResource):
    """
    Resource for PDAL.

    It is configured either with the path to the local `pdal` executable, or
    with the DockerConfig of the PDAL image.

    With a local executable:

        pdal_resource = PDALResource(exe_pdal=os.getenv("EXE_PATH_PDAL"))

    With a docker image:

        pdal_resource = PDALResource(docker_cfg=DockerConfig(
                                        image=DOCKER_PDAL_IMAGE,
                                        mount_point="/tmp"))

    A plain `PDALResource()` falls back to the bare executable name and the
    default PDAL image, in which case docker is used. The PDAL application
    (AppImage) is available through the `app` property:

        pdal_resource.app
    """

    exe_pdal: str
    docker_cfg: DockerConfig

    def __init__(
        self,
        exe_pdal: Optional[str] = None,
        docker_cfg: Optional[DockerConfig] = None,
    ):
        # Anything that is not provided falls back to its default.
        if not docker_cfg:
            docker_cfg = DockerConfig(image=DOCKER_PDAL_IMAGE, mount_point="/tmp")
        super().__init__(
            exe_pdal=exe_pdal if exe_pdal else "pdal",
            docker_cfg=docker_cfg,
        )

    @property
    def exes(self) -> Dict[str, str]:
        """The executable by the name that it is referred to in `execute`."""
        return dict(pdal=self.exe_pdal)

    @property
    def with_docker(self) -> bool:
        """True only if the executable is set to its default name."""
        return self.exe_pdal == "pdal"

    @property
    def app(self) -> AppImage:
        """A new AppImage, created from the configuration of this resource."""
        pdal = AppImage(
            exes=self.exes,
            docker_cfg=self.docker_cfg,
            with_docker=self.with_docker,
        )
        return pdal

RooferResource

Bases: ConfigurableResource

A RooferResource can be configured by providing the paths to Roofer crop executable on the local system.

Example:

roofer_resource = RooferResource(exe_roofer_crop=os.getenv("EXE_PATH_ROOFER_CROP"))

After the resource has been instantiated, roofer (AppImage) can be acquired with the app property:

roofer = roofer_resource.app
Source code in bag3d/common/resources/executables.py
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
class RooferResource(ConfigurableResource):
    """
    Resource for Roofer, which is configured with the path to the local
    Roofer `crop` executable.

    Example:

        roofer_resource = RooferResource(exe_roofer_crop=os.getenv("EXE_PATH_ROOFER_CROP"))

    The Roofer application (AppImage) is available through the `app`
    property:

        roofer = roofer_resource.app
    """

    exe_roofer_crop: str

    @property
    def exes(self) -> Dict[str, str]:
        """The executable by the name that it is referred to in `execute`."""
        return dict(crop=self.exe_roofer_crop)

    @property
    def with_docker(self) -> bool:
        """Always False, since only the local executable is used."""
        return False

    @property
    def app(self) -> AppImage:
        """A new AppImage, created from the configuration of this resource."""
        roofer = AppImage(exes=self.exes, with_docker=self.with_docker)
        return roofer

TylerResource

Bases: ConfigurableResource

A Tyler Resource can be configured by providing the paths to Tyler executables "tyler" and "tyler-db" on the local system.

Example:

tyler_resource = TylerResource(exe_tyler=os.getenv("EXE_PATH_TYLER"),
                               exe_tyler_db=os.getenv("EXE_PATH_TYLER_DB"))

After the resource has been instantiated, tyler (AppImage) can be acquired with the app property:

tyler = tyler_resource.app
Source code in bag3d/common/resources/executables.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
class TylerResource(ConfigurableResource):
    """
    Resource for Tyler, which is configured with the paths to the local
    executables "tyler" and "tyler-db".

    Example:

        tyler_resource = TylerResource(exe_tyler=os.getenv("EXE_PATH_TYLER"),
                                       exe_tyler_db=os.getenv("EXE_PATH_TYLER_DB"))

    The Tyler application (AppImage) is available through the `app`
    property:

        tyler = tyler_resource.app
    """

    exe_tyler: str
    exe_tyler_db: str

    @property
    def exes(self) -> Dict[str, str]:
        """The executables by the name that they are referred to in `execute`."""
        executables = {"tyler": self.exe_tyler}
        executables["tyler-db"] = self.exe_tyler_db
        return executables

    @property
    def with_docker(self) -> bool:
        """Always False, since only the local executables are used."""
        return False

    @property
    def app(self) -> AppImage:
        """A new AppImage, created from the configuration of this resource."""
        tyler = AppImage(exes=self.exes, with_docker=self.with_docker)
        return tyler

execute_shell_command_silent(shell_command, cwd=None, env=None)

Execute a shell command without sending the output to the logger, and without writing the command to a script file first.

NOTE: This function is based on the execute_script_file dagster_shell function, which can be found here: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-shell/dagster_shell/utils.py

Parameters:

Name Type Description Default
shell_command str

The shell script to execute.

required
cwd str

Working directory for the shell command to use.

None
env Dict[str, str]

Environment dictionary to pass to subprocess.Popen. Unused by default.

None

Returns:

Type Description

Tuple[str, int]

A tuple where the first element is the combined stdout/stderr output of running the shell command and the second element is the return code.

Source code in bag3d/common/resources/executables.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def execute_shell_command_silent(
    shell_command: str,
    cwd: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
) -> Tuple[str, int]:
    """Execute a shell command without sending the output to the logger, and without
    writing the command to a script file first.

    NOTE: This function is based on the execute_script_file dagster_shell function,
    which can be found here: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-shell/dagster_shell/utils.py

    Args:
        shell_command (str): The shell script to execute.
        cwd (str, optional): Working directory for the shell command to use.
        env (Dict[str, str], optional): Environment dictionary to pass to ``subprocess.Popen``.
            Unused by default.

    Returns:
        Tuple[str, int]: A tuple where the first element is the combined
        stdout/stderr output of running the shell command and the second element is
        the return code.
    """
    sub_process = None
    try:
        sub_process = Popen(
            shell_command,
            shell=True,
            stdout=PIPE,
            # Merge stderr into stdout, so that there is a single output stream.
            stderr=STDOUT,
            cwd=cwd,
            env=env,
            # Run the command in its own session (setsid). Together with the
            # default 'restore_signals=True', which resets SIGPIPE, SIGXFZ and
            # SIGXFSZ in the child, this replaces a 'preexec_fn' hook, which is
            # not safe to use in the presence of threads.
            start_new_session=True,
            encoding="UTF-8",
        )

        # Reads the whole output, closes the pipe and waits for the process.
        output, _ = sub_process.communicate()

        return output, sub_process.returncode
    finally:
        # Always terminate subprocess, including in cases where the run is terminated
        if sub_process is not None:
            if sub_process.poll() is None:
                sub_process.terminate()
            # Do not leave the pipe open if reading the output was interrupted.
            if sub_process.stdout is not None and not sub_process.stdout.closed:
                sub_process.stdout.close()