Skip to content

requests

download_as_str(url, parameters=None)

Download a file as string in memory.

Returns:

Type Description
str

The downloaded package as string.

Source code in bag3d/common/utils/requests.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def download_as_str(url: str, parameters: Mapping = None) -> str:
    """Fetch a remote resource and return its body as text, entirely in memory.

    Args:
        url: The URL to request.
        parameters: Optional query parameters passed on to ``requests.get``.

    Returns:
        The decoded response body.

    Raises:
        ValueError: If the server answers with any status code other than 200.
    """
    response = requests.get(url=url, params=parameters, verify=True)
    # Guard clause: anything but a plain 200 is treated as a failed download.
    if response.status_code != 200:  # pragma: no cover
        raise ValueError(
            f"Failed to download JSON. HTTP Status {response.status_code} "
            f"for {response.url}"
        )
    return response.text

download_file(url, target_path, chunk_size=1024, parameters=None, verify=True)

Download a large file and save it to disk.

Parameters:

Name Type Description Default
url str

The URL of the file to be downloaded.

required
target_path Path

Path to the target file or directory. If target_path is a directory, then the target file name is the last part of the url.

required
chunk_size int

The chunk_size parameter passed to :py:func:`requests.Response.iter_content`. Defaults to 1024.

1024
parameters dict

Query parameters passed to :py:func:requests.get.

None
verify bool

Whether to verify SSL certificate.

True

Returns: The local Path to the downloaded file, or None on failure

Source code in bag3d/common/utils/requests.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def download_file(
    url: str,
    target_path: Path,
    chunk_size: int = 1024,
    parameters: Mapping = None,
    verify: bool = True,
) -> Union[Path, None]:
    """Download a large file and save it to disk.

    The response body is streamed and written to the target file chunk by chunk,
    so the whole payload is never held in memory at once.

    Args:
        url (str): The URL of the file to be downloaded.
        target_path (Path): Path to the target file or directory. If
            ``target_path`` is a directory, then the target file name is the last
            part of the ``url``.
        chunk_size (int): The ``chunk_size`` parameter passed to
            :py:func:`requests.Response.iter_content`. Defaults to ``1024``.
        parameters (dict): Query parameters passed to :py:func:`requests.get`.
        verify (bool): Whether to verify SSL certificate.
    Returns:
        The local Path to the downloaded file, or None when an HTTP error was
        raised (the error is logged, not re-raised). Other exceptions propagate
        to the caller.
    """
    logger = get_dagster_logger()
    if target_path.is_dir():
        local_filename = url.split("/")[-1]
        fpath = target_path / local_filename
    else:
        fpath = target_path
    logger.info(f"Downloading from {url} to {fpath}")

    try:
        # Session usage: https://stackoverflow.com/a/63417213
        # Both the session and the streamed response are context-managed so
        # they are closed on every exit path (success, HTTP error, or any
        # other exception).
        with requests.Session() as session:
            with session.get(
                url, params=parameters, stream=True, verify=verify
            ) as r:
                # Raises HTTPError for 4xx/5xx answers, which is exactly the
                # condition under which ``r.ok`` is false.
                r.raise_for_status()
                with fpath.open("wb") as fd:
                    for chunk in r.iter_content(chunk_size=chunk_size):
                        fd.write(chunk)
                return fpath
    except (
        requests.exceptions.BaseHTTPError,
        requests.exceptions.HTTPError,
    ) as e:  # pragma: no cover
        logger.exception(e)
        return None

Request an export and download link from the API.

Source code in bag3d/common/utils/requests.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def get_extract_download_link(url, featuretypes, data_format, geofilter) -> str:
    """Request an export and download link from the API.

    An extract request is POSTed to ``url``. The status link from the response
    is then polled every 15 seconds for as long as it answers ``200 OK``. When
    it answers ``201 Created``, the download link is read from that same
    response.

    Args:
        url: The API endpoint the extract request is posted to.
        featuretypes: Value sent as ``featuretypes`` in the request body.
        data_format: Value sent as ``format`` in the request body.
        geofilter: Value sent as ``geofilter`` in the request body; omitted from
            the request when ``None``.

    Returns:
        The absolute download URL of the extract.

    Raises:
        requests.exceptions.HTTPError: If the extract request is not answered
            with ``202 Accepted``, or if polling ends on a status other than
            ``201 Created``.
    """
    logger = get_dagster_logger()
    request_json = {
        "featuretypes": featuretypes,
        "format": data_format,
    }
    if geofilter is not None:
        request_json["geofilter"] = geofilter
    r_post = requests.post(url, json=request_json)
    logger.info(f"Requesting extract: {r_post.url} with {request_json} ")
    if r_post.status_code != requests.codes.accepted:  # pragma: no cover
        logger.error(r_post.text)
        r_post.raise_for_status()
        # raise_for_status() is silent for non-error codes (e.g. 200); fail
        # explicitly instead of falling through and returning None.
        raise requests.exceptions.HTTPError(
            f"Unexpected HTTP status {r_post.status_code} for {r_post.url}",
            response=r_post,
        )

    _u = urlparse(url)
    pdok_server = f"{_u.scheme}://{_u.hostname}/"
    url_status = urljoin(pdok_server, r_post.json()["_links"]["status"]["href"])

    # Poll with a single request per iteration, so the status code that is
    # tested and the response body that is parsed always belong together.
    r_status = requests.get(url_status, verify=True)
    while r_status.status_code == requests.codes.ok:
        sleep(15)
        r_status = requests.get(url_status, verify=True)

    if r_status.status_code != requests.codes.created:  # pragma: no cover
        logger.error(r_status.text)
        r_status.raise_for_status()
        # Same reasoning as above for unexpected non-error status codes.
        raise requests.exceptions.HTTPError(
            f"Unexpected HTTP status {r_status.status_code} for {r_status.url}",
            response=r_status,
        )

    url_download = urljoin(pdok_server, r_status.json()["_links"]["download"]["href"])
    logger.info(f"Extract URL: {url_download}")
    return url_download

get_metadata(url_api)

Get metadata from a PDOK API.

:returns: {"timeliness": {<date>: [featuretype,...]}}

Source code in bag3d/common/utils/requests.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def get_metadata(url_api: str):
    """Get metadata from a PDOK API.

    The ``timeliness`` entries of the JSON response are grouped by the calendar
    date of their ``datetimeTo`` value.

    :returns: {"timeliness": {<date>: [featuretype,...]}}
    """
    response = requests.get(url_api, verify=True)
    if response.status_code != requests.codes.ok:  # pragma: no cover
        response.raise_for_status()

    featuretypes_by_date = {}
    for layer in response.json()["timeliness"]:
        timestamp = layer["datetimeTo"]
        # A trailing "Z" is dropped before the value is handed to fromisoformat.
        if timestamp[-1] == "Z":
            timestamp = timestamp[:-1]
        day = str(datetime.fromisoformat(timestamp).date())
        featuretypes_by_date.setdefault(day, []).append(layer["featuretype"])
    return {"timeliness": featuretypes_by_date}