Here’s my updated version of this script, adapted to the modern (as of January 2022) anvil.yaml file format. I only use it with a Git copy of the file, not with an exported one.
from __future__ import annotations
# standard-lib modules:
import pathlib
import sqlite3 as dblib
import datetime
import json
import shutil
# third-party modules:
import yaml  # e.g., py -m pip install pyyaml
# web-service imports
import anvil.server
from anvil.tables import app_tables
# Note: Reading a full-project .yaml file (mine is over 1MB) can take
# several seconds, yet is likely to result in exactly the same schema as
# before. We could speed this up by skipping to the line reading
# "exported_table_schema:" before attempting to parse anything.
#
# However, the newer anvil.yaml file, which is just one source file
# in the project, is much smaller, *and* more detailed (at the table
# level). Prefer to read the smaller file when available (and current).
#
# If no .yaml file is available, then one can get a list of a table's
# column names via app_tables.my_table.list_columns(), but this gives
# only the names, not the types. (There is no documented way to get
# a list of tables.) The resulting information is too sparse to be
# processed by the code below.
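# A sketch of the speed-up mentioned above, for the big exported file: skim
# line-by-line until the "exported_table_schema:" section, then hand only the
# remainder to the YAML parser. (The helper name is mine; this assumes the
# section starts in column 0 and runs to the end of the file, and is untested.)
def load_exported_schema_skipping_prefix(schema_source_path: pathlib.Path):
    with open(schema_source_path, encoding='utf-8') as schema_source:
        for line in schema_source:
            if line.startswith('exported_table_schema:'):
                remainder = line + schema_source.read()
                return yaml.safe_load(remainder)['exported_table_schema']
    raise KeyError('no exported_table_schema in ' + str(schema_source_path))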
def load_schema(
schema_source_path: pathlib.Path, # full path to app-as-yaml-file
):
filename = schema_source_path.name
source_is_git = (filename == 'anvil.yaml')
schema_key = 'db_schema' if source_is_git else 'exported_table_schema'
with open(schema_source_path, encoding='utf-8') as schema_source:
project = yaml.safe_load(schema_source)
anvil_schema = project[schema_key]
del project
if source_is_git: # instead of a full-project backup file
# try to translate to something like the full-project format
schema = []
for table_name in anvil_schema:
table_descriptor = anvil_schema[table_name]
table_descriptor['python_name'] = table_name
schema.append(table_descriptor)
return schema
return anvil_schema
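# Example (the path and names here are illustrative, not from a real project):
#   schema = load_schema(pathlib.Path('MyApp/anvil.yaml'))
# Each entry in the returned list is then a dict shaped roughly like:
#   {'python_name': 'contacts', 'title': 'Contacts',
#    'columns': [{'name': 'email', 'type': 'string', ...}, ...]}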
def do_backup(
schema_source_path: pathlib.Path, # full path to app-as-yaml-file
schema_filename: pathlib.Path, # name of schema-only sqlite file
dest_folder: pathlib.Path, # folder to contain full backup file
base_dest_name: str, # base name of backup file in folder
):
print('Loading Project File (to get the db schema)...')
anvil_schema = load_schema(schema_source_path)
print('Loaded', len(anvil_schema), 'table definitions.')
# identify the database file to contain the extracted schema:
schema_timestamp = datetime.datetime.now().isoformat()[:19]
schema_folder = pathlib.Path.cwd()
schema_full_name = schema_folder / schema_filename
# if there is already a file by this name, move it out of the way:
if schema_full_name.exists():
schema_full_name.replace(schema_full_name.with_suffix('.sqlite.bak'))
# create the database file:
db = dblib.connect(schema_full_name)
cursor = db.cursor()
# record basic information about the schema:
cursor.execute('create table _provenance( key text, value text )')
cursor.executemany(
'insert into _provenance(key, value) values(?,?)',
(
('schema_created_at', schema_timestamp),
('schema_source', str(schema_source_path)),
('source_schema_json', json.dumps(
anvil_schema, indent=2)),
)
)
db.commit()
# record the database schema. First, the tables:
schema_column_names = sorted(anvil_schema[0].keys())
schema_column_names.remove('columns')
create_table_stmt = (
'create table _anvil_tables(\n' +
' id integer primary key,\n' +
',\n'.join(
' ' + name + ' text'
for name in schema_column_names
if name != 'id'
) +
'\n)'
)
cursor.execute(create_table_stmt)
table_name_to_id = {}
insert_stmt = (
'insert into _anvil_tables(' + ', '.join(schema_column_names) + ') '
'values(' + '?,' * (len(schema_column_names) - 1) + '?)')
for table in anvil_schema:
row_values = [table[name] for name in schema_column_names]
cursor.execute(insert_stmt, row_values)
        table_name_to_id[table['python_name']] = cursor.lastrowid
db.commit()
# Then the table columns:
all_column_attributes = set()
for table in anvil_schema:
for column in table['columns']:
column_attributes = set(column.keys())
all_column_attributes |= column_attributes
    all_column_attributes.discard('admin_ui')  # discard: schema may lack admin_ui
create_table_stmt = (
'create table _anvil_columns('
'\n _anvil_table_id integer references _anvil_tables(id),'
'\n table_name text references _anvil_tables(python_name),' +
',\n'.join(
' ' + name + ' text'
for name in all_column_attributes
if name not in { 'table_id' }
) +
',\n table_id integer references _anvil_tables(id)'
'\n)'
)
cursor.execute(create_table_stmt)
for table in anvil_schema:
for column in table['columns']:
column_names = ['table_name', '_anvil_table_id']
placeholders = ['?', '?']
column_values = [table['python_name'], table_name_to_id[table['python_name']]]
if 'target' in column:
column_names.append('table_id')
placeholders.append('?')
column_values.append(table_name_to_id[column['target']])
for name, value in column.items():
if name in all_column_attributes:
column_names.append(name)
column_values.append(value)
placeholders.append('?')
insert_stmt = (
'insert into _anvil_columns'
'(' + ','.join(column_names) + ')\n'
'values(' + ','.join(placeholders) + ')'
)
cursor.execute(insert_stmt, column_values)
db.commit()
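    # After a run, the captured schema can be inspected from the sqlite3
    # shell, e.g. (illustrative):
    #   sqlite3 my_app_schema.sqlite \
    #     'select table_name, name, type from _anvil_columns order by 1, 2;'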
    def sql_quote(identifier):
        # quote an identifier for SQLite, doubling any embedded quotes
        return '"' + identifier.replace('"', '""') + '"'
# Provide a table where Media values can be stored.
# Columns needed: content_type (text), length (int), name (text),
# url (text), bytes (blob). Also needs a row id of some kind, for reference
# from the "media" column. De-duplication would be nice...
create_table_stmt = (
'create table _anvil_media_objects (\n' +
# rowid integer primary key, IMPLICIT with SQLite
# ' object_type text,\n' # BlobMedia or URLMedia -- can't tell!
' content_type text,\n'
' name text,\n'
' url text,\n'
' length integer,\n'
' bytes blob\n'
# For deduplication, we would add a hash code, SHA-3 or better.
# If all other fields match, and the hash code matches, then it's
# the same object, for all practical purposes.
'\n)'
)
cursor.execute(create_table_stmt)
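    # Sketch of the deduplication idea from the comment above (not wired in):
    # store a digest next to each media row and look it up before inserting.
    # The helper below is my assumption, using SHA3-256 from hashlib.
    import hashlib
    def media_digest(data: bytes) -> str:
        # hex digest of the media bytes, usable as a dedup key
        return hashlib.sha3_256(data).hexdigest()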
# define the actual tables:
for table in anvil_schema:
python_table_name = sql_quote(table["python_name"])
create_table_stmt = (
f'create table {python_table_name} (\n'
' _anvil_row_id text primary key not null'
)
for column in table['columns']:
column_type = column['type']
            reference_target = reference_clause = ''
            if column_type.startswith('link_'):
                reference_target = column['target']
            if reference_target:
                # for link_multiple columns the stored value is a JSON list of
                # row ids, so this foreign-key clause is only advisory there
                reference_clause = (
                    f' references {sql_quote(reference_target)}(_anvil_row_id)'
                )
column_name = sql_quote(column['name'])
create_table_stmt += (
f',\n {column_name} {column_type} {reference_clause}'
)
create_table_stmt += '\n)'
cursor.execute(create_table_stmt)
db.commit()
db.close()
print('Boilerplate SQLite file written to:', schema_full_name)
# identify the backup file:
bkup_start = datetime.datetime.now()
bkup_timestamp = bkup_start.isoformat()[:19]
print('Starting backup:', bkup_timestamp)
stamp_for_filename = bkup_timestamp.replace('-', '').replace(':', '')
dest_filename = base_dest_name + '.' + stamp_for_filename + '.sqlite'
dest_name = dest_folder / dest_filename
print('Backing up to:', dest_name)
# copy the boilerplate file into the backup file
shutil.copyfile(schema_full_name, dest_name)
# open the backup file:
db = dblib.connect(dest_name)
cursor = db.cursor()
# note when the backup started (to the nearest second):
cursor.execute(
'insert into _provenance values(?,?)',
('backup_start', bkup_timestamp)
)
db.commit()
media_cursor = db.cursor()
media_insert_stmt = (
'insert into _anvil_media_objects\n'
'(content_type, name, url, length, bytes)\n'
'values(?, ?, ?, ?, ?)\n'
)
def add_media_object(obj):
media_cursor.execute(
media_insert_stmt,
(
# obj.__name__,
obj.content_type,
obj.name,
obj.url,
obj.length,
obj.get_bytes(),
)
)
return media_cursor.lastrowid
# copy the tables, one by one:
for table in anvil_schema:
python_table_name = table["python_name"]
app_tbl = getattr(app_tables, python_table_name, None) #.client_readable()
# PEC 2020-01-12: I'd hoped that read-only access would be faster. Nope.
python_table_name = sql_quote(python_table_name)
if app_tbl is None:
print('Anvil table', python_table_name, 'not found')
continue
print('Copying', python_table_name, end='')
for row in app_tbl.search():
column_names = ['_anvil_row_id']
placeholders = ['?']
column_values = [anvil._get_live_object_id(row)] # slower: [row.get_id()]
for name, value in dict(row).items():
column_names.append(sql_quote(name))
# print(name,type(value))
if isinstance(value, dict):
value = json.dumps(value)
elif isinstance(value, list):
try:
value = json.dumps(value)
                    except TypeError:
                        # not JSON-serializable: probably a multi-link column
obj_ids = [ obj.get_id() for obj in value]
value = json.dumps(obj_ids)
elif isinstance(value, anvil._server.LiveObjectProxy):
value = anvil._get_live_object_id(value) # slower: value.get_id()
elif isinstance(value, anvil.Media):
row_id = add_media_object(value)
value = row_id
column_values.append(value)
placeholders.append('?')
insert_stmt = (
f'insert into {python_table_name}\n'
'(' + ','.join(column_names) + ')\n'
'values(' + ','.join(placeholders) + ')'
)
cursor.execute(insert_stmt, column_values)
print('.', end='', flush=True)
db.commit()
print(' copied.')
# note when the backup finished (to the nearest second):
bkup_end = datetime.datetime.now()
bkup_timestamp = bkup_end.isoformat()[:19]
print('Finishing backup:', bkup_timestamp)
cursor.executemany(
'insert into _provenance values(?,?)',
(
('backup_end', bkup_timestamp),
            ('backup_dur', str((bkup_end - bkup_start).total_seconds()))
)
)
db.commit()
db.close()
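# A minimal driver, assuming this runs as an Uplink script; the key and the
# paths below are placeholders of mine, not values from the original post:
if __name__ == '__main__':
    anvil.server.connect('YOUR-UPLINK-KEY-HERE')  # connect before touching app_tables
    do_backup(
        schema_source_path=pathlib.Path('MyApp') / 'anvil.yaml',
        schema_filename=pathlib.Path('my_app_schema.sqlite'),
        dest_folder=pathlib.Path('backups'),
        base_dest_name='my_app',
    )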