Data Tables import/export tool

I couldn’t find a way of copying the contents of Data Tables between different Databases, or from a managed Database to a self-hosted App Server instance, so I’ve been gradually putting together a Python script to do this automatically.

The script has two subcommands, export and import. I usually run two instances and pipe the export straight into the import, but it can also write to and read from files, making it useful for backing up the contents of your tables as well.

The exporter reads the schema from your anvil.yaml file. It uses the column type information to serialize special types (e.g. dates) as well as single and multiple row links. It also exports the tables in topological order, which guarantees for the importer that linked rows are always defined before they are referenced. (Caveat: the topological sort will fail if the link graph has any cycles, i.e. tables that link to each other in a loop. I believe this could be solved with a two-pass approach that creates all rows in the first pass and sets all links in the second, but I haven’t tried implementing it since none of my schemas use circular references.) All table contents, along with a timestamp and a copy of the schema, are serialized into a single JSON object.
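To illustrate the ordering step, here is a minimal sketch using graphlib on its own. The table names and the link graph are made up for the example; in the script the graph is built from the link columns in db_schema.

```python
import graphlib

# Hypothetical link graph: each table maps to the set of tables
# that its link columns point at (its "predecessors").
link_graph = {
    'orders': {'customers', 'products'},
    'products': {'suppliers'},
    'customers': set(),
    'suppliers': set(),
}


def export_order(graph):
    """Return table names so that link targets come before the tables that reference them."""
    return list(graphlib.TopologicalSorter(graph).static_order())


def has_cycle(graph):
    """True if the link graph cannot be ordered because tables link to each other in a loop."""
    try:
        export_order(graph)
    except graphlib.CycleError:
        return True
    return False


order = export_order(link_graph)
print(order)
print(has_cycle({'a': {'b'}, 'b': {'a'}}))
```

The exact order among unrelated tables is not something to rely on; the only guarantee is that a table appears after every table it links to.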

The importer reads the schema and the contents from the JSON and reassembles the Python objects representing each row. For each table, it first deletes all existing rows, then adds the new rows to the target database in batches (note this won’t work without Accelerated Tables), relinking the single and multiple link columns along the way.
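The relinking idea can be sketched without a database at all: keep a map from old row IDs to newly created rows, and translate link cells through it. Everything here (table names, IDs, the link_columns mapping and the add_row stand-in) is hypothetical, and unlike the script it adds rows one at a time rather than in batches.

```python
# Exported data in topological order: link targets come first.
exported = {
    'customers': {'[1,10]': {'name': 'Ada'}},
    'orders': {'[2,20]': {'item': 'widget', 'customer': '[1,10]'}},
}
# Hypothetical summary of the schema: which columns are single links.
link_columns = {'customers': [], 'orders': ['customer']}


def relink(exported, link_columns, add_row):
    """Recreate rows, translating old row IDs in link columns into new row objects."""
    live_objects = {}  # old row ID -> newly created row
    for table_name, rows in exported.items():
        for old_id, row in rows.items():
            new_row = dict(row)
            for col in link_columns[table_name]:
                if new_row[col] is not None:
                    # Safe because link targets were created earlier.
                    new_row[col] = live_objects[new_row[col]]
            live_objects[old_id] = add_row(table_name, new_row)
    return live_objects


# Stand-in for the target database: remember what was added, return the "row".
created = []


def add_row(table_name, row):
    created.append((table_name, row))
    return row


live = relink(exported, link_columns, add_row)
```

This only works because the export order guarantees that every old ID is already in the map by the time a row referencing it is rebuilt.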

Note that the Uplink key and URL come from environment variables (ANVIL_UPLINK_KEY and, optionally, ANVIL_UPLINK_URL), while filenames come from command-line arguments. I use .env files to manage the Uplink info, so my typical usage looks something like:
dotenv -e production.env -- python tables_sync.py export | dotenv -e staging.env -- python tables_sync.py import

The source code below is under the MIT license.

Potential future work if there’s interest:

  • More samples/better documentation
  • Circular reference support
  • Interactive mode of some kind
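
For the circular reference item, the two-pass idea might look roughly like the sketch below. This is not part of the script: it uses plain dicts as stand-ins for rows, handles single links only, and assumes real row objects would accept item assignment in the same way. All names are hypothetical.

```python
def import_two_pass(exported, link_columns, add_row):
    """Create all rows first, then fill in links, so table order no longer matters."""
    live_objects = {}  # old row ID -> newly created row

    # Pass 1: create every row with its link columns left empty.
    for table_name, rows in exported.items():
        links = link_columns[table_name]
        for old_id, row in rows.items():
            plain = {k: (None if k in links else v) for k, v in row.items()}
            live_objects[old_id] = add_row(table_name, plain)

    # Pass 2: every row now exists, so links can point anywhere, cycles included.
    for table_name, rows in exported.items():
        for old_id, row in rows.items():
            for col in link_columns[table_name]:
                if row[col] is not None:
                    live_objects[old_id][col] = live_objects[row[col]]
    return live_objects


# Two tables that link to each other, which a topological sort would reject.
exported = {
    'people': {'p1': {'name': 'Ada', 'team': 't1'}},
    'teams': {'t1': {'name': 'Core', 'lead': 'p1'}},
}
link_columns = {'people': ['team'], 'teams': ['lead']}

live = import_two_pass(exported, link_columns, lambda table_name, row: row)
```

The trade-off is a second write per linked row, which would likely be slower than a single batched insert.
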
# Copyright (c) 2023 jensrst
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import anvil.server
from anvil.tables import app_tables
import anvil.tables
import yaml
import graphlib
import json
from datetime import date, datetime
import sys
import argparse
import os


def export_tables(anvil_yaml):
    db_schema = anvil_yaml['db_schema']
    package_name = anvil_yaml['package_name']

    def column_is_link(column):
        return column['type'] in ('link_single', 'link_multiple')
    table_names_in_order = list(graphlib.TopologicalSorter({
        table_name: {col['target']
                     for col in table['columns'] if column_is_link(col)}
        for table_name, table in db_schema.items()
    }).static_order())
    db_schema_in_order = {
        table_name: db_schema[table_name] for table_name in table_names_in_order
    }

    @anvil.tables.in_transaction
    def export_all():
        timestamp = datetime.now()
        obj = {
            '$timestamp': timestamp.isoformat(),
            '$schema': db_schema_in_order,
            '$data': {}
        }
        total_rows = 0
        for table_name in table_names_in_order:
            def jsonize(row):
                out_row = {}
                for col in db_schema[table_name]['columns']:
                    def verify_and_return_linked_id(cell):
                        cell_id = cell.get_id()
                        if db_schema[col['target']]['columns']:
                            try:
                                _ = cell[db_schema[col['target']]
                                         ['columns'][0]['name']]
                            except anvil.tables.RowDeleted:
                                print(f'WARNING: row {row.get_id()} in {table_name}.{col["name"]} (referencing {col["target"]}) has dead link: {cell_id}',
                                      file=sys.stderr)
                                return None
                        return cell_id
                    cell = row[col['name']]
                    if cell:
                        if col['type'] == 'date':
                            cell = cell.isoformat()
                        elif col['type'] == 'datetime':
                            cell = cell.isoformat()
                        elif col['type'] == 'link_single':
                            cell = verify_and_return_linked_id(cell)
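                        elif col['type'] == 'link_multiple':
                            # drop dead links instead of exporting None entries
                            linked_ids = [verify_and_return_linked_id(
                                entry) for entry in cell]
                            cell = [i for i in linked_ids if i is not None]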
                    out_row[col['name']] = cell
                return out_row

            data = {
                row.get_id(): jsonize(row)
                for row in getattr(app_tables, table_name).search()
            }
            obj['$data'][table_name] = data
            total_rows += len(data)
            print(
                f'exported {len(data)} rows from {table_name} (total rows {total_rows})', file=sys.stderr)

        return obj, timestamp

    obj, timestamp = export_all()
    default_filename = f'{package_name}-{timestamp:%Y_%m_%d-%H_%M_%S_%f}.tables.json'
    return obj, default_filename


def import_tables(obj):
    db_schema = obj['$schema']
    data = obj['$data']

    @anvil.tables.in_transaction
    def import_all():
        live_objects = {}
        for table_name, table in data.items():
            getattr(app_tables, table_name).delete_all_rows()

            def objectize(old_id, row):
                out_row = {}
                for col in db_schema[table_name]['columns']:
                    def follow_link(cell):
                        if cell in live_objects:
                            return live_objects[cell]
                        else:
                            print(
                                f'WARNING: {table_name}.{col["name"]} {old_id} (referencing {col["target"]}) has dead link: {cell}', file=sys.stderr)
                    cell = row[col['name']]
                    if cell:
                        if col['type'] == 'date':
                            cell = date.fromisoformat(cell)
                        elif col['type'] == 'datetime':
                            cell = datetime.fromisoformat(cell)
                        elif col['type'] == 'link_single':
                            cell = follow_link(cell)
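                        elif col['type'] == 'link_multiple':
                            # follow_link returns None for dead links; leave those out
                            linked = [follow_link(entry) for entry in cell]
                            cell = [r for r in linked if r is not None]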
                    out_row[col['name']] = cell
                return out_row

            live_objects.update({
                old_id: live_object for old_id, live_object in zip(
                    table.keys(),
                    getattr(app_tables, table_name).add_rows([
                        objectize(old_id, row) for old_id, row in table.items()
                    ])
                )
            })

            assert len(getattr(app_tables, table_name).search()) == len(table)
            print(
                f'imported {len(table)} rows into {table_name} (total rows {len(live_objects)})', file=sys.stderr)

    import_all()


if __name__ == '__main__':
    ANVIL_UPLINK_KEY = os.environ.get('ANVIL_UPLINK_KEY')  # required
    ANVIL_UPLINK_URL = os.environ.get(
        'ANVIL_UPLINK_URL', 'wss://anvil.works/uplink')  # optional

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True, dest='subcommand')

    import_parser = subparsers.add_parser('import')
    import_parser.add_argument('filename', default='-', nargs='?',
                               help='file to read from (pipe or use `-` to read from stdin)')
    import_parser.add_argument('-y', '--yes', action='store_true', dest='yes',
                               help='bypass confirmation prompt to overwrite all tables on destination server')

    export_parser = subparsers.add_parser('export')
    export_parser.add_argument('filename', default=None, nargs='?',
                               help='file to write to (defaults to `{package_name}-{timestamp}.tables.json`; pipe or use `-` to write to stdout)')
    export_parser.add_argument('-s', '--schema', dest='schema_filename', default='./anvil.yaml',
                               help='path to anvil.yaml file including db_schema for source server')

    args = parser.parse_args()
    try:
        if not ANVIL_UPLINK_KEY:
            raise ValueError('must set ANVIL_UPLINK_KEY in environment')
        anvil.server.connect(
            ANVIL_UPLINK_KEY, url=ANVIL_UPLINK_URL, quiet=True)

        if args.subcommand == 'export':
            print(
                f'exporting from {ANVIL_UPLINK_URL} with schema {args.schema_filename}', file=sys.stderr)

            with open(args.schema_filename, 'r') as f:
                anvil_yaml = yaml.safe_load(f)

            obj, default_filename = export_tables(anvil_yaml)

            if args.filename == '-' or not os.isatty(1):
                json.dump(obj, sys.stdout, indent=4)
                print(f'finished export to stdout', file=sys.stderr)
            else:
                filename = args.filename or default_filename
                with open(filename, 'w') as f:
                    json.dump(obj, f, indent=4)
                print(f"finished export to '{filename}'", file=sys.stderr)

        elif args.subcommand == 'import':
            def confirm():
                msg = f"WARNING: This action will overwrite all database content for the '{anvil.app.environment.name}' environment on the '{ANVIL_UPLINK_URL}' server. Are you sure you wish to continue (y/N)? "
                return input(msg).lower() in ['y', 'yes']

            if args.filename == '-' or not os.isatty(0):
                print('importing from stdin', file=sys.stderr)
                obj = json.load(sys.stdin)
                if not args.yes:
                    # stdin was consumed by the JSON, so reattach it to the
                    # terminal for the confirmation prompt (not needed with -y)
                    try:
                        sys.stdin.close()
                        sys.stdin = open('/dev/tty')
                    except OSError as e:
                        raise OSError(
                            "can't reopen stdin; must pass -y to confirm when importing from stdin") from e
            else:
                print(f'importing from {args.filename}', file=sys.stderr)
                with open(args.filename, 'r') as f:
                    obj = json.load(f)

            if args.yes or confirm():
                import_tables(obj)
                print(
                    f"finished import to '{anvil.app.environment.name}' on '{ANVIL_UPLINK_URL}'", file=sys.stderr)
            else:
                raise PermissionError(
                    'user declined to continue - import not run')
        else:
            raise AssertionError('unreachable')
    finally:
        anvil.server.disconnect()


FANTASTIC!!! Can’t wait to try this out.

Alas, I do have circular references. But I’m considering breaking those loops. I’m also trying to replace all link-type columns with more portable UUIDs, wherever I can.

I love the piping approach. I used it (on a subset of the object tree) for the exact same reason: there’s no way to have a single Python program connect to both source and destination databases (Deployment Environments, DEs) at the same time. It takes two programs (or program instances), one for each DE.
