I’ve written an Uplink module that backs up an App’s tables to a SQLite database file. It started out as a hard-coded script, but I’ve taken some steps to generalize it.
Some caveats:
- I haven’t tried it with multi-link columns. (My tables don’t have any.) To avoid trouble, the (incomplete) code to handle them is commented out. If you have such columns, you’ll want to uncomment those lines and test thoroughly (see the sketch just after these caveats). If you do, please let me know how it went!
- It reads one of the App’s .yaml files to get the database structure, so you need a reasonably current backup of your App’s source code for it to read. Anvil reserves the right to change the format of these files, and of the schemas inside them, at any time. It’s happened before; it’ll happen again. When it does, this code will probably break.
- To read the YAML, it depends on PyYAML, a third-party module (py -m pip install pyyaml into your favorite Python virtual environment).
- Media objects are written to their own table.
- Users may very well be updating the database while a backup is running. In that case the copy may not be a consistent snapshot: some rows are read before a change, some after.
- There’s no “restore” module. Restoring just the right data, safely, is likely to be very, very App-dependent. Using Row IDs to tie it all together? Row IDs of deleted rows can’t be restored; on insert, Anvil will generate brand-new ones. (And how do you keep logged-in users’ database activity from accidentally interfering with the ongoing Restore?)
At least, with a backup, you can have something to restore, even if it’s not immediately clear how.
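For the multi-link case mentioned above, here is a rough, untested sketch of the kind of helper you might add when you uncomment those lines. The helper name is mine; it assumes a multi-link value arrives as an iterable of linked rows and stores it as a JSON array of their row IDs (mirroring how single links are stored as a row-ID string):
import json
def multilink_to_json(linked_rows):
    # Serialize a multi-link value as a JSON list of Anvil row IDs.
    return json.dumps([row.get_id() for row in linked_rows])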
Usage
from pathlib import Path
from BackUpAnvilAppTables_work import do_backup
# connect to your App with full table-access privileges.
# That usually means using the Server (not Client) credentials.
do_backup(
    schema_source_path = Path('...'),  # full path to the schema source, e.g. anvil.yaml
    schema_filename = Path('...'),     # name for the schema-only .sqlite file (written to the current directory)
    dest_folder = Path('...'),         # folder the schema+data backup file will be written to
    base_dest_name = '...',            # base name of the backup file; date+time and '.sqlite' are appended
)
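The connection step itself is just the usual Uplink call, made before do_backup (the key below is a placeholder; use your App’s Server Uplink key):
import anvil.server
anvil.server.connect('YOUR-SERVER-UPLINK-KEY')  # placeholder; paste your App's Server Uplink key here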
The Uplink Module (if there’s a better way to attach this, let me know)
# Module: BackUpAnvilAppTables_work
# Purpose: to back up an Anvil App's data (and schema) to a SQLite file.
# Requires: Python 3.7 or later; PyYAML
# This is the workhorse function, to run after the main program
# successfully identifies the application source file (.yaml) and logs in
# to Anvil.
from __future__ import annotations
# standard-lib modules:
import pathlib
import sqlite3 as dblib
import datetime
import json
import shutil
# third-party modules:
import yaml # i.e., py -m pip install pyyaml
# web-service imports
import anvil.server
from anvil.tables import app_tables
# Note: Reading a full-project .yaml file (mine is over 1MB) can take
# several seconds, yet is likely to result in exactly the same schema as
# before. We could speed this up by skipping to the line reading
# "exported_table_schema:" before attempting to parse anything.
#
# However, the newer anvil.yaml file, which is just one source file
# in the project, is much smaller, *and* more detailed (at the table
# level). Prefer to read the smaller file when available (and current).
#
# If no .yaml file is available, then one can get a list of a table's
# column names via app_tables.my_table.list_columns(), but this gives
# only the names, not the types. (There is no documented way to get
# a list of tables.) The resulting information is too sparse to be
# processed by the code below.
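# For orientation only (my paraphrase, not Anvil's documented format): the code
# below relies on each table entry of the loaded schema looking roughly like
#   { 'id': 123,                      # numeric table id
#     'python_name': 'my_table',
#     'columns': {
#       'colABC': {'name': 'title', 'type': 'string', 'admin_ui': {...}},
#       'colXYZ': {'name': 'owner', 'type': 'liveObject', 'table_id': 456,
#                  'admin_ui': {...}},
#     },
#     'access': {...},                # anvil.yaml only; flattened by load_schema()
#   }
# where the column ids ('colABC', 'colXYZ') and the values are made-up examples.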
def load_schema(
schema_source_path: pathlib.Path, # full path to app-as-yaml-file
):
filename = schema_source_path.name
source_is_git = (filename == 'anvil.yaml')
schema_key = 'db_schema' if source_is_git else 'exported_table_schema'
with open(schema_source_path, encoding='utf-8') as schema_source:
project = yaml.safe_load(schema_source)
anvil_schema = project[schema_key]
del project
if source_is_git: # instead of a full-project backup file
# try to translate to something like the full-project format
for table_descriptor in anvil_schema:
a = table_descriptor['access']
for column_name in [ 'server', 'client', 'app_id',
'table_mapping_name', 'table_mapping_id'
]:
table_descriptor[column_name] = a[column_name]
# The other fields in table['access'] are duplicates.
# Just in case we missed something, convert it to a data
# type that SQLite can handle.
table_descriptor['access'] = json.dumps(a)
return anvil_schema
def do_backup(
schema_source_path: pathlib.Path, # full path to app-as-yaml-file
schema_filename: pathlib.Path, # name of schema-only sqlite file
dest_folder: pathlib.Path, # folder to contain full backup file
base_dest_name: str, # base name of backup file in folder
):
print('Loading Project File (to get the db schema)...')
anvil_schema = load_schema(schema_source_path)
print('Loaded', len(anvil_schema), 'table definitions.')
# create a lookup table (numeric table id -> python name) for later use:
table_name_from_id = {
table['id']: table['python_name']
for table in anvil_schema
}
    print('Tables:', ', '.join(table_name_from_id.values()))
# identify the database file to contain the extracted schema:
schema_timestamp = datetime.datetime.now().isoformat()[:19]
schema_folder = pathlib.Path.cwd()
schema_full_name = schema_folder / schema_filename
# if there is already a file by this name, move it out of the way:
if schema_full_name.exists():
schema_full_name.replace(schema_full_name.with_suffix('.sqlite.bak'))
# create the database file:
db = dblib.connect(schema_full_name)
cursor = db.cursor()
# record basic information about the schema:
cursor.execute('create table _provenance( key text, value text )')
cursor.executemany(
'insert into _provenance(key, value) values(?,?)',
(
('schema_created_at', schema_timestamp),
('schema_source', str(schema_source_path)),
('source_schema_json', json.dumps(
anvil_schema, indent=2)),
)
)
db.commit()
# record the database schema. First, the tables:
schema_column_names = sorted(anvil_schema[0].keys())
schema_column_names.remove('columns')
create_table_stmt = (
'create table _anvil_tables(\n' +
' id integer primary key,\n' +
',\n'.join(
' ' + name + ' text'
for name in schema_column_names
if name != 'id'
) +
'\n)'
)
cursor.execute(create_table_stmt)
insert_stmt = (
'insert into _anvil_tables(' + ', '.join(schema_column_names) + ') '
'values(' + '?,' * (len(schema_column_names) - 1) + '?)')
for table in anvil_schema:
row_values = [table[name] for name in schema_column_names]
cursor.execute(insert_stmt, row_values)
db.commit()
# Then the table columns:
all_column_attributes = set()
for table in anvil_schema:
for column in table['columns'].values():
column_attributes = set(column.keys())
all_column_attributes |= column_attributes
all_column_attributes.remove('admin_ui')
create_table_stmt = (
'create table _anvil_columns('
'\n _anvil_table_id integer references _anvil_tables(id),'
'\n _anvil_column_id text,'
'\n table_name text references _anvil_tables(python_name),' +
',\n'.join(
' ' + name + ' text'
for name in all_column_attributes
if name not in { 'table_id' }
) +
',\n table_id integer references _anvil_tables(id)'
'\n)'
)
cursor.execute(create_table_stmt)
for table in anvil_schema:
for anvil_column_id, column_def in table['columns'].items():
column_names = ['_anvil_table_id', '_anvil_column_id', 'table_name']
placeholders = ['?', '?', '?']
column_values = [table['id'], anvil_column_id, table['python_name']]
for name, value in column_def.items():
if name in all_column_attributes:
column_names.append(name)
column_values.append(value)
placeholders.append('?')
insert_stmt = (
'insert into _anvil_columns'
'(' + ','.join(column_names) + ')\n'
'values(' + ','.join(placeholders) + ')'
)
cursor.execute(insert_stmt, column_values)
db.commit()
def sql_quote(identifier):
return '"' + identifier + '"'
# Provide a table where Media values can be stored.
# Columns needed: content_type (text), length (int), name (text),
# url (text), bytes (blob). Also needs a row id of some kind, for reference
# from the "media" column. De-duplication would be nice...
create_table_stmt = (
'create table _anvil_media_objects (\n' +
# rowid integer primary key, IMPLICIT with SQLite
# ' object_type text,\n' # BlobMedia or URLMedia -- can't tell!
' content_type text,\n'
' name text,\n'
' url text,\n'
' length integer,\n'
' bytes blob\n'
# For deduplication, we would add a hash code, SHA-3 or better.
# If all other fields match, and the hash code matches, then it's
# the same object, for all practical purposes.
'\n)'
)
cursor.execute(create_table_stmt)
# define the actual tables:
for table in anvil_schema:
python_table_name = sql_quote(table["python_name"])
create_table_stmt = (
f'create table {python_table_name} (\n'
' _anvil_row_id text primary key not null'
)
for anvil_column_id, column_def in table['columns'].items():
column_type = column_def['type']
reference_target = reference_clause = ''
if column_type.startswith('liveObject'):
reference_target = sql_quote(
table_name_from_id[column_def['table_id']])
if reference_target:
reference_clause = f' references {reference_target}(_anvil_row_id)'
column_name = sql_quote(column_def['name'])
create_table_stmt += (
f',\n {column_name} {column_type} {reference_clause}'
)
create_table_stmt += '\n)'
cursor.execute(create_table_stmt)
db.commit()
db.close()
print('Boilerplate SQLite file written to:', schema_full_name)
# identify the backup file:
bkup_start = datetime.datetime.now()
bkup_timestamp = bkup_start.isoformat()[:19]
print('Starting backup:', bkup_timestamp)
stamp_for_filename = bkup_timestamp.replace('-', '').replace(':', '')
dest_filename = base_dest_name + '.' + stamp_for_filename + '.sqlite'
dest_name = dest_folder / dest_filename
print('Backing up to:', dest_name)
# copy the boilerplate file into the backup file
shutil.copyfile(schema_full_name, dest_name)
# open the backup file:
db = dblib.connect(dest_name)
cursor = db.cursor()
# note when the backup started (to the nearest second):
cursor.execute(
'insert into _provenance values(?,?)',
('backup_start', bkup_timestamp)
)
db.commit()
media_cursor = db.cursor()
media_insert_stmt = (
'insert into _anvil_media_objects\n'
'(content_type, name, url, length, bytes)\n'
'values(?, ?, ?, ?, ?)\n'
)
def add_media_object(obj):
media_cursor.execute(
media_insert_stmt,
(
# obj.__name__,
obj.content_type,
obj.name,
obj.url,
obj.length,
obj.get_bytes(),
)
)
return media_cursor.lastrowid
# copy the tables, one by one:
for table in anvil_schema:
python_table_name = table["python_name"]
app_tbl = getattr(app_tables, python_table_name, None) #.client_readable()
# PEC 2020-01-12: I'd hoped that read-only access would be faster. Nope.
python_table_name = sql_quote(python_table_name)
if app_tbl is None:
print('Anvil table', python_table_name, 'not found')
continue
print('Copying', python_table_name, end='')
for row in app_tbl.search():
column_names = ['_anvil_row_id']
placeholders = ['?']
column_values = [anvil._get_live_object_id(row)] # slower: [row.get_id()]
for name, value in dict(row).items():
column_names.append(sql_quote(name))
# print(name,type(value))
if isinstance(value, dict) or isinstance(value, list):
value = json.dumps(value)
                # elif isinstance(value, anvil._server.LiveObjectArray):
                #     obj_ids = [obj.get_id() for obj in value]
                #     value = json.dumps(obj_ids)
elif isinstance(value, anvil._server.LiveObjectProxy):
value = anvil._get_live_object_id(value) # slower: value.get_id()
elif isinstance(value, anvil.Media):
row_id = add_media_object(value)
value = row_id
column_values.append(value)
placeholders.append('?')
insert_stmt = (
f'insert into {python_table_name}\n'
'(' + ','.join(column_names) + ')\n'
'values(' + ','.join(placeholders) + ')'
)
cursor.execute(insert_stmt, column_values)
print('.', end='', flush=True)
db.commit()
print(' copied.')
# note when the backup finished (to the nearest second):
bkup_end = datetime.datetime.now()
bkup_timestamp = bkup_end.isoformat()[:19]
print('Finishing backup:', bkup_timestamp)
cursor.executemany(
'insert into _provenance values(?,?)',
(
('backup_end', bkup_timestamp),
('backup_dur', (bkup_end - bkup_start).total_seconds() )
)
)
db.commit()
db.close()
No doubt this can be improved upon.
For browsing the resulting SQLite file, I suggest SQLite Studio or whichever database GUI tool you prefer.
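If you’d rather sanity-check a backup from Python, a few lines of sqlite3 will list every table in the file along with its row count (the file name below is a made-up example of the generated name):
import sqlite3
db = sqlite3.connect('MyBackup.20240101T120000.sqlite')  # example name; use your real backup file
tables = [name for (name,) in db.execute(
    "select name from sqlite_master where type = 'table'")]
for name in tables:
    (count,) = db.execute(f'select count(*) from "{name}"').fetchone()
    print(name, count, 'rows')
db.close()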