Backup to SQLite via Uplink

Here’s my updated version of this script, reworked for the modern (as of January 2022) anvil.yaml file format. I’ve only used it with the anvil.yaml from a Git clone of the app, not with a full-project export.

from __future__ import annotations

# standard-lib modules:
import pathlib
import sqlite3 as dblib
import datetime
import json
import shutil

# third-party modules:
import yaml  # e.g., py -m pip install pyyaml

# web-service imports:
import anvil  # for anvil.Media and the private helpers used below
import anvil.server
from anvil.tables import app_tables  # the Uplink import path


# Note: Reading a full-project .yaml file (mine is over 1MB) can take
# several seconds, yet is likely to result in exactly the same schema as
# before.  We could speed this up by skipping to the line reading
# "exported_table_schema:" before attempting to parse anything.
#
# However, the newer anvil.yaml file, which is just one source file
# in the project, is much smaller, *and* more detailed (at the table
# level).  Prefer to read the smaller file when available (and current).
#
# If no .yaml file is available, then one can get a list of a table's
# column names via app_tables.my_table.list_columns(), but this gives
# only the names, not the types.  (There is no documented way to get
# a list of tables.)  The resulting information is too sparse to be
# processed by the code below.
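
# A minimal sketch of that skip-ahead idea (an untested assumption, not
# something this script relies on): scan forward until the schema key
# starts a line, then parse only the remainder.  This would break if the
# schema section referenced YAML anchors defined earlier in the file.
def _load_exported_schema_fast(schema_source_path: pathlib.Path):
    with open(schema_source_path, encoding='utf-8') as schema_source:
        for line in schema_source:
            if line.startswith('exported_table_schema:'):
                remainder = line + schema_source.read()
                return yaml.safe_load(remainder)['exported_table_schema']
    return None  # key not found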


def load_schema(
    schema_source_path: pathlib.Path,  # full path to app-as-yaml-file
):
    filename = schema_source_path.name
    source_is_git = (filename == 'anvil.yaml')
    schema_key = 'db_schema' if source_is_git else 'exported_table_schema'
    with open(schema_source_path, encoding='utf-8') as schema_source:
        project = yaml.safe_load(schema_source)
        anvil_schema = project[schema_key]
        del project
    if source_is_git:  # instead of a full-project backup file
        # try to translate to something like the full-project format:
        # a list of table descriptors, each tagged with its python_name
        schema = []
        for table_name, table_descriptor in anvil_schema.items():
            table_descriptor['python_name'] = table_name
            schema.append(table_descriptor)
        return schema

    return anvil_schema
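
# For a hypothetical table whose descriptor carries title/client/server
# keys, the Git-format mapping entry
#     my_notes: {title: 'My Notes', client: 'none', server: 'full', columns: [...]}
# comes back from load_schema() as the list element
#     {'title': 'My Notes', 'client': 'none', 'server': 'full',
#      'columns': [...], 'python_name': 'my_notes'}
# which matches the shape of the full-project export's list entries.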

def do_backup(
    schema_source_path: pathlib.Path,  # full path to app-as-yaml-file
    schema_filename: pathlib.Path,  # name of schema-only sqlite file
    dest_folder: pathlib.Path,  # folder to contain full backup file
    base_dest_name: str, # base name of backup file in folder
):
    print('Loading Project File (to get the db schema)...')
    anvil_schema = load_schema(schema_source_path)
    print('Loaded', len(anvil_schema), 'table definitions.')

    # identify the database file to contain the extracted schema:
    schema_timestamp = datetime.datetime.now().isoformat()[:19]
    schema_folder = pathlib.Path.cwd()
    schema_full_name = schema_folder / schema_filename


    # if there is already a file by this name, move it out of the way:
    if schema_full_name.exists():
        schema_full_name.replace(schema_full_name.with_suffix('.sqlite.bak'))


    # create the database file:
    db = dblib.connect(schema_full_name)
    cursor = db.cursor()


    # record basic information about the schema:
    cursor.execute('create table _provenance( key text, value text )')
    cursor.executemany(
        'insert into _provenance(key, value) values(?,?)',
        (
            ('schema_created_at', schema_timestamp),
            ('schema_source', str(schema_source_path)),
            ('source_schema_json', json.dumps(
                anvil_schema, indent=2)),
        )
    )
    db.commit()


    # record the database schema.  First, the tables:
    schema_column_names = sorted(anvil_schema[0].keys())
    schema_column_names.remove('columns')
    create_table_stmt = (
        'create table _anvil_tables(\n' +
        '  id integer primary key,\n' +
        ',\n'.join(
            '  ' + name + ' text'
            for name in schema_column_names
            if name != 'id'
        ) +
        '\n)'
    )
    cursor.execute(create_table_stmt)
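
    # For instance, with hypothetical schema keys 'client', 'server',
    # and 'title', the statement built above would read:
    #     create table _anvil_tables(
    #       id integer primary key,
    #       client text,
    #       server text,
    #       title text
    #     )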

    table_name_to_id = {}

    insert_stmt = (
        'insert into _anvil_tables(' + ', '.join(schema_column_names) + ') '
        'values(' + '?,' * (len(schema_column_names) - 1) + '?)')

    for table in anvil_schema:
        row_values = [table[name] for name in schema_column_names]
        cursor.execute(insert_stmt, row_values)
        # remember each table's sqlite id, keyed by its python_name:
        table_name_to_id[table['python_name']] = cursor.lastrowid

    db.commit()

    # Then the table columns:
    all_column_attributes = set()
    for table in anvil_schema:
        for column in table['columns']:
            all_column_attributes |= set(column.keys())
    all_column_attributes.discard('admin_ui')  # UI metadata; may be absent
    create_table_stmt = (
        'create table _anvil_columns('
        '\n  _anvil_table_id  integer references _anvil_tables(id),'
        '\n  table_name      text references _anvil_tables(python_name),\n' +
        ',\n'.join(
            '  ' + name + ' text'
            for name in sorted(all_column_attributes)
            if name != 'table_id'
        ) +
        ',\n  table_id  integer references _anvil_tables(id)'
        '\n)'
    )

    cursor.execute(create_table_stmt)
    for table in anvil_schema:
        for column in table['columns']:
            column_names = ['table_name', '_anvil_table_id']
            placeholders = ['?', '?']
            column_values = [table['python_name'], table_name_to_id[table['python_name']]]
            
            if 'target' in column:
                column_names.append('table_id')
                placeholders.append('?')
                column_values.append(table_name_to_id[column['target']])
            
            for name, value in column.items():
                if name in all_column_attributes:
                    column_names.append(name)
                    column_values.append(value)
                    placeholders.append('?')
            
            insert_stmt = (
                'insert into _anvil_columns'
                '(' + ','.join(column_names) + ')\n'
                'values(' + ','.join(placeholders) + ')'
            )
            cursor.execute(insert_stmt, column_values)
    db.commit()


    def sql_quote(identifier):
        # naive identifier quoting; assumes no embedded double quotes
        return '"' + identifier + '"'


    # Provide a table where Media values can be stored.
    # Columns needed: content_type (text), length (int), name (text),
    # url (text), bytes (blob).  Also needs a row id of some kind, for reference
    # from the "media" column.  De-duplication would be nice...

    create_table_stmt = (
        'create table _anvil_media_objects (\n' +
        #  rowid integer primary key,  IMPLICIT with SQLite
        # '  object_type  text,\n'  # BlobMedia or URLMedia -- can't tell!
        '  content_type text,\n'
        '  name         text,\n'
        '  url          text,\n'
        '  length       integer,\n'
        '  bytes        blob\n'
        # For deduplication, we would add a hash code, SHA-3 or better.
        # If all other fields match, and the hash code matches, then it's
        # the same object, for all practical purposes.
        '\n)'
    )
    cursor.execute(create_table_stmt)

    # define the actual tables:
    for table in anvil_schema:
        python_table_name = sql_quote(table["python_name"])
        create_table_stmt = (
            f'create table {python_table_name} (\n'
            '  _anvil_row_id text  primary key  not null'
        )
    
        for column in table['columns']:
            column_type = column['type']
            reference_target = reference_clause = ''
            if column_type.startswith('link_'):
                reference_target = column['target']
            if reference_target:
                reference_clause = f' references {reference_target}(_anvil_row_id)'
            column_name = sql_quote(column['name'])
            create_table_stmt += (
                f',\n  {column_name} {column_type} {reference_clause}'
            )
        create_table_stmt += '\n)'
        cursor.execute(create_table_stmt)
    db.commit()
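
    # For instance, a hypothetical table "notes" with a text column and a
    # link_single column pointing at a table "folders" comes out roughly as:
    #     create table "notes" (
    #       _anvil_row_id text  primary key  not null,
    #       "body" string ,
    #       "folder" link_single  references folders(_anvil_row_id)
    #     )
    # SQLite accepts arbitrary type names, so Anvil's own names ('string',
    # 'link_single', ...) are used verbatim.  Note that link_multiple
    # columns are stored as JSON lists of row ids (see the copy loop below),
    # so the references clause is only meaningful for link_single.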


    db.close()
    print('Boilerplate SQLite file written to:', schema_full_name)


    # identify the backup file:
    bkup_start = datetime.datetime.now()
    bkup_timestamp = bkup_start.isoformat()[:19]
    print('Starting backup:', bkup_timestamp)
    stamp_for_filename = bkup_timestamp.replace('-', '').replace(':', '')
    dest_filename = base_dest_name + '.' + stamp_for_filename + '.sqlite'
    dest_name = dest_folder / dest_filename
    print('Backing up to:', dest_name)

    # copy the boilerplate file into the backup file
    shutil.copyfile(schema_full_name, dest_name)

    # open the backup file:
    db = dblib.connect(dest_name)
    cursor = db.cursor()

    # note when the backup started (to the nearest second):
    cursor.execute(
        'insert into _provenance values(?,?)',
        ('backup_start', bkup_timestamp)
    )
    db.commit()


    media_cursor = db.cursor()
    media_insert_stmt = (
        'insert into _anvil_media_objects\n'
        '(content_type, name, url, length, bytes)\n'
        'values(?, ?, ?, ?, ?)\n'
    )

    def add_media_object(obj):
        media_cursor.execute(
            media_insert_stmt,
            (
                # obj.__name__,
                obj.content_type,
                obj.name,
                obj.url,
                obj.length,
                obj.get_bytes(),
            )
        )
        return media_cursor.lastrowid
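
    # A sketch of the deduplication mentioned above -- an assumption, not
    # code I run.  It presumes _anvil_media_objects gains an extra 'sha3'
    # text column:
    #
    #     import hashlib
    #
    #     def add_media_object_deduped(obj):
    #         data = obj.get_bytes()
    #         digest = hashlib.sha3_256(data).hexdigest()
    #         existing = media_cursor.execute(
    #             'select rowid from _anvil_media_objects'
    #             ' where sha3 = ? and content_type = ? and name = ?',
    #             (digest, obj.content_type, obj.name)).fetchone()
    #         if existing:
    #             return existing[0]  # same bytes already stored: reuse
    #         media_cursor.execute(
    #             'insert into _anvil_media_objects'
    #             '(content_type, name, url, length, bytes, sha3)'
    #             ' values(?,?,?,?,?,?)',
    #             (obj.content_type, obj.name, obj.url, obj.length,
    #              data, digest))
    #         return media_cursor.lastrowid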

    # copy the tables, one by one:
    for table in anvil_schema:
        python_table_name = table["python_name"]
        app_tbl = getattr(app_tables, python_table_name, None)  #.client_readable()
        # PEC 2020-01-12: I'd hoped that read-only access would be faster.  Nope.
        python_table_name = sql_quote(python_table_name)
        if app_tbl is None:
            print('Anvil table', python_table_name, 'not found')
            continue
        print('Copying', python_table_name, end='')
        for row in app_tbl.search():
            column_names = ['_anvil_row_id']
            placeholders = ['?']
            column_values = [anvil._get_live_object_id(row)] # slower: [row.get_id()]
            for name, value in dict(row).items():
                column_names.append(sql_quote(name))
                # print(name,type(value))
                if isinstance(value, dict):
                    value = json.dumps(value)
                elif isinstance(value, list):
                    try:
                        value = json.dumps(value)
                    except TypeError:
                        # Probably a multi-link column: store the row ids
                        obj_ids = [obj.get_id() for obj in value]
                        value = json.dumps(obj_ids)
                elif isinstance(value, anvil._server.LiveObjectProxy):
                    value = anvil._get_live_object_id(value) # slower: value.get_id()
                elif isinstance(value, anvil.Media):
                    row_id = add_media_object(value)
                    value = row_id
                column_values.append(value)
                placeholders.append('?')
            insert_stmt = (
                f'insert into {python_table_name}\n'
                '(' + ','.join(column_names) + ')\n'
                'values(' + ','.join(placeholders) + ')'
            )
            cursor.execute(insert_stmt, column_values)
            print('.', end='', flush=True)
        db.commit()
        print(' copied.')

    # note when the backup finished (to the nearest second):
    bkup_end = datetime.datetime.now()
    bkup_timestamp = bkup_end.isoformat()[:19]
    print('Finishing backup:', bkup_timestamp)
    cursor.executemany(
        'insert into _provenance values(?,?)',
        (
            ('backup_end', bkup_timestamp),
            ('backup_dur', (bkup_end - bkup_start).total_seconds() )
        )
    )
    db.commit()

    db.close()
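
A minimal driver would look something like this (the Uplink key and paths are placeholders, not my real values):

if __name__ == '__main__':
    # placeholder key and paths -- substitute your own:
    anvil.server.connect('YOUR-UPLINK-KEY')
    do_backup(
        schema_source_path=pathlib.Path('path/to/app/anvil.yaml'),
        schema_filename=pathlib.Path('schema_only.sqlite'),
        dest_folder=pathlib.Path('backups'),
        base_dest_name='my_app',
    )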