Skip to content

database

create_schema(db_connection, new_schema, logger)

CREATE SCHEMA IF NOT EXISTS new_schema

Source code in packages/common/src/bag3d/common/utils/database.py
118
119
120
121
122
123
def create_schema(db_connection: DatabaseResource, new_schema: str, logger: Logger):
    """Issue ``CREATE SCHEMA IF NOT EXISTS`` for ``new_schema``.

    The statement is rendered with ``print_query`` and logged at INFO level
    before it is sent to the database.

    Args:
        db_connection: Resource whose ``connection`` attribute provides
            ``print_query`` and ``send_query``.
        new_schema: Name of the schema; it is wrapped in ``Identifier`` before
            being placed into the statement.
        logger: Logger that receives the rendered statement.
    """
    connection = db_connection.connection
    template = SQL("CREATE SCHEMA IF NOT EXISTS {sch};")
    statement = template.format(sch=Identifier(new_schema))
    rendered = connection.print_query(statement)
    logger.info(rendered)
    connection.send_query(statement)

drop_table(db_connection, new_table, logger)

DROP TABLE IF EXISTS new_table CASCADE

Source code in packages/common/src/bag3d/common/utils/database.py
110
111
112
113
114
115
def drop_table(db_connection: DatabaseResource, new_table, logger: Logger):
    """Issue ``DROP TABLE IF EXISTS ... CASCADE`` for ``new_table``.

    The statement is rendered with ``print_query`` and logged at INFO level
    before it is sent to the database.

    Args:
        db_connection: Resource whose ``connection`` attribute provides
            ``print_query`` and ``send_query``.
        new_table: Object whose ``id`` attribute is placed into the statement
            as the table reference (presumably a composed SQL identifier --
            verify against the caller).
        logger: Logger that receives the rendered statement.
    """
    connection = db_connection.connection
    template = SQL("DROP TABLE IF EXISTS {tbl} CASCADE;")
    statement = template.format(tbl=new_table.id)
    rendered = connection.print_query(statement)
    logger.info(rendered)
    connection.send_query(statement)

load_sql(filename=None, query_params=None)

Load SQL from a file and inject parameters if provided.

If providing query parameters, they need to be in a dict, where the keys are the parameter names.

The SQL script can contain parameters in the form of ${...}, which is understood by most database managers. This is handy for developing the SQL scripts in a database manager, without having to execute the pipeline. However, the python formatting only understands {...} placeholders, so the $ are removed from ${...} when the SQL is loaded from the file.

Parameters:

Name Type Description Default
filename str

SQL File to load (without the path) from the sql sub-package. If None, it will load the .sql file with the name equal to the caller function's name.

None
query_params dict

If provided, the templated SQL is formatted with the parameters.

None

For example:

.. code-block:: python

def my_func():
    load_sql() # loads my_func.sql from bag3d_pipeline.sql
Source code in packages/common/src/bag3d/common/utils/database.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def load_sql(filename: str = None, query_params: dict = None):  # pragma: no cover
    """Load SQL from a file and inject parameters if provided.

    If providing query parameters, they need to be in a dict, where the keys are the
    parameter names.

    The SQL script can contain parameters in the form of ``${...}``, which is
    understood by most database managers. This is handy for developing the SQL scripts
    in a database manager, without having to execute the pipeline.
    However, the python formatting only understands ``{...}`` placeholders, so the
    ``$`` are removed from ``${...}`` when the SQL is loaded from the file.

    The file is looked up in the ``sqlfiles`` sub-package of the caller's
    ``bag3d.<package>`` package, which is determined by inspecting the call stack.

    Args:
        filename (str): SQL File to load (without the path) from the ``sqlfiles``
            sub-package. If None, it will load the ``.sql`` file with the name equal to
            the caller function's name.
        query_params (dict): If provided, the templated SQL is formatted with the
            parameters.

    Returns:
        The result of ``inject_parameters`` applied to the loaded SQL and
        ``query_params``.

    Raises:
        RuntimeError: If the calling module is not located in a
            ``bag3d.<package>`` namespace (including when the calling module or
            its package cannot be determined).

    For example:

    .. code-block:: python

        def my_func():
            load_sql() # loads my_func.sql from bag3d.<package>.sqlfiles

    """
    # Inspect the stack once; the same frame record provides both the calling
    # module and the calling function's name.
    caller = inspect.stack()[1]
    # Find the name of the main package. This should be bag3d.<package>, e.g. bag3d.core
    mod = inspect.getmodule(caller.frame)
    # getmodule() can return None and __package__ can be None or empty; treat all of
    # these as "not in a bag3d.<package> namespace" instead of failing with an
    # AttributeError.
    pkgs = (getattr(mod, "__package__", None) or "").split(".")
    # Both conditions are required: the top-level package must be bag3d AND there must
    # be a second component, otherwise pkgs[1] below would raise an IndexError.
    if pkgs[0] != "bag3d" or len(pkgs) < 2:
        raise RuntimeError(
            "Trying to load SQL files from a namspace that is not bag3d.<package>."
        )
    sqlfiles_module = ".".join([pkgs[0], pkgs[1], "sqlfiles"])
    # Default to <calling function name>.sql when no file name is given
    _f = filename if filename is not None else f"{caller.function}.sql"
    _sql = resources.files(sqlfiles_module).joinpath(_f).read_text()
    # Turn the database-manager style ${...} placeholders into python {...} ones
    _pysql = _sql.replace("${", "{")
    return inject_parameters(_pysql, query_params)

table_exists(db_connection, table)

CHECKS IF TABLE EXISTS

Source code in packages/common/src/bag3d/common/utils/database.py
126
127
128
129
130
131
132
133
134
135
136
137
138
def table_exists(db_connection: DatabaseResource, table) -> bool:
    """Report whether ``table`` is listed in ``pg_tables``.

    Args:
        db_connection: Resource whose ``connection`` attribute provides
            ``get_dict``.
        table: Object exposing ``schema.str`` and ``table.str``; both values
            are passed to the query as SQL literals.

    Returns:
        The ``exists`` field of the first row returned by the query.
    """
    template = SQL("""SELECT EXISTS (
                   SELECT FROM
                        pg_tables
                   WHERE
                        schemaname = {schema} AND
                        tablename  = {table}
                    );""")
    schema_literal = Literal(table.schema.str)
    table_literal = Literal(table.table.str)
    statement = template.format(schema=schema_literal, table=table_literal)
    rows = db_connection.connection.get_dict(statement)
    first_row = rows[0]
    return first_row["exists"]