What I’m trying to do:
I am cutting up a PDF, converting it to HTML, and sending chunks of up to 4,000 characters of HTML text to the OpenAI API for analysis.
To get through the PDF faster, I parallelise the process by launching a new background task for each OpenAI call.
Unfortunately, I am running into memory issues. Meredydd has already been kind enough to set my per-instance memory allocation to 2 GB (see our discussion here: Downlink Disconnected x8, servercode exited unexpectedly x3, connection failed x2 - #2). This allows me to increase the number of simultaneous background OpenAI tasks I run from 5 to 15, but above that it still fails.
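In outline, the fan-out pattern looks like this (a simplified sketch; the real generate_AI_lc_v2 task in the full code below takes several more arguments):

import time
import anvil.server

def run_chunks_in_parallel(chunks, max_simultaneous=15):
    active = []
    for chunk in chunks:
        # Wait for a free slot; get_termination_status() is None while a task is still running
        while len(active) >= max_simultaneous:
            time.sleep(60)
            active = [t for t in active if t.get_termination_status() is None]
        active.append(anvil.server.launch_background_task('generate_AI_lc_v2', chunk))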
I have added print statements, and the failure always happens on this line:
response = client.chat.completions.create(
model=model,
response_format=response_format,
messages=prompt_messages,
temperature=temperature,
max_tokens=int(max_tokens),
top_p=1,
frequency_penalty=0,
presence_penalty=0
)
Is it reasonable that, with 2 GB of memory, I can only run 15 simultaneous background tasks? The PDF itself is only 700 KB.
Does anyone have suggestions on how I can optimise this, or what I should look into?
My code:
import anvil.email
import anvil.secrets
import anvil.server
from openai import OpenAI
from anvil import tables
import time
import sys
import json
from datetime import datetime
import anvil.tables.query as q
from anvil.tables import app_tables
from . import class_prompt_handler
from .stripe_fns import max_lcs_to_generate
client = None
prompt_cycle_count = {
'PDF_CHUNKS_TO_CLOZE': 1,
'FACTS_TO_CLOZE': 6,
'FACTS_TO_Q_AND_A': 7
}
@anvil.server.background_task
def generate_AI_lcs_v2(session_id, pdf_chunk_indices, ai_type, simultaneous_ai_tasks=1, retries=4, max_wait_time=45):
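    # Orchestrator task: launches up to `simultaneous_ai_tasks` child background
    # tasks (one per PDF chunk), monitors them once a minute, retries failures,
    # and inserts finished results into the learning-card table in chunk order.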
print(f"Entered generate_AI_lcs function with session_id: {session_id}, simultaneous_ai_tasks: {simultaneous_ai_tasks}, pd_chunk_indices: {pdf_chunk_indices}, retries={retries}, max_wait_time={max_wait_time}")
# Initialize variables
start_time = time.time()
active_tasks = []
generation_credits_empty = False
pdf_chunks = get_pdf_chunks(session_id, pdf_chunk_indices)
current_pdf_chunk_index = 0
num_pdf_chunks = len(pdf_chunks)
failed_tasks_counter = 0
max_failed_tasks_before_full_fail = 5
if num_pdf_chunks == 0:
print("No learning cards to process.")
return
print(f"Processing {num_pdf_chunks} lcs")
session = app_tables.session_info.get_by_id(session_id)
session['ai_generating'] = True
user = session['user']
if user:
print(f"generating lcs for session {session_id} and user {user['email']} for deck {session['lc_group']}")
else:
print(f"generating lcs for session {session_id} and not logged in user for deck {session['lc_group']}")
# Launch initial set of tasks
while len(active_tasks) < min(simultaneous_ai_tasks, num_pdf_chunks) and current_pdf_chunk_index < num_pdf_chunks:
chunk = pdf_chunks[current_pdf_chunk_index]
# do pics immediately.
if chunk['chunk_type'] == 'pic':
app_tables.temp_insertion_waiting_list.add_row(session_id=session_id, pdf_chunk=chunk, final_response=[chunk['chunk_html']], index=current_pdf_chunk_index, inserted=False)
print(f'initially added pic to temp table with chunk index: {current_pdf_chunk_index}')
else:
# non pics
task = anvil.server.launch_background_task('generate_AI_lc_v2', session_id, chunk, ai_type, retries, max_wait_time, current_pdf_chunk_index)
active_tasks.append((task, chunk['chunk_index'])) # Store task with its chunk index for reference
print(f'initially added non-pic task to active tasks with chunk index: {current_pdf_chunk_index}')
current_pdf_chunk_index += 1
# Monitor and manage tasks
while current_pdf_chunk_index < num_pdf_chunks and not generation_credits_empty:
time.sleep(60)
active_tasks, failed_tasks_counter = handle_active_tasks(active_tasks, failed_tasks_counter, pdf_chunks, max_failed_tasks_before_full_fail, session_id, ai_type, retries, max_wait_time)
consecutively_insert_into_lc_db(session_id)
# add new tasks if there's room
while len(active_tasks) < simultaneous_ai_tasks and current_pdf_chunk_index < num_pdf_chunks and not generation_credits_empty:
# Check if user still has credits, then launch a new task
available_generation_credits, user, _ = get_available_generation_credits(session_id)
print(f'available_generation_credits: {available_generation_credits}, user:{user}')
if available_generation_credits > 0:
new_chunk = pdf_chunks[current_pdf_chunk_index]
if new_chunk['chunk_type'] == 'pic':
app_tables.temp_insertion_waiting_list.add_row(session_id=session_id, pdf_chunk=new_chunk, final_response=[new_chunk['chunk_html']], index=current_pdf_chunk_index, inserted=False)
print(f'active task finished, so added pic to temp table with chunk index: {current_pdf_chunk_index}')
else:
new_task = anvil.server.launch_background_task('generate_AI_lc_v2', session_id, new_chunk, ai_type, retries, max_wait_time, current_pdf_chunk_index)
active_tasks.append((new_task, new_chunk['chunk_index']))
print(f'active task finished, so added non-pic task to active tasks with chunk index: {current_pdf_chunk_index}')
current_pdf_chunk_index += 1
else:
print(f"User {user} with session {session_id} has reached their learning card generation limit.")
generation_credits_empty = True # End this background task as the user has no credits left
# if there are no more credits or all the pdf chunks have been assigned a task, wait for all tasks to finish
while len(active_tasks) > 0:
active_tasks, failed_tasks_counter = handle_active_tasks(active_tasks, failed_tasks_counter, pdf_chunks, max_failed_tasks_before_full_fail, session_id, ai_type, retries, max_wait_time)
consecutively_insert_into_lc_db(session_id)
time.sleep(60)
# run the insertion function one last time and then empty the table after everything is done
consecutively_insert_into_lc_db(session_id)
temp_rows = app_tables.temp_insertion_waiting_list.search(session_id=session_id)
temp_rows.delete_all_rows()
print('all temp rows deleted')
session['ai_generating'] = False
end_time = time.time()
duration = end_time - start_time
print(f"generate_AI_lcs_v2 task completed in {duration} seconds")
print('generate_AI_lcs completed')
@anvil.server.background_task
def generate_AI_lc_v2(session_id, chunk, ai_type, retries, max_wait_time, index):
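    # Child task: runs the prompt cycle(s) for a single PDF chunk against the
    # OpenAI API and writes the final response to the temp_insertion_waiting_list
    # table, where the orchestrator picks it up for in-order insertion.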
start_time = time.time()
current_date_time = datetime.now()
num_prompt_cycles = prompt_cycle_count.get(ai_type, 1)
prompt_handler = class_prompt_handler.PromptHandler()
processed_responses = [""] * (num_prompt_cycles+1)
processed_responses[0] = {"original_text": chunk['chunk_html']}
session = app_tables.session_info.get_by_id(session_id)
user = session['user']
if user:
print(f"task {index} generating lcs for session {session_id} and user {user['email']} for deck {session['lc_group']}")
else:
print(f"task {index} generating lcs for session {session_id} and not logged in user for deck {session['lc_group']}")
print(f"original_text: {chunk['chunk_html']}")
print(" ")
for i in range(num_prompt_cycles):
prompt_num = i+1
print(f"Generation {prompt_num} for chunk {chunk['chunk_index']}")
# Generate prompt info for the current iteration
prompt_info = prompt_handler.generate_prompt(chunk['chunk_html'], prompt_num, ai_type, processed_responses)
# Convert prompt_messages to the format expected by OpenAI API
        prompt_messages = prompt_info.get('prompt_messages', [{'role': 'user', 'content': ''}])
model = prompt_info.get('model', 'gpt-3.5-turbo-0125')
temperature = prompt_info.get('temperature', 1)
max_tokens = prompt_info.get('max_tokens', 4095)
response_format = prompt_info.get('response_format', None)
# Make OpenAI Request with the generated prompt info
print(f'Making the OpenAI call for generation {prompt_num} with model: {model}, temperature:{temperature}, max_tokens:{max_tokens}, response_format:{response_format}, prompt_messages:{prompt_messages}')
openai_message_content, processed_responses[prompt_num], openai_cost = retry_openai_request(
prompt_messages, processed_responses, retries, max_wait_time, model, temperature, max_tokens, response_format, prompt_num, ai_type, prompt_handler)
# Check for failure
if openai_message_content == "failed":
print(f"OpenAI request and processing failed after {retries} retries.")
            processed_responses[prompt_num] = [
                # German: "Generating cards for part of the PDF failed. Here is the original text for that part:"
                'Karten generieren für Teil der PDF fehlgeschlagen. Hier der Originaltext für den Teil: \n\n' + chunk['chunk_html']
            ]
print("Processing failed. Exiting generation loop.")
print(" ")
break # Exit the for loop if processing fails
add_costs_to_session_and_user(session_id, openai_cost)
# app_tables.ai_call_results.add_row(
# ai_type=ai_type,
# session=session,
# chunk=chunk,
# prompt_num=prompt_num,
# prompt_messages=prompt_messages,
# openai_message_content=openai_message_content,
# processed_response=processed_responses[prompt_num],
# openai_cost=openai_cost,
# created=current_date_time,
# server_session_id=anvil.server.get_session_id()
# )
        print(f'Prompt {prompt_num}')
print(json.dumps(processed_responses[prompt_num], indent=2, ensure_ascii=False))
print(" ")
end_time = time.time()
duration = end_time - start_time
print(f"generate_AI_lc_v2 task completed in {duration} seconds.")
    app_tables.temp_insertion_waiting_list.add_row(session_id=session_id, pdf_chunk=chunk, final_response=processed_responses[prompt_num], index=index, inserted=False)
print(f'row with {len(processed_responses[prompt_num])} lcs added to temp table. session_id: {session_id}, index: {index}')
def add_lcs_to_db(session_id, chunk, processed_response):
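    # Inserts the processed learning cards for one chunk into the learning_cards
    # table, batching rows so no single add_rows() call exceeds ~3.6 MB, and
    # decrementing the user's generation credits as it goes.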
print(f'final response to add to db: {processed_response}')
if not isinstance(processed_response, list):
raise TypeError("processed_response must be a list")
num_lcs = len(processed_response)
print(f'Adding {num_lcs} learning cards to db for chunk {chunk["chunk_index"]}')
rows_to_add = []
total_size = 0
    num_lcs_added = 0  # counts cards actually written, for the session tally below
MAX_SIZE = 3.6 * 1024 * 1024 # 3.6MB in bytes
current_time = datetime.now()
chunk_index = chunk['chunk_index']
chunk_page_num = chunk['page_num']
available_generation_credits, user, session = get_available_generation_credits(session_id)
for i, lc in enumerate(processed_response):
if available_generation_credits > 0:
available_generation_credits -= 1
if user:
user['available_generation_credits'] -= 1
else:
#handled in assign_session_to_user
pass
else:
# Set flag indicating the user has reached their generation limit
session = app_tables.session_info.get_by_id(session_id)
if session['show_generation_limit_reached_msg'] is None: session['show_generation_limit_reached_msg'] = True #only if not already set before
print(f'No more generation credits available.')
break
row = {
'learning_card': lc,
'lc_page_num': chunk_page_num,
'ai_generated': True,
'session': session,
'pdf_chunk': chunk,
'lc_index': chunk_index*1000 + i,
'created': current_time
}
row_without_extra_info = {k: v for k, v in row.items() if k not in ['created', 'session', 'pdf_chunk', 'ai_generated']}
row_size = sys.getsizeof(json.dumps(row_without_extra_info))
if total_size + row_size > MAX_SIZE:
print(f'new_row_size: {row_size}, rows_to_add_so_far_size: {total_size}, together: {total_size + row_size} --> adding now')
if len(rows_to_add) == 0:
raise Exception("Single row exceeds maximum size of 3.8MB")
app_tables.learning_cards.add_rows(rows_to_add)
rows_to_add = [row] # Start a new batch with current row
total_size = row_size
        else:
            rows_to_add.append(row)
            total_size += row_size
        num_lcs_added += 1
    if rows_to_add:
        print(f'flushing final batch of {len(rows_to_add)} rows with total size {total_size} bytes')
        app_tables.learning_cards.add_rows(rows_to_add)
    session['num_lcs_added_to_lc_table'] += num_lcs_added
print(f'Learning cards added to db for chunk {chunk["chunk_index"]}')
# Helper function that makes the OpenAI call and processes the response, retrying up to `retries` times.
def retry_openai_request(prompt_messages, processed_responses, retries, max_wait_time, model, temperature, max_tokens, response_format, prompt_num, ai_type, prompt_handler):
best_answer = None
for i in range(retries):
# Launch the OpenAI API background task
print(f'launching openai background task, try {i+1}')
try:
openai_message_content, openai_cost = openai_background_task_v2(prompt_messages, model, temperature, max_tokens, response_format)
            print('OpenAI responded successfully')
status, response = prompt_handler.process_openai_response(processed_responses, openai_message_content, prompt_num, ai_type)
except Exception as e:
# If an exception occurs during response processing, log and prepare for a retry
print(f'Error during response processing: {e}')
continue
if status == prompt_handler.STATUS_FAILURE_RETRY_OR_CONTINUE:
            # rudimentary, but guessing the longest response that didn't validate well is the best (most improvements, e.g.)
            if best_answer is None or len(str(best_answer)) < len(str(response)): best_answer = response
if i == (retries - 1):
print('Fail with continue on retry and no more retries available, so continuing.')
response = best_answer
else:
print('Response processing indicated retry, and more retries are available.')
continue
elif status == prompt_handler.STATUS_FAILURE_RETRY_OR_QUIT:
# If the status is RETRY_OR_QUIT, regardless of the retry attempt, continue retrying
print('Response processing indicated retry or quit.')
continue
        print('response was processed successfully')
print(" ")
return openai_message_content, response, openai_cost # Return the successfully processed response
    print(f'Task failed {retries} times.')
    return "failed", False, 0  # Return 'failed' after exhausting all retries (cost is reported as 0, though failed attempts probably still incur charges)
def openai_background_task_v2(prompt_messages, model, temperature, max_tokens, response_format):
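    # Makes a single (synchronous) OpenAI chat-completion call and returns the
    # message content plus the estimated cost in dollars. Despite the name, this
    # runs inline within the calling background task, not as a separate task.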
global client
start_time = time.time()
print('entered openai_background_task_v2')
if not client:
print('1')
openAI_key = anvil.secrets.get_secret("openAI_key")
client = OpenAI(api_key=openAI_key)
print('2')
if response_format:
print('3.1')
response = client.chat.completions.create(
model=model,
response_format=response_format,
messages=prompt_messages,
temperature=temperature,
max_tokens=int(max_tokens),
top_p=1,
frequency_penalty=0,
presence_penalty=0
)
else:
print('3.2')
        # This must also use the chat completions endpoint: prompt_messages is a list
        # of chat messages, and response.choices[0].message.content is read below
        # (the legacy client.completions.create endpoint takes `prompt`, not `messages`)
        response = client.chat.completions.create(
            model=model,
            messages=prompt_messages,
            temperature=temperature,
            max_tokens=int(max_tokens),
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )
print('4')
    openai_message_content = response.choices[0].message.content
prompt_tokens = response.usage.prompt_tokens
completion_tokens = response.usage.completion_tokens
print('5')
costs = calculate_cost(model, prompt_tokens, completion_tokens)
print('6')
time_taken = time.time()-start_time
print(f'Completing API call took {time_taken} seconds')
print(f"openai_message_cotent: {openai_message_cotent}")
return openai_message_cotent, costs
def get_available_generation_credits(session_id):
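    # Returns how many generation credits remain: from the user row if logged in,
    # otherwise from the session's per-session allowance.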
session = app_tables.session_info.get_by_id(session_id)
user = session['user']
if user:
available_generation_credits = user['available_generation_credits']
else:
        credits_used_this_session = session['num_lcs_added_to_lc_table']
        available_generation_credits = max_lcs_to_generate(user) - credits_used_this_session
    print(f"available_generation_credits: {available_generation_credits}, user_email: {user['email'] if user else 'no user'}")
return available_generation_credits, user, session
def calculate_cost(model, prompt_tokens, completion_tokens):
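    # Estimates the dollar cost of one API call from the token counts, using
    # per-million-token prices for the two models this app uses.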
if model == 'gpt-3.5-turbo-0125':
input_per_mil = 0.5
output_per_mil = 1.5
elif model == 'gpt-4-0125-preview':
input_per_mil = 10
output_per_mil = 30
else:
raise ValueError(f"Model {model} not recognized")
prompt_cost = prompt_tokens * input_per_mil / 1000000
completion_cost = completion_tokens * output_per_mil / 1000000
total_cost = prompt_cost + completion_cost
return total_cost #in dollars
def add_costs_to_session_and_user(session_id, openai_cost):
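    # Accumulates the OpenAI cost of a call onto both the session row and
    # (if logged in) the user row.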
session = app_tables.session_info.get_by_id(session_id)
user = session['user']
if user:
if user['ai_costs'] is None: user['ai_costs'] = 0
user['ai_costs'] += openai_cost
if session['ai_costs'] is None: session['ai_costs'] = 0
session['ai_costs'] += openai_cost
def consecutively_insert_into_lc_db(session_id):
temp_rows = app_tables.temp_insertion_waiting_list.search(tables.order_by('index'), session_id=session_id)
for i, temp_row in enumerate(temp_rows):
        # Rows land in the temp table as generate_AI_lc_v2 background tasks complete, but
        # insertions into the learning-card table must happen in chunk order, so stop at the
        # first gap (e.g. don't insert chunks 2 and 3 before 0 and 1 have arrived)
        if temp_row['index'] != i:
            return
        if not temp_row['inserted']:
            add_lcs_to_db(session_id, temp_row['pdf_chunk'], temp_row['final_response'])
            temp_row['inserted'] = True
def get_pdf_chunks(session_id, pdf_chunk_indices):
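    # Fetches the session's PDF chunks (only the columns needed), ordered by chunk_index.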
    # pretty sure pdf_chunk_indices is unnecessary, as it is always all of them...
session_row = app_tables.session_info.get_by_id(session_id)
# Fetch pdf chunks by their IDs
pdf_chunks = app_tables.pdf_chunks.search(
q.fetch_only('chunk_html','session', 'chunk_index', 'page_num'),
tables.order_by("chunk_index"),
        session=session_row,
        chunk_index=q.any_of(*pdf_chunk_indices)  # * unpacks the list into separate index arguments
)
return pdf_chunks
def remove_completed_active_tasks_from_list(active_tasks):
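    # Drops finished tasks from the active list and collects the chunk indices of
    # any tasks that failed, were killed, or went missing, so they can be retried.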
new_active_tasks = []
failed_task_chunk_indexes = []
    for task, chunk_index in active_tasks:
        if task.get_termination_status() == 'completed':
            print(f"Task for chunk {chunk_index} completed.")
        elif task.get_termination_status() in ("failed", "killed", "missing"):
            failed_task_chunk_indexes.append(chunk_index)
            print("ERROR: task failed, was killed, or went missing")
else:
new_active_tasks.append((task, chunk_index))
return new_active_tasks, failed_task_chunk_indexes
def retry_failed_tasks(failed_task_chunk_indexes, active_tasks, pdf_chunks, session_id, ai_type, retries, max_wait_time):
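    # Relaunches a background task for each chunk whose previous task failed.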
for failed_task_chunk_index in failed_task_chunk_indexes:
failed_chunk = pdf_chunks[failed_task_chunk_index]
new_task = anvil.server.launch_background_task('generate_AI_lc_v2', session_id, failed_chunk, ai_type, retries, max_wait_time, failed_task_chunk_index)
active_tasks.append((new_task, failed_chunk['chunk_index']))
print(f'task failed, retries left, so re-added non-pic task to active tasks with chunk index: {failed_task_chunk_index}')
return active_tasks
def handle_active_tasks(active_tasks, failed_tasks_counter, pdf_chunks, max_failed_tasks_before_full_fail, session_id, ai_type, retries, max_wait_time):
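    # One monitoring pass: prunes finished tasks, aborts the whole run if too many
    # child tasks have failed, and otherwise relaunches the failed ones.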
active_tasks, failed_task_chunk_indexes = remove_completed_active_tasks_from_list(active_tasks)
failed_tasks_counter += len(failed_task_chunk_indexes)
if failed_tasks_counter > max_failed_tasks_before_full_fail:
        raise RuntimeError(f'Individual AI tasks failed too often. Failed: {failed_tasks_counter}, max allowed: {max_failed_tasks_before_full_fail}')
else:
active_tasks = retry_failed_tasks(failed_task_chunk_indexes, active_tasks, pdf_chunks, session_id, ai_type, retries, max_wait_time)
return active_tasks, failed_tasks_counter