Skip to content

database

create_schema(context, new_schema)

CREATE SCHEMA IF NOT EXISTS new_schema

Source code in bag3d/common/utils/database.py
111
112
113
114
115
116
def create_schema(context, new_schema):
    """CREATE SCHEMA IF NOT EXISTS new_schema"""
    conn = context.resources.db_connection.connect
    q = SQL("CREATE SCHEMA IF NOT EXISTS {sch};").format(sch=Identifier(new_schema))
    context.log.info(conn.print_query(q))
    conn.send_query(q)

drop_table(context, new_table)

DROP TABLE IF EXISTS new_table CASCADE

Source code in bag3d/common/utils/database.py
103
104
105
106
107
108
def drop_table(context, new_table):
    """DROP TABLE IF EXISTS new_table CASCADE"""
    conn = context.resources.db_connection.connect
    q = SQL("DROP TABLE IF EXISTS {tbl} CASCADE;").format(tbl=new_table.id)
    context.log.info(conn.print_query(q))
    conn.send_query(q)

load_sql(filename=None, query_params=None)

Load SQL from a file and inject parameters if provided.

If providing query parametes, they need to be in a dict, where the keys are the parameter names.

The SQL script can contain parameters in the form of ${...}, which is understood by most database managers. This is handy for developing the SQL scripts in a database manager, without having to execute the pipeline. However, the python formatting only understands {...} placeholders, so the $ are removed from ${...} when the SQL is loaded from the file.

Parameters:

Name Type Description Default
filename str

SQL File to load (without the path) from the sql sub-package. If None, it will load the .sql file with the name equal to the caller function's name.

None
query_params dict

If provided, the templated SQL is formatted with the parameters.

None

For example:

.. code-block:: python

def my_func():
    load_sql() # loads my_func.sql from bag3d_pipeline.sql
Source code in bag3d/common/utils/database.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def load_sql(filename: str = None, query_params: dict = None):  # pragma: no cover
    """Load SQL from a file and inject parameters if provided.

    If providing query parametes, they need to be in a dict, where the keys are the
    parameter names.

    The SQL script can contain parameters in the form of ``${...}``, which is
    understood by most database managers. This is handy for developing the SQL scripts
    in a database manager, without having to execute the pipeline.
    However, the python formatting only understands ``{...}`` placeholders, so the
    ``$`` are removed from ``${...}`` when the SQL is loaded from the file.

    Args:
        filename (str): SQL File to load (without the path) from the ``sql``
            sub-package. If None, it will load the ``.sql`` file with the name equal to
            the caller function's name.
        query_params (dict): If provided, the templated SQL is formatted with the
            parameters.

    For example:

    .. code-block:: python

        def my_func():
            load_sql() # loads my_func.sql from bag3d_pipeline.sql

    """
    # Find the name of the main package. This should be bag3d.<package>, e.g. bag3d.core
    stk = inspect.stack()[1]
    mod = inspect.getmodule(stk[0])
    pkgs = mod.__package__.split(".")
    if pkgs[0] != "bag3d" and len(pkgs) < 2:
        raise RuntimeError(
            "Trying to load SQL files from a namspace that is not bag3d.<package>."
        )
    sqlfiles_module = ".".join([pkgs[0], pkgs[1], "sqlfiles"])
    # Get the name of the calling function
    _f = filename if filename is not None else f"{inspect.stack()[1].function}.sql"
    _sql = resources.files(sqlfiles_module).joinpath(_f).read_text()
    _pysql = _sql.replace("${", "{")
    return inject_parameters(_pysql, query_params)

table_exists(context, table)

CHECKS IF TABLE EXISTS

Source code in bag3d/common/utils/database.py
119
120
121
122
123
124
125
126
127
128
129
def table_exists(context, table) -> bool:
    """CHECKS IF TABLE EXISTS"""
    query = SQL("""SELECT EXISTS (
                   SELECT FROM 
                        pg_tables
                   WHERE 
                        schemaname = {schema} AND 
                        tablename  = {table}
                    );""").format(schema=table.schema.str, table=table.table.str)
    res = context.resources.db_connection.connect.get_dict(query)
    return res[0]["exists"]