Source code for logpyle.upgrade_db
"""
Database Upgrade Functions
--------------------------------
.. autofunction:: upgrade_db
.. note::
Currently, upgrades all schema versions to version 3.
Upgrading from version <=1 is untested.
.. table:: Overview of known changes between schema versions
============== =========================== ==================================
Schema version Logpyle version Changes
============== =========================== ==================================
0 pre v1 (``pytools.log``) Initial version, no schema_version
yet
1 v1 -- v9 (``pytools.log``) Added ``warnings`` table
2 v10 -- 2023.1 Added ``warnings.rank`` column
3 2023.2 -- Added ``warnings.unixtime`` column
and ``logging`` table
============== =========================== ==================================
"""
import logging
import shutil
import sqlite3
from pickle import dumps
logger = logging.getLogger(__name__)
def upgrade_conn(conn: sqlite3.Connection) -> sqlite3.Connection:
from logpyle.runalyzer import is_gathered
tmp = conn.execute("select * from warnings").description
warning_columns = [col[0] for col in tmp]
# check if the provided connection has been gathered
gathered = is_gathered(conn)
# ensure that warnings table has unixtime column
if "unixtime" not in warning_columns:
logger.info("Adding a unixtime column in the warnings table")
conn.execute("""
ALTER TABLE warnings
ADD unixtime integer DEFAULT NULL;
""")
# ensure that warnings table has rank column
# nowhere to grab the rank of the process that generated
# the warning
if "rank" not in warning_columns:
logger.info("Adding a rank column in the warnings table")
conn.execute("""
ALTER TABLE warnings
ADD rank integer DEFAULT NULL;
""")
tables = [col[0] for col in conn.execute("""
SELECT name
FROM sqlite_master
WHERE type='table'
""")]
logger.info("Ensuring a logging table exists")
if "logging" not in tables:
conn.execute("""
CREATE TABLE logging (
rank integer,
step integer,
unixtime integer,
level text,
message text,
filename text,
lineno integer
)""")
if gathered:
conn.execute("""
ALTER TABLE logging
ADD run_id integer;
""")
schema_version = 3
value = bytes(dumps(schema_version))
if gathered:
conn.execute("UPDATE runs SET schema_version=?",
(schema_version,))
else:
conn.execute("UPDATE constants SET value=? WHERE name='schema_version'",
(value,))
return conn
[docs]
def upgrade_db(
dbfile: str, suffix: str, overwrite: bool
) -> None:
"""
Upgrade a database file to the most recent format. If the
`overwrite` parameter is True, it simply modifies the existing
database and uses the same file name for the upgraded database.
Otherwise, a new database is created with a separate filename
by appending the given suffix to the original file's base name
using `filename + suffix + "." + file_ext`.
Parameters
----------
dbfile
A database file path
suffix
a suffix to be appended to the filename for the
upgraded database
overwrite
a boolean value indicating
whether to overwrite the original database or not
"""
# original db files
old_conn = sqlite3.connect(dbfile)
if overwrite:
# simply perform modifications on old connection
new_conn_name = dbfile
new_conn = old_conn
logger.info(f"Overwriting Database: {new_conn_name}")
else:
# separate the filename and the extension
filename, file_ext = dbfile.rsplit(".", 1)
new_conn_name = filename + suffix + "." + file_ext
shutil.copy(dbfile, new_conn_name)
new_conn = sqlite3.connect(new_conn_name)
logger.info(f"Creating new Database: {new_conn_name}, a clone of {dbfile}")
logger.info(f"Upgrading {new_conn_name} to schema version 3")
new_conn = upgrade_conn(new_conn)
if old_conn != new_conn:
old_conn.close()
new_conn.commit()
new_conn.close()