OpenAI call consuming lots of memory?

What I’m trying to do:
I am cutting up a PDF, turning it into HTML, and sending chunks of up to 4,000 characters of HTML text to the OpenAI API to analyse.

To get through the PDF faster, I parallelise the process by creating a new background task for each of the OpenAI calls.

Unfortunately, I am running into memory issues. Meredydd has already been kind enough to set my per-instance memory allocation to 2GB (see our discussion here: Downlink Disconnected x8, servercode exited unexpectedly x3, connection failed x2 - #2). This allows me to increase the number of simultaneous background OpenAI tasks from 5 to 15, but above that it still fails.

I have introduced print statements, and the failure always happens on this call:

response = client.chat.completions.create(
            model=model,
            response_format=response_format,
            messages=prompt_messages,
            temperature=temperature,
            max_tokens=int(max_tokens),
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )

Is it reasonable that with 2GB of memory I can only run 15 simultaneous background tasks? The PDF itself is only 700KB in size.

Does anyone have suggestions about how I can optimise this or what I should look into?

My code:

import anvil.email
import anvil.secrets
import anvil.server
from openai import OpenAI
from anvil import tables
import time
import sys
import json
from datetime import datetime
import anvil.tables.query as q
from anvil.tables import app_tables
from . import class_prompt_handler
from .stripe_fns import max_lcs_to_generate

client = None

prompt_cycle_count = {
    'PDF_CHUNKS_TO_CLOZE': 1,
    'FACTS_TO_CLOZE': 6,
    'FACTS_TO_Q_AND_A': 7
}

@anvil.server.background_task
def generate_AI_lcs_v2(session_id, pdf_chunk_indices, ai_type, simultaneous_ai_tasks=1, retries=4, max_wait_time=45):
    print(f"Entered generate_AI_lcs function with session_id: {session_id}, simultaneous_ai_tasks: {simultaneous_ai_tasks}, pd_chunk_indices: {pdf_chunk_indices}, retries={retries}, max_wait_time={max_wait_time}")

    # Initialize variables
    start_time = time.time()
    active_tasks = []
    generation_credits_empty = False
    pdf_chunks = get_pdf_chunks(session_id, pdf_chunk_indices)
    current_pdf_chunk_index = 0
    num_pdf_chunks = len(pdf_chunks)
    failed_tasks_counter = 0
    max_failed_tasks_before_full_fail = 5

    if num_pdf_chunks == 0:
        print("No learning cards to process.")
        return
    print(f"Processing {num_pdf_chunks} lcs")

    session = app_tables.session_info.get_by_id(session_id)
    session['ai_generating'] = True
    user = session['user']

    if user:
        print(f"generating lcs for session {session_id} and user {user['email']} for deck {session['lc_group']}")
    else:
        print(f"generating lcs for session {session_id} and not logged in user for deck {session['lc_group']}")

    # Launch initial set of tasks
    while len(active_tasks) < min(simultaneous_ai_tasks, num_pdf_chunks) and current_pdf_chunk_index < num_pdf_chunks:
        chunk = pdf_chunks[current_pdf_chunk_index]

        # do pics immediately.
        if chunk['chunk_type'] == 'pic':
            app_tables.temp_insertion_waiting_list.add_row(session_id=session_id, pdf_chunk=chunk, final_response=[chunk['chunk_html']], index=current_pdf_chunk_index, inserted=False)
            print(f'initially added pic to temp table with chunk index: {current_pdf_chunk_index}')
        else:
            # non pics
            task = anvil.server.launch_background_task('generate_AI_lc_v2', session_id, chunk, ai_type, retries, max_wait_time, current_pdf_chunk_index)
            active_tasks.append((task, chunk['chunk_index']))  # Store task with its chunk index for reference
            print(f'initially added non-pic task to active tasks with chunk index: {current_pdf_chunk_index}')

        current_pdf_chunk_index += 1

    # Monitor and manage tasks
    while current_pdf_chunk_index < num_pdf_chunks and not generation_credits_empty:
        time.sleep(60)
        active_tasks, failed_tasks_counter = handle_active_tasks(active_tasks, failed_tasks_counter, pdf_chunks, max_failed_tasks_before_full_fail, session_id, ai_type, retries, max_wait_time)
        consecutively_insert_into_lc_db(session_id)

        # add new tasks if there's room
        while len(active_tasks) < simultaneous_ai_tasks and current_pdf_chunk_index < num_pdf_chunks and not generation_credits_empty:
            # Check if user still has credits, then launch a new task
            available_generation_credits, user, _ = get_available_generation_credits(session_id)
            print(f'available_generation_credits: {available_generation_credits}, user:{user}')

            if available_generation_credits > 0:
                new_chunk = pdf_chunks[current_pdf_chunk_index]
                if new_chunk['chunk_type'] == 'pic':
                    app_tables.temp_insertion_waiting_list.add_row(session_id=session_id, pdf_chunk=new_chunk, final_response=[new_chunk['chunk_html']], index=current_pdf_chunk_index, inserted=False)
                    print(f'active task finished, so added pic to temp table with chunk index: {current_pdf_chunk_index}')
                else:
                    new_task = anvil.server.launch_background_task('generate_AI_lc_v2', session_id, new_chunk, ai_type, retries, max_wait_time, current_pdf_chunk_index)
                    active_tasks.append((new_task, new_chunk['chunk_index']))
                    print(f'active task finished, so added non-pic task to active tasks with chunk index: {current_pdf_chunk_index}')

                current_pdf_chunk_index += 1
            else:
                print(f"User {user} with session {session_id} has reached their learning card generation limit.")
                generation_credits_empty = True # End this background task as the user has no credits left

    # if there are no more credits or all the pdf chunks have been assigned a task, wait for all tasks to finish
    while len(active_tasks) > 0:
        active_tasks, failed_tasks_counter = handle_active_tasks(active_tasks, failed_tasks_counter, pdf_chunks, max_failed_tasks_before_full_fail, session_id, ai_type, retries, max_wait_time)       
        consecutively_insert_into_lc_db(session_id)
        time.sleep(60)

    # run the insertion function one last time and then empty the table after everything is done
    consecutively_insert_into_lc_db(session_id)
    temp_rows = app_tables.temp_insertion_waiting_list.search(session_id=session_id)
    temp_rows.delete_all_rows()
    print('all temp rows deleted')  
    session['ai_generating'] = False 

    end_time = time.time()
    duration = end_time - start_time
    print(f"generate_AI_lcs_v2 task completed in {duration} seconds")
    print('generate_AI_lcs_v2 completed')

@anvil.server.background_task
def generate_AI_lc_v2(session_id, chunk, ai_type, retries, max_wait_time, index):
    start_time = time.time()
    current_date_time = datetime.now()
    num_prompt_cycles = prompt_cycle_count.get(ai_type, 1)
    prompt_handler = class_prompt_handler.PromptHandler()
    processed_responses = [""] * (num_prompt_cycles+1)
    processed_responses[0] = {"original_text": chunk['chunk_html']}

    session = app_tables.session_info.get_by_id(session_id)
    user = session['user']
    if user:
        print(f"task {index} generating lcs for session {session_id} and user {user['email']} for deck {session['lc_group']}")
    else:
        print(f"task {index} generating lcs for session {session_id} and not logged in user for deck {session['lc_group']}")

    print(f"original_text: {chunk['chunk_html']}")
    print(" ")

    for i in range(num_prompt_cycles):
        prompt_num = i+1
        print(f"Generation {prompt_num} for chunk {chunk['chunk_index']}")

        # Generate prompt info for the current iteration
        prompt_info = prompt_handler.generate_prompt(chunk['chunk_html'], prompt_num, ai_type, processed_responses)

        # Convert prompt_messages to the format expected by OpenAI API
        prompt_messages = prompt_info.get('prompt_messages',[{'user':''}])
        model = prompt_info.get('model', 'gpt-3.5-turbo-0125')
        temperature = prompt_info.get('temperature', 1)
        max_tokens = prompt_info.get('max_tokens', 4095)
        response_format = prompt_info.get('response_format', None)

        # Make OpenAI Request with the generated prompt info
        print(f'Making the OpenAI call for generation {prompt_num} with model: {model}, temperature:{temperature}, max_tokens:{max_tokens}, response_format:{response_format}, prompt_messages:{prompt_messages}')
        openai_message_content, processed_responses[prompt_num], openai_cost = retry_openai_request(
            prompt_messages, processed_responses, retries, max_wait_time, model, temperature, max_tokens, response_format, prompt_num, ai_type, prompt_handler)

        # Check for failure
        if openai_message_content == "failed":
            print(f"OpenAI request and processing failed after {retries} retries.")
            processed_responses[prompt_num] = [
                # German user-facing fallback: "Generating cards for part of the PDF failed. Here is the original text for that part:"
                'Karten generieren für Teil der PDF fehlgeschlagen. Hier der Originaltext für den Teil: \n\n' + chunk['chunk_html']
            ]
            print("Processing failed. Exiting generation loop.")
            print(" ")
            break  # Exit the for loop if processing fails

        add_costs_to_session_and_user(session_id, openai_cost)

        # app_tables.ai_call_results.add_row(
        #     ai_type=ai_type,
        #     session=session,
        #     chunk=chunk,
        #     prompt_num=prompt_num,
        #     prompt_messages=prompt_messages,
        #     openai_message_content=openai_message_content,
        #     processed_response=processed_responses[prompt_num],
        #     openai_cost=openai_cost,
        #     created=current_date_time,
        #     server_session_id=anvil.server.get_session_id()
        # )

        print(f'Prompt {i+1}')
        print(json.dumps(processed_responses[prompt_num], indent=2, ensure_ascii=False))
        print(" ")

    end_time = time.time()
    duration = end_time - start_time
    print(f"generate_AI_lc_v2 task completed in {duration} seconds.")

    app_tables.temp_insertion_waiting_list.add_row(session_id=session_id, pdf_chunk=chunk, final_response=processed_responses[prompt_num], index=index, inserted=False)
    print(f'row with {len(processed_responses[prompt_num])} lcs added to temp table. session_id: {session_id}, index: {index}')

def add_lcs_to_db(session_id, chunk, processed_response):
    print(f'final response to add to db: {processed_response}')
    if not isinstance(processed_response, list):
        raise TypeError("processed_response must be a list")

    num_lcs = len(processed_response)
    print(f'Adding {num_lcs} learning cards to db for chunk {chunk["chunk_index"]}')
    rows_to_add = []
    total_size = 0
    i = -1
    MAX_SIZE = 3.6 * 1024 * 1024  # 3.6MB in bytes
    current_time = datetime.now()
    chunk_index = chunk['chunk_index']
    chunk_page_num = chunk['page_num']
    available_generation_credits, user, session = get_available_generation_credits(session_id)

    for i, lc in enumerate(processed_response):
        if available_generation_credits > 0:
            available_generation_credits -= 1
            if user:
                user['available_generation_credits'] -= 1
            else:
                #handled in assign_session_to_user
                pass

        else:
            # Set flag indicating the user has reached their generation limit
            session = app_tables.session_info.get_by_id(session_id)
            if session['show_generation_limit_reached_msg'] is None: session['show_generation_limit_reached_msg'] = True #only if not already set before
            print(f'No more generation credits available.')
            break

        row = {
            'learning_card': lc,
            'lc_page_num': chunk_page_num,
            'ai_generated': True,
            'session': session,
            'pdf_chunk': chunk,
            'lc_index': chunk_index*1000 + i,
            'created': current_time
        }

        row_without_extra_info = {k: v for k, v in row.items() if k not in ['created', 'session', 'pdf_chunk', 'ai_generated']}
        row_size = sys.getsizeof(json.dumps(row_without_extra_info))

        if total_size + row_size > MAX_SIZE:
            print(f'new_row_size: {row_size}, rows_to_add_so_far_size: {total_size}, together: {total_size + row_size} --> adding now')
            if len(rows_to_add) == 0:
                raise Exception("Single row exceeds maximum size of 3.8MB")
            app_tables.learning_cards.add_rows(rows_to_add)
            rows_to_add = [row]  # Start a new batch with current row
            total_size = row_size
        else:
            rows_to_add.append(row)
            total_size += row_size

    if rows_to_add:
        print(f'new_row_size: {row_size}, rows_to_add_so_far_size: {total_size}, together: {total_size + row_size} --> last rows, so adding')
        app_tables.learning_cards.add_rows(rows_to_add)

    session['num_lcs_added_to_lc_table'] += i+1
    print(f'Learning cards added to db for chunk {chunk["chunk_index"]}')

# Helper function that generates the openAI call and waits for a response, retrying a number of times.
def retry_openai_request(prompt_messages, processed_responses, retries, max_wait_time, model, temperature, max_tokens, response_format, prompt_num, ai_type, prompt_handler):
    best_answer = None
    for i in range(retries):
        # Launch the OpenAI API background task
        print(f'launching openai background task, try {i+1}')
        try:
            openai_message_content, openai_cost = openai_background_task_v2(prompt_messages, model, temperature, max_tokens, response_format)
            print('openai responded successfully')
            status, response = prompt_handler.process_openai_response(processed_responses, openai_message_content, prompt_num, ai_type)
        except Exception as e:
            # If an exception occurs during response processing, log and prepare for a retry
            print(f'Error during response processing: {e}')
            continue

        if status == prompt_handler.STATUS_FAILURE_RETRY_OR_CONTINUE:
            # rudimentary, but guessing the longest response that didnt validate well is the best (most improvements e.g.)
            if best_answer is None or len(str(best_answer)) < len(str(response)): best_answer = response

            if i == (retries - 1):
                print('Fail with continue on retry and no more retries available, so continuing.')
                response = best_answer
            else:
                print('Response processing indicated retry, and more retries are available.')
                continue

        elif status == prompt_handler.STATUS_FAILURE_RETRY_OR_QUIT:
            # If the status is RETRY_OR_QUIT, regardless of the retry attempt, continue retrying
            print('Response processing indicated retry or quit.')
            continue

        print('response was processed successfully')
        print(" ")
        return openai_message_content, response, openai_cost  # Return the successfully processed response

    print(f'Task failed {i+1} times.')
    return "failed", False, 0  # Return 'failed' after 3 retries (probably not getting charged 0 for failures, but who knows)

def openai_background_task_v2(prompt_messages, model, temperature, max_tokens, response_format):
    global client
    start_time = time.time()
    print('entered openai_background_task_v2')

    if not client:
        print('1')
        openAI_key = anvil.secrets.get_secret("openAI_key")
        client = OpenAI(api_key=openAI_key)
        print('2')

    if response_format:
        print('3.1')
        response = client.chat.completions.create(
            model=model,
            response_format=response_format,
            messages=prompt_messages,
            temperature=temperature,
            max_tokens=int(max_tokens),
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )
    else:
        print('3.2')
        # chat models still go through chat.completions; the legacy
        # client.completions endpoint does not accept a `messages` argument
        response = client.chat.completions.create(
            model=model,
            messages=prompt_messages,
            temperature=temperature,
            max_tokens=int(max_tokens),
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )

    print('4')
    openai_message_content = response.choices[0].message.content
    prompt_tokens = response.usage.prompt_tokens
    completion_tokens = response.usage.completion_tokens
    print('5')
    costs = calculate_cost(model, prompt_tokens, completion_tokens)
    print('6')
    time_taken = time.time()-start_time

    print(f'Completing API call took {time_taken} seconds')
    print(f"openai_message_cotent: {openai_message_cotent}")

    return openai_message_content, costs

def get_available_generation_credits(session_id):
    session = app_tables.session_info.get_by_id(session_id)
    user = session['user']

    if user:
        available_generation_credits = user['available_generation_credits']
    else:
        credits_used_this_session = session['num_lcs_added_to_lc_table']
        tot_available_generation_credits = max_lcs_to_generate(user)
        available_generation_credits = tot_available_generation_credits - credits_used_this_session

    print (f"available_genartion_credits: {available_generation_credits}, user_email: {user['email'] if user else 'no user'}")
    return available_generation_credits, user, session

def calculate_cost(model, prompt_tokens, completion_tokens):
    if model == 'gpt-3.5-turbo-0125':
        input_per_mil = 0.5
        output_per_mil = 1.5
    elif model == 'gpt-4-0125-preview':
        input_per_mil = 10
        output_per_mil = 30
    else:
        raise ValueError(f"Model {model} not recognized")

    prompt_cost = prompt_tokens * input_per_mil / 1000000
    completion_cost = completion_tokens * output_per_mil / 1000000
    total_cost = prompt_cost + completion_cost
    return total_cost #in dollars

def add_costs_to_session_and_user(session_id, openai_cost):
    session = app_tables.session_info.get_by_id(session_id)
    user = session['user']
    if user:
        if user['ai_costs'] is None: user['ai_costs'] = 0
        user['ai_costs'] += openai_cost
    if session['ai_costs'] is None: session['ai_costs'] = 0
    session['ai_costs'] += openai_cost

def consecutively_insert_into_lc_db(session_id):
    temp_rows = app_tables.temp_insertion_waiting_list.search(tables.order_by('index'), session_id=session_id)

    for i, temp_row in enumerate(temp_rows):
        # we're inserting into the table after completion of the background task generate_AI_lc_v2. We want the insertions
        # to be in order, but we need to check that task 0 finished (otherwise we might insert task 2 and 3 before 0 and 1 have been added to the table)
        if temp_row['index'] != i:
            return

        if temp_row['inserted']:
            continue
        else:
            add_lcs_to_db(session_id, temp_row['pdf_chunk'], temp_row['final_response'])
            temp_row['inserted'] = True

def get_pdf_chunks(session_id, pdf_chunk_indices):
    # pretty sure pdf_chunk_indices is unnecessary as it is always all of them...
    session_row = app_tables.session_info.get_by_id(session_id)
    # Fetch pdf chunks by their IDs
    pdf_chunks = app_tables.pdf_chunks.search(
        q.fetch_only('chunk_html','session', 'chunk_index', 'page_num'),
        tables.order_by("chunk_index"),
        session = session_row,
        chunk_index=q.any_of(*pdf_chunk_indices)  # * unpacks the list into chunk_index_1, chunk_index_2, ...
    )
    return pdf_chunks

def remove_completed_active_tasks_from_list(active_tasks):
    new_active_tasks = []
    failed_task_chunk_indexes = []
    for task, chunk_index in active_tasks:
        if task.get_termination_status() == 'completed':
            print(f"Task for chunk {chunk_index} completed.")
        elif task.get_termination_status() in ("failed","killed","missing"):
            failed_task_chunk_indexes.append(chunk_index)
            print("ERROR: TASK FAILED WAS KILLED OR WENT MISSING")
            pass
        else:
            new_active_tasks.append((task, chunk_index))

    return new_active_tasks, failed_task_chunk_indexes

def retry_failed_tasks(failed_task_chunk_indexes, active_tasks, pdf_chunks, session_id, ai_type, retries, max_wait_time):
    for failed_task_chunk_index in failed_task_chunk_indexes:
        failed_chunk = pdf_chunks[failed_task_chunk_index]
        new_task = anvil.server.launch_background_task('generate_AI_lc_v2', session_id, failed_chunk, ai_type, retries, max_wait_time, failed_task_chunk_index)
        active_tasks.append((new_task, failed_chunk['chunk_index']))
        print(f'task failed, retries left, so re-added non-pic task to active tasks with chunk index: {failed_task_chunk_index}')
    return active_tasks

def handle_active_tasks(active_tasks, failed_tasks_counter, pdf_chunks, max_failed_tasks_before_full_fail, session_id, ai_type, retries, max_wait_time):
    active_tasks, failed_task_chunk_indexes = remove_completed_active_tasks_from_list(active_tasks)
    failed_tasks_counter += len(failed_task_chunk_indexes)
    if failed_tasks_counter > max_failed_tasks_before_full_fail:
        raise RuntimeError(f'Individual AI tasks failed too often. Failed: {failed_tasks_counter}, max allowed: {max_failed_tasks_before_full_fail}')
    else:
        active_tasks = retry_failed_tasks(failed_task_chunk_indexes, active_tasks, pdf_chunks, session_id, ai_type, retries, max_wait_time)
    return active_tasks, failed_tasks_counter

Is it reasonable that with 2GB of memory I can only run 15 simultaneous background tasks?

This hinges heavily on your definition of “reasonable”, and whether it agrees with that of the Python ecosystem, but each background task is a separate Python process. (This is true even when using Persistent Server - it’s so that you can kill them individually if they go rogue, which isn’t something you can do to a thread.) And if each one weighs in at ~133MB, then 15 of them would add up to 2GB. If you’re loading hefty data at startup and/or big Python libraries, 133MB doesn’t sound impossible to me. You might want to try to optimise that (e.g. if you have global initialisation code in your Server Modules that allocates memory, maybe make it “lazy” so it only allocates the first time it’s used, that sort of thing).
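
Here's a minimal sketch of that “lazy” pattern - the heavy object and its loader are hypothetical stand-ins for whatever your Server Modules currently build at import time:

_heavy_data = None

def _load_big_lookup_table():
    # Hypothetical stand-in for expensive import-time work
    # (loading a model, parsing a large file, building caches, ...).
    return {i: i * i for i in range(1_000_000)}

def get_heavy_data():
    # Build the object only in the processes that actually need it, instead of
    # paying the cost in every background-task process when the module is imported.
    global _heavy_data
    if _heavy_data is None:
        _heavy_data = _load_big_lookup_table()
    return _heavy_data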

If the thing you want to parallelise is your OpenAI API calls rather than any local processing, then there’s an alternative you might consider, and that’s using Python’s async support. A little Googling suggests that OpenAI has an async-capable API these days, and async is supported in Anvil server code.
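
Very roughly, and untested, that might look something like this - assuming the AsyncOpenAI client that recent versions of the openai package provide, with the model and parameters borrowed from your code above. All the API calls then run concurrently inside a single background task (i.e. a single process), with a semaphore playing the role of simultaneous_ai_tasks:

import asyncio
import anvil.secrets
from openai import AsyncOpenAI  # async client in the openai v1.x package

async def analyse_chunk(client, chunk_html, semaphore):
    # The semaphore caps how many requests are in flight at once.
    async with semaphore:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo-0125",
            messages=[{"role": "user", "content": chunk_html}],
            temperature=1,
            max_tokens=4095,
        )
        return response.choices[0].message.content

async def analyse_chunks(chunk_htmls, max_in_flight=15):
    client = AsyncOpenAI(api_key=anvil.secrets.get_secret("openAI_key"))
    semaphore = asyncio.Semaphore(max_in_flight)
    return await asyncio.gather(
        *(analyse_chunk(client, html, semaphore) for html in chunk_htmls)
    )

# Inside ONE background task, rather than one task per chunk:
# results = asyncio.run(analyse_chunks([c['chunk_html'] for c in pdf_chunks]))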