Backup to SQLite via Uplink

I’ve written an Uplink module that backs up an App’s tables to a SQLite database file. It started out as a hard-coded script, but I’ve taken some steps to generalize it.

Some caveats:

  1. I haven’t tried it with multi-link columns. (My tables don’t have any.) To avoid trouble, the (incomplete) code to handle that is commented out. If you have such columns, you’ll want to uncomment those lines (and test thoroughly!). If you do, please let me know how it went!
  2. It reads one of the App’s .yaml files to get the database structure, so you need a (reasonably) current copy of your App’s source code for it to read. Anvil reserves the right to change the format of these files, and schemas, at any time. It’s happened before. It’ll happen again. At that point, this code will probably break.
  3. To read the YAML, it depends on PyYAML, a third-party package. (py -m pip install pyyaml into your favorite Python virtual environment.)
  4. Media objects are written to their own table.
  5. Users may very well be updating the database while a backup is running. In that case, the backup may contain a mix of old and new data; it is not a consistent snapshot.
  6. There’s no “restore” module. Restoring just the right data, safely, is likely to be very, very App-dependent. Row IDs won’t tie it all back together, either: Row IDs of deleted rows can’t be restored, because Anvil generates brand-new ones on insert. (And how do you keep logged-in users’ database activity from accidentally interfering with an ongoing restore?)

At least, with a backup, you can have something to restore, even if it’s not immediately clear how.

Usage

from pathlib import Path
from BackUpAnvilAppTables_work import do_backup
# connect to your App with full table-access privileges. 
# That usually means using the Server (not Client) credentials.
do_backup(
    schema_source_path = Path('...'),  # e.g., anvil.yaml
    schema_filename = Path('...'), # schema file will be written here
    dest_folder = Path('...'), # schema+data file will be written to this folder
    base_dest_name = '...', # schema+data file name.  date+time will be added to name.
)
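
The connection step itself looks something like this (a minimal sketch; the key string is a placeholder for your app’s Server Uplink key):

import anvil.server

# use the Server Uplink key, not the Client one, so all tables are accessible
anvil.server.connect('YOUR-SERVER-UPLINK-KEY')

do_backup(...)  # as above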

The Uplink Module (if there’s a better way to attach this, let me know)

# Module: BackUpAnvilAppTables_work
# Purpose: to back up an Anvil App's data (and schema) to a SQLite file.
# Requires: Python 3.7 or later; PyYAML

# This is the workhorse function, to run after the main program
# successfully identifies the application source file (.yaml) and logs in
# to Anvil.

from __future__ import annotations

# standard-lib modules:
import pathlib
import sqlite3 as dblib
import datetime
import json
import shutil

# third-party modules:
import yaml  # i.e., py -m pip install pyyaml

# web-service imports
import anvil.server
from tables import app_tables


# Note: Reading a full-project .yaml file (mine is over 1MB) can take
# several seconds, yet is likely to result in exactly the same schema as
# before.  We could speed this up by skipping to the line reading
# "exported_table_schema:" before attempting to parse anything.
#
# However, the newer anvil.yaml file, which is just one source file
# in the project, is much smaller, *and* more detailed (at the table
# level).  Prefer to read the smaller file when available (and current).
#
# If no .yaml file is available, then one can get a list of a table's
# column names via app_tables.my_table.list_columns(), but this gives
# only the names, not the types.  (There is no documented way to get
# a list of tables.)  The resulting information is too sparse to be
# processed by the code below.


def load_schema(
    schema_source_path: pathlib.Path,  # full path to app-as-yaml-file
):
    filename = schema_source_path.name
    source_is_git = (filename == 'anvil.yaml')
    schema_key = 'db_schema' if source_is_git else 'exported_table_schema'
    with open(schema_source_path, encoding='utf-8') as schema_source:
        project = yaml.safe_load(schema_source)
        anvil_schema = project[schema_key]
        del project
    if source_is_git:  # instead of a full-project backup file
        # try to translate to something like the full-project format
        for table_descriptor in anvil_schema:
            a = table_descriptor['access']
            for column_name in [ 'server', 'client', 'app_id',
                'table_mapping_name', 'table_mapping_id'
            ]:
                table_descriptor[column_name] = a[column_name]
            # The other fields in table['access'] are duplicates.
            # Just in case we missed something, convert it to a data
            # type that SQLite can handle.
            table_descriptor['access'] = json.dumps(a)
    return anvil_schema
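
# For reference, the code below assumes load_schema() returns a list of table
# descriptors shaped roughly like this (illustrative values, not from a real
# app; the exact keys come from your .yaml file):
#   [{'id': 123, 'python_name': 'contacts', 'server': 'full', ...,
#     'columns': {'col_abc': {'name': 'email', 'type': 'string'},
#                 'col_def': {'name': 'company', 'type': 'liveObject',
#                             'table_id': 456}}},
#    ...]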

def do_backup(
    schema_source_path: pathlib.Path,  # full path to app-as-yaml-file
    schema_filename: pathlib.Path,  # name of schema-only sqlite file
    dest_folder: pathlib.Path,  # folder to contain full backup file
    base_dest_name: str, # base name of backup file in folder
):
    print('Loading Project File (to get the db schema)...')
    anvil_schema = load_schema(schema_source_path)
    print('Loaded', len(anvil_schema), 'table definitions.')

    # create a lookup table (numeric table id -> python name) for later use:
    table_name_from_id = {
        table['id']: table['python_name']
        for table in anvil_schema
    }
    print(table_name_from_id.values())


    # identify the database file to contain the extracted schema:
    schema_timestamp = datetime.datetime.now().isoformat()[:19]
    schema_folder = pathlib.Path.cwd()
    schema_full_name = schema_folder / schema_filename


    # if there is already a file by this name, move it out of the way:
    if schema_full_name.exists():
        schema_full_name.replace(schema_full_name.with_suffix('.sqlite.bak'))


    # create the database file:
    db = dblib.connect(schema_full_name)
    cursor = db.cursor()


    # record basic information about the schema:
    cursor.execute('create table _provenance( key text, value text )')
    cursor.executemany(
        'insert into _provenance(key, value) values(?,?)',
        (
            ('schema_created_at', schema_timestamp),
            ('schema_source', str(schema_source_path)),
            ('source_schema_json', json.dumps(
                anvil_schema, indent=2)),
        )
    )
    db.commit()


    # record the database schema.  First, the tables:
    schema_column_names = sorted(anvil_schema[0].keys())
    schema_column_names.remove('columns')
    create_table_stmt = (
        'create table _anvil_tables(\n' +
        '  id integer primary key,\n' +
        ',\n'.join(
            '  ' + name + ' text'
            for name in schema_column_names
            if name != 'id'
        ) +
        '\n)'
    )
    cursor.execute(create_table_stmt)
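    # The generated statement comes out roughly like this (the exact column
    # list depends on your schema file, so this is only an illustration):
    #   create table _anvil_tables(
    #     id integer primary key,
    #     access text,
    #     client text,
    #     ...
    #     python_name text
    #   )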

    insert_stmt = (
        'insert into _anvil_tables(' + ', '.join(schema_column_names) + ') '
        'values(' + '?,' * (len(schema_column_names) - 1) + '?)')
    for table in anvil_schema:
        row_values = [table[name] for name in schema_column_names]
        cursor.execute(insert_stmt, row_values)
    db.commit()


    # Then the table columns:
    all_column_attributes = set()
    for table in anvil_schema:
        for column in table['columns'].values():
            column_attributes = set(column.keys())
            all_column_attributes |= column_attributes
    all_column_attributes.remove('admin_ui')
    create_table_stmt = (
        'create table _anvil_columns('
        '\n  _anvil_table_id  integer references _anvil_tables(id),'
        '\n  _anvil_column_id text,'
        '\n  table_name      text references _anvil_tables(python_name),' +
        ',\n'.join(
            '  ' + name + ' text'
            for name in all_column_attributes
            if name not in { 'table_id' }
        ) +
        ',\n  table_id  integer references _anvil_tables(id)'
        '\n)'
    )

    cursor.execute(create_table_stmt)
    for table in anvil_schema:
        for anvil_column_id, column_def in table['columns'].items():
            column_names = ['_anvil_table_id', '_anvil_column_id', 'table_name']
            placeholders = ['?', '?', '?']
            column_values = [table['id'], anvil_column_id, table['python_name']]
            for name, value in column_def.items():
                if name in all_column_attributes:
                    column_names.append(name)
                    column_values.append(value)
                    placeholders.append('?')
            insert_stmt = (
                'insert into _anvil_columns'
                '(' + ','.join(column_names) + ')\n'
                'values(' + ','.join(placeholders) + ')'
            )
            cursor.execute(insert_stmt, column_values)
    db.commit()


    def sql_quote(identifier):
        return '"' + identifier + '"'
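    # Double-quoting identifiers keeps SQLite happy when an Anvil table or
    # column name collides with an SQL keyword or contains unusual characters.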


    # Provide a table where Media values can be stored.
    # Columns needed: content_type (text), length (int), name (text),
    # url (text), bytes (blob).  Also needs a row id of some kind, for reference
    # from the "media" column.  De-duplication would be nice...

    create_table_stmt = (
        'create table _anvil_media_objects (\n' +
        #  rowid integer primary key,  IMPLICIT with SQLite
        # '  object_type  text,\n'  # BlobMedia or URLMedia -- can't tell!
        '  content_type text,\n'
        '  name         text,\n'
        '  url          text,\n'
        '  length       integer,\n'
        '  bytes        blob\n'
        # For deduplication, we would add a hash code, SHA-3 or better.
        # If all other fields match, and the hash code matches, then it's
        # the same object, for all practical purposes.
        '\n)'
    )
    cursor.execute(create_table_stmt)
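
    # A possible deduplication approach (just a sketch, not implemented here):
    # hash the media payload with something like
    #     hashlib.sha3_256(media.get_bytes()).hexdigest()
    # store the digest in an extra column, and skip the insert when a row with
    # the same digest, name, and length already exists.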

    # define the actual tables:
    for table in anvil_schema:
        python_table_name = sql_quote(table["python_name"])
        create_table_stmt = (
            f'create table {python_table_name} (\n'
            '  _anvil_row_id text  primary key  not null'
        )
        for anvil_column_id, column_def in table['columns'].items():
            column_type = column_def['type']
            reference_target = reference_clause = ''
            if column_type.startswith('liveObject'):
                reference_target = sql_quote(
                    table_name_from_id[column_def['table_id']])
            if reference_target:
                reference_clause = f' references {reference_target}(_anvil_row_id)'
            column_name = sql_quote(column_def['name'])
            create_table_stmt += (
                f',\n  {column_name} {column_type} {reference_clause}'
            )
        create_table_stmt += '\n)'
        cursor.execute(create_table_stmt)
    db.commit()


    db.close()
    print('Boilerplate SQLite file written to:', schema_full_name)


    # identify the backup file:
    bkup_start = datetime.datetime.now()
    bkup_timestamp = bkup_start.isoformat()[:19]
    print('Starting backup:', bkup_timestamp)
    stamp_for_filename = bkup_timestamp.replace('-', '').replace(':', '')
    dest_filename = base_dest_name + '.' + stamp_for_filename + '.sqlite'
    dest_name = dest_folder / dest_filename
    print('Backing up to:', dest_name)

    # copy the boilerplate file into the backup file
    shutil.copyfile(schema_full_name, dest_name)

    # open the backup file:
    db = dblib.connect(dest_name)
    cursor = db.cursor()

    # note when the backup started (to the nearest second):
    cursor.execute(
        'insert into _provenance values(?,?)',
        ('backup_start', bkup_timestamp)
    )
    db.commit()


    media_cursor = db.cursor()
    media_insert_stmt = (
        'insert into _anvil_media_objects\n'
        '(content_type, name, url, length, bytes)\n'
        'values(?, ?, ?, ?, ?)\n'
    )

    def add_media_object(obj):
        media_cursor.execute(
            media_insert_stmt,
            (
                # obj.__name__,
                obj.content_type,
                obj.name,
                obj.url,
                obj.length,
                obj.get_bytes(),
            )
        )
        return media_cursor.lastrowid
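
    # Data-table rows store the rowid returned here in place of the Media
    # object itself, so the blob can be looked up in _anvil_media_objects later.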

    # copy the tables, one by one:
    for table in anvil_schema:
        python_table_name = table["python_name"]
        app_tbl = getattr(app_tables, python_table_name, None)  #.client_readable()
        # PEC 2020-01-12: I'd hoped that read-only access would be faster.  Nope.
        python_table_name = sql_quote(python_table_name)
        if app_tbl is None:
            print('Anvil table', python_table_name, 'not found')
            continue
        print('Copying', python_table_name, end='')
        for row in app_tbl.search():
            column_names = ['_anvil_row_id']
            placeholders = ['?']
            column_values = [anvil._get_live_object_id(row)] # slower: [row.get_id()]
            for name, value in dict(row).items():
                column_names.append(sql_quote(name))
                # print(name,type(value))
                if isinstance(value, dict) or isinstance(value, list):
                    value = json.dumps(value)
                # elif isinstance(value, anvil._server.LiveObjectArray):
                #    obj_ids = [ obj.get_id  for obj in value]
                #    value = json.dumps(obj_ids)
                elif isinstance(value, anvil._server.LiveObjectProxy):
                    value = anvil._get_live_object_id(value) # slower: value.get_id()
                elif isinstance(value, anvil.Media):
                    row_id = add_media_object(value)
                    value = row_id
                column_values.append(value)
                placeholders.append('?')
            insert_stmt = (
                f'insert into {python_table_name}\n'
                '(' + ','.join(column_names) + ')\n'
                'values(' + ','.join(placeholders) + ')'
            )
            cursor.execute(insert_stmt, column_values)
            print('.', end='', flush=True)
        db.commit()
        print(' copied.')

    # note when the backup finished (to the nearest second):
    bkup_end = datetime.datetime.now()
    bkup_timestamp = bkup_end.isoformat()[:19]
    print('Finishing backup:', bkup_timestamp)
    cursor.executemany(
        'insert into _provenance values(?,?)',
        (
            ('backup_end', bkup_timestamp),
            ('backup_dur', (bkup_end - bkup_start).total_seconds() )
        )
    )
    db.commit()

    db.close()
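
Once a backup file exists, a quick standalone script can sanity-check it by counting rows per table (a minimal sketch; the filename is a placeholder for whatever do_backup produced):

import sqlite3

db = sqlite3.connect('MyApp.20220101T120000.sqlite')  # placeholder filename
for (name,) in db.execute(
        "select name from sqlite_master where type = 'table' order by name"):
    count = db.execute(f'select count(*) from "{name}"').fetchone()[0]
    print(name, count)
db.close()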

No doubt this can be improved upon.

For browsing the resulting SQLite file, I suggest SQLite Studio, or your other favorite database GUI tool.


Thanks for posting this!

I can confirm that it doesn’t work with multi-link columns. Those are coming through as type list, whose individual values are live object proxies. But this is a great start, and those of us with multi-link columns can try to get that working.

It might make sense to put the file up on a GitHub repo so others who make changes to it can contribute patches back.

Edit: I put together a quick hack for multi-link columns that seems to work; it triggers on the exception currently raised when trying to convert the list of live object proxies to JSON. There are probably better ways of doing this:

                if isinstance(value, dict):
                    value = json.dumps(value)
                elif isinstance(value, list):
                    try:
                        value = json.dumps(value)
                    except TypeError:
                        # Probably a multi-link column
                        obj_ids = [obj.get_id() for obj in value]
                        value = json.dumps(obj_ids)
                elif isinstance(value, anvil._server.LiveObjectProxy):
                    value = anvil._get_live_object_id(value) # slower: value.get_id()
                elif isinstance(value, anvil.Media):
                    row_id = add_media_object(value)
                    value = row_id

@jshaffstall, thank you for that hack!

Yes, I should get involved with GitHub. That would also help in giving credit where credit is due.

Here is another handy tool for working with SQLite db files:

DB Browser for SQLite
https://sqlitebrowser.org/


Here’s my updated version of this to work with the modern (as of January 2022) anvil.yaml file format. I only use this with a Git version of the file, not with an exported one.

from __future__ import annotations

# standard-lib modules:
import pathlib
import sqlite3 as dblib
import datetime
import json
import shutil

# third-party modules:
import yaml  # i.e., py -m pip install pyyaml

# web-service imports
import anvil.server
from tables import app_tables


# Note: Reading a full-project .yaml file (mine is over 1MB) can take
# several seconds, yet is likely to result in exactly the same schema as
# before.  We could speed this up by skipping to the line reading
# "exported_table_schema:" before attempting to parse anything.
#
# However, the newer anvil.yaml file, which is just one source file
# in the project, is much smaller, *and* more detailed (at the table
# level).  Prefer to read the smaller file when available (and current).
#
# If no .yaml file is available, then one can get a list of a table's
# column names via app_tables.my_table.list_columns(), but this gives
# only the names, not the types.  (There is no documented way to get
# a list of tables.)  The resulting information is too sparse to be
# processed by the code below.


def load_schema(
    schema_source_path: pathlib.Path,  # full path to app-as-yaml-file
):
    filename = schema_source_path.name
    source_is_git = (filename == 'anvil.yaml')
    schema_key = 'db_schema' if source_is_git else 'exported_table_schema'
    with open(schema_source_path, encoding='utf-8') as schema_source:
        project = yaml.safe_load(schema_source)
        anvil_schema = project[schema_key]
        del project
    if source_is_git:  # instead of a full-project backup file
        # try to translate to something like the full-project format
        schema = []
        for table_name in anvil_schema:
            table_descriptor = anvil_schema[table_name]
            table_descriptor['python_name'] = table_name
            schema.append(table_descriptor)
            
        return schema
            
    return anvil_schema
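
# For reference, this code assumes anvil.yaml's db_schema maps each table
# name to a descriptor whose 'columns' entry is a list of dicts, roughly
# like this (illustrative only):
#   contacts:
#     columns:
#     - name: email
#       type: string
#       admin_ui: {...}
#     - name: company
#       type: link_single
#       target: companies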

def do_backup(
    schema_source_path: pathlib.Path,  # full path to app-as-yaml-file
    schema_filename: pathlib.Path,  # name of schema-only sqlite file
    dest_folder: pathlib.Path,  # folder to contain full backup file
    base_dest_name: str, # base name of backup file in folder
):
    print('Loading Project File (to get the db schema)...')
    anvil_schema = load_schema(schema_source_path)
    print('Loaded', len(anvil_schema), 'table definitions.')

    # identify the database file to contain the extracted schema:
    schema_timestamp = datetime.datetime.now().isoformat()[:19]
    schema_folder = pathlib.Path.cwd()
    schema_full_name = schema_folder / schema_filename


    # if there is already a file by this name, move it out of the way:
    if schema_full_name.exists():
        schema_full_name.replace(schema_full_name.with_suffix('.sqlite.bak'))


    # create the database file:
    db = dblib.connect(schema_full_name)
    cursor = db.cursor()


    # record basic information about the schema:
    cursor.execute('create table _provenance( key text, value text )')
    cursor.executemany(
        'insert into _provenance(key, value) values(?,?)',
        (
            ('schema_created_at', schema_timestamp),
            ('schema_source', str(schema_source_path)),
            ('source_schema_json', json.dumps(
                anvil_schema, indent=2)),
        )
    )
    db.commit()


    # record the database schema.  First, the tables:
    schema_column_names = sorted(anvil_schema[0].keys())
    schema_column_names.remove('columns')
    create_table_stmt = (
        'create table _anvil_tables(\n' +
        '  id integer primary key,\n' +
        ',\n'.join(
            '  ' + name + ' text'
            for name in schema_column_names
            if name != 'id'
        ) +
        '\n)'
    )
    cursor.execute(create_table_stmt)

    table_name_to_id = {}
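    # maps each table's python_name to the SQLite rowid assigned to it in
    # _anvil_tables; used below to fill in link columns' table_id references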
    
    insert_stmt = (
        'insert into _anvil_tables(' + ', '.join(schema_column_names) + ') '
        'values(' + '?,' * (len(schema_column_names) - 1) + '?)')
    
    for table in anvil_schema:
        row_values = [table[name] for name in schema_column_names]
        cursor.execute(insert_stmt, row_values)
        last_id = cursor.execute('SELECT last_insert_rowid()').fetchone()[0]
        table_name_to_id[table['python_name']] = last_id
        
    db.commit()
    
    


    # Then the table columns:
    all_column_attributes = set()
    for table in anvil_schema:
        for column in table['columns']:
            column_attributes = set(column.keys())
            all_column_attributes |= column_attributes
    all_column_attributes.remove('admin_ui')
    create_table_stmt = (
        'create table _anvil_columns('
        '\n  _anvil_table_id  integer references _anvil_tables(id),'
        '\n  table_name      text references _anvil_tables(python_name),' +
        ',\n'.join(
            '  ' + name + ' text'
            for name in all_column_attributes
            if name not in { 'table_id' }
        ) +
        ',\n  table_id  integer references _anvil_tables(id)'
        '\n)'
    )

    cursor.execute(create_table_stmt)
    for table in anvil_schema:
        for column in table['columns']:
            column_names = ['table_name', '_anvil_table_id']
            placeholders = ['?', '?']
            column_values = [table['python_name'], table_name_to_id[table['python_name']]]
            
            if 'target' in column:
                column_names.append('table_id')
                placeholders.append('?')
                column_values.append(table_name_to_id[column['target']])
            
            for name, value in column.items():
                if name in all_column_attributes:
                    column_names.append(name)
                    column_values.append(value)
                    placeholders.append('?')
            
            insert_stmt = (
                'insert into _anvil_columns'
                '(' + ','.join(column_names) + ')\n'
                'values(' + ','.join(placeholders) + ')'
            )
            cursor.execute(insert_stmt, column_values)
    db.commit()


    def sql_quote(identifier):
        return '"' + identifier + '"'


    # Provide a table where Media values can be stored.
    # Columns needed: content_type (text), length (int), name (text),
    # url (text), bytes (blob).  Also needs a row id of some kind, for reference
    # from the "media" column.  De-duplication would be nice...

    create_table_stmt = (
        'create table _anvil_media_objects (\n' +
        #  rowid integer primary key,  IMPLICIT with SQLite
        # '  object_type  text,\n'  # BlobMedia or URLMedia -- can't tell!
        '  content_type text,\n'
        '  name         text,\n'
        '  url          text,\n'
        '  length       integer,\n'
        '  bytes        blob\n'
        # For deduplication, we would add a hash code, SHA-3 or better.
        # If all other fields match, and the hash code matches, then it's
        # the same object, for all practical purposes.
        '\n)'
    )
    cursor.execute(create_table_stmt)

    # define the actual tables:
    for table in anvil_schema:
        python_table_name = sql_quote(table["python_name"])
        create_table_stmt = (
            f'create table {python_table_name} (\n'
            '  _anvil_row_id text  primary key  not null'
        )
    
        for column in table['columns']:
            column_type = column['type']
            reference_target = reference_clause = ''
            if column_type.startswith('link_'):
                reference_target = sql_quote(column['target'])
            if reference_target:
                reference_clause = f' references {reference_target}(_anvil_row_id)'
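                # Note: for link_multiple columns the stored value ends up as a
                # JSON list of row ids (see the copy loop below), so this
                # references clause is informational rather than enforced.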
            column_name = sql_quote(column['name'])
            create_table_stmt += (
                f',\n  {column_name} {column_type} {reference_clause}'
            )
        create_table_stmt += '\n)'
        cursor.execute(create_table_stmt)
    db.commit()


    db.close()
    print('Boilerplate SQLite file written to:', schema_full_name)


    # identify the backup file:
    bkup_start = datetime.datetime.now()
    bkup_timestamp = bkup_start.isoformat()[:19]
    print('Starting backup:', bkup_timestamp)
    stamp_for_filename = bkup_timestamp.replace('-', '').replace(':', '')
    dest_filename = base_dest_name + '.' + stamp_for_filename + '.sqlite'
    dest_name = dest_folder / dest_filename
    print('Backing up to:', dest_name)

    # copy the boilerplate file into the backup file
    shutil.copyfile(schema_full_name, dest_name)

    # open the backup file:
    db = dblib.connect(dest_name)
    cursor = db.cursor()

    # note when the backup started (to the nearest second):
    cursor.execute(
        'insert into _provenance values(?,?)',
        ('backup_start', bkup_timestamp)
    )
    db.commit()


    media_cursor = db.cursor()
    media_insert_stmt = (
        'insert into _anvil_media_objects\n'
        '(content_type, name, url, length, bytes)\n'
        'values(?, ?, ?, ?, ?)\n'
    )

    def add_media_object(obj):
        media_cursor.execute(
            media_insert_stmt,
            (
                # obj.__name__,
                obj.content_type,
                obj.name,
                obj.url,
                obj.length,
                obj.get_bytes(),
            )
        )
        return media_cursor.lastrowid

    # copy the tables, one by one:
    for table in anvil_schema:
        python_table_name = table["python_name"]
        app_tbl = getattr(app_tables, python_table_name, None)  #.client_readable()
        # PEC 2020-01-12: I'd hoped that read-only access would be faster.  Nope.
        python_table_name = sql_quote(python_table_name)
        if app_tbl is None:
            print('Anvil table', python_table_name, 'not found')
            continue
        print('Copying', python_table_name, end='')
        for row in app_tbl.search():
            column_names = ['_anvil_row_id']
            placeholders = ['?']
            column_values = [anvil._get_live_object_id(row)] # slower: [row.get_id()]
            for name, value in dict(row).items():
                column_names.append(sql_quote(name))
                # print(name,type(value))
                if isinstance(value, dict):
                    value = json.dumps(value)
                elif isinstance(value, list):
                    try:
                        value = json.dumps(value)
                    except TypeError:
                        # Probably a multi-link column
                        obj_ids = [obj.get_id() for obj in value]
                        value = json.dumps(obj_ids)
                elif isinstance(value, anvil._server.LiveObjectProxy):
                    value = anvil._get_live_object_id(value) # slower: value.get_id()
                elif isinstance(value, anvil.Media):
                    row_id = add_media_object(value)
                    value = row_id
                column_values.append(value)
                placeholders.append('?')
            insert_stmt = (
                f'insert into {python_table_name}\n'
                '(' + ','.join(column_names) + ')\n'
                'values(' + ','.join(placeholders) + ')'
            )
            cursor.execute(insert_stmt, column_values)
            print('.', end='', flush=True)
        db.commit()
        print(' copied.')

    # note when the backup finished (to the nearest second):
    bkup_end = datetime.datetime.now()
    bkup_timestamp = bkup_end.isoformat()[:19]
    print('Finishing backup:', bkup_timestamp)
    cursor.executemany(
        'insert into _provenance values(?,?)',
        (
            ('backup_end', bkup_timestamp),
            ('backup_dur', (bkup_end - bkup_start).total_seconds() )
        )
    )
    db.commit()

    db.close()