How to stream data from server to client?

What I’m trying to do:
On the server I am using langchain and openai. As the full completion of a chat request takes quite some time (>20 seconds), I have added a BaseCallbackHandler that returns fragments of the answer ‘in completion’.
Now I want to stream these fragments to the client side so the enduser will see how the answer gets completed. The response of the interaction in this way is about 3 seconds instead of 20.

What I’ve tried and what’s not working:
I have tried to implement this using the call_async as described in:
https://anvil-labs.readthedocs.io/en/latest/guides/modules/non_blocking.html#

However, then I get the following error:
InvalidRequestError: [{}] is not valid under any of the given schemas - ‘input’
at /home/anvil/.env/lib/python3.10/site-packages/openai/api_requestor.py:687

What is the best way to get the data streamed from the server to the client? Is there any example available?

Thanks for your help.

2 Likes

I would add a Timer on the form that calls a server function every 1-2 seconds to get the most updated info and updates the UI.

Background tasks are designed for this. You cannot technically stream anything from the server to the client, but using a Timer as Stefano suggests and polling the status of the background task is the way to do it in Anvil.

Thus far, I’m aware of two reasons for “streaming” to the client:

  1. you want the latest version of a small amount of data, as soon as it’s updated (a latency issue)
  2. you have something very large to transmit, and you don’t want the UI to stall while waiting for it (a blocking issue).

I suppose you could have both at the same time.

It’s not clear which case you have. Different solutions may apply in each case.

I would go so far as saying this specific use case, of streaming small amounts of text, would be a great use case for putting the information directly into the anvil.server.task_state object contained in a background task.
I would organize each ‘chunk’ of text into ordinal keys as they were generated and have a timer component rebuild the string on the client side for display.

1 Like

Thanks for your help so far. Unfortunately, I do not see the solution yet. Let me try to explain.
In the client form I have the following server call:

self.openai_task = anvil.server.call('getresponse', prompt)

Then on the server side:

myresult = ""

class MyCustomHandler(BaseCallbackHandler):
  def on_llm_new_token(self, token: str, **kwargs) -> None:
    global myresult
    print(f"{token}")
    myresult += token

@authenticated_callable
def getresponse(prompt, type):
  task = anvil.server.launch_background_task('do_openai_processing', prompt)
  return task

@anvil.server.background_task
def do_openai_processing(prompt, type):
  global myresult
  myresult = ""
  llm = ChatOpenAI(
    model_name="gpt-3.5-turbo",
    temperature=0,
    openai_api_key=oakey,
    max_tokens=512,
    streaming=True,
    callbacks=[MyCustomHandler()]
  )

  role_template = """some description here.
    {context}    
    {question}"""
  
  ROLE_PROMPT = PromptTemplate(
      template=role_template, input_variables=["context", "question"]
  )
  
  role_ai = RetrievalQA.from_chain_type(
      llm=llm,
      chain_type="stuff",
      retriever=vectorstore.as_retriever(),
      chain_type_kwargs={"prompt": ROLE_PROMPT},
      return_source_documents=True
  )
  
  tools = [
        Tool(
            name="role",
            func=role_ai.run,
            description="""some description""",
        )
    ]

  agent = initialize_agent(
      tools,
      llm,
      agent=None,
      verbose=True
  )
  
  antw = role_ai({"query":prompt})
  return antw['result']

I want to get the intermediate results from the llm output (MyCustomHandler) into the client in order to present these to the enduser. These results will start coming in after around 3 seconds instead of waiting for the full result which will be returned by

antw = role_ai({"query":prompt})

which will take somewhere around 30 seconds.

I do not see how/where I can put a get_state() method for driving the task state back to the client.

Your expertise and help is highly appreciated. Thanks very much in advance.

I have also tried the following.
The client code:

  def handle_result(self, result):
    print(result)

  def submitbtn_click(self, **event_args):
    # Start background task
    openai_bgtask = anvil.server.call('get_intermediate_response')

    # Perform openai request
    result_hndl = call_async('getresponse', "Some prompt")
    result_hndl.on_result(self.handle_result)

    # Read state and intermediate result from background task
    task_state = True
    while task_state:
      self.response.text = openai_bgtask.get_state(['result'])
      sleep(0.5)
      task_state = openai_bgtask.get_state(['status'])

    openai_bgtask.kill()

On the server side there is the following code:

llmresult = ""
llmstate = True

class MyCustomHandler(BaseCallbackHandler):
  def on_llm_new_token(self, token: str, **kwargs) -> None:
    global llmresult
    llmresult += token

  def on_llm_end(self, response, **kwargs) -> None:
    global llm_state
    llm_state = False


@authenticated_callable
def get_intermediate_response():
  task = anvil.server.launch_background_task('read_llm_output')
  return task


@anvil.server.background_task
def read_llm_output():
  global llmresult
  global llmstate

  llmstate = True
  anvil.server.task_state['status'] = llmstate

  while llmstate:
    anvil.server.task_state['result'] = llmresult
    sleep(0.1)

  anvil.server.task_state['status'] = llmstate

When I do some debugging the global variable llmresult is assigned with the text coming from the on_llm_new_token event.
But the background task does not see this global variable.
Also

self.response.text = openai_bgtask.get_state(['result'])

is blocking and only returns when the process from:

result_hndl = call_async('getresponse', "Some prompt")

is completed.

I tried it with a Background task but am facing issues. See my reply: How to stream data from server to client? - #7 by wim.vandebrug

What am I doing wrong here?

If you’re having trouble with the task state, a common workaround is to put task state into a data table row (use the task id as the key in that table).

Thanks for your reply.
I am trying to get this working but am failing over and over again (I am an Anvil newbe…).
Please could you help me further? Or point me at some examples?

I have created a datatable: data_queue with fields taskid(text) and response(text).
Then on the server side:

taskid = str(uuid.uuid4())
llmresult = ""
app_tables.data_queue.add_row(taskid=taskid, response=llmresult)

class MyCustomHandler(BaseCallbackHandler):
  def on_llm_new_token(self, token: str, **kwargs) -> None:
    global taskid
    response_row = app_tables.data_queue.get(taskid=taskid)
    response_row['response'] += str(token)
    app_tables.data_queue.add_row(taskid=taskid, response=response_row['response'])
    print(f"{token}")

  def on_llm_end(self, response, **kwargs) -> None:
    global taskid
    response_row = app_tables.data_queue.get(taskid=taskid)
    response_row['response'] += "LLMFINISHED"
    app_tables.data_queue.add_row(taskid=taskid, response=response_row['response'])

and on the client side:

  def handle_result(self, result):
    self.response.text = "Instructie over: " + self.prompt.text + "\n" + result
    self.prompt.text = ""

  def submitbtn_click(self, **event_args)
    result_hndl = call_async('getresponse', "some prompt text")
    result_hndl.on_result(self.handle_result)

    openai_taskid = anvil.server.call('get_taskid')
    print("TaskID: ", openai_taskid)

    task_state = True
    while task_state:
      llmresponse = app_tables.data_queue.get(taskid=openai_taskid)
      print("Response from llm: ", llmresponse['response'])
      if llmresponse['response'].endswith('LLMFINISHED'):
        task_state = False
      else:
        self.response.text = llmresponse
        sleep(0.5)

Now I am getting the errors:

  1. Error in on_llm_new_token callback: More than one row matched this query
  2. anvil.server.InternalError: Permission denied: Cannot search in table ‘data_queue’ from client code.

Having this globally will add a new row every time any server function is executed. You probably want to do that just before you start the background task, and then pass the task id to the background task so it can pass it to the class.

You have to set the permissions on the data table to allow searching from the client. That’s disabled by default for security reasons. You do that on the data table UI in the IDE.

1 Like

Hi,
I finally made it following your guides. Thanks very much.

1 Like

I got it to work using the task state without having to use the data tables. I just put most of the server code, including the definition of the custom callback handler, inside the background task. The callback handler then tweaks the task state directly. I then interact with the task state from the client side.

The streaming is alright - not as smooth as what I see in the command line but it is good enough.

Example on the server:

@anvil.server.background_task
def ask_model_new(query, mchat_history=[]):
    anvil.server.task_state['response'] = ''
    class MyTaskHandler(BaseCallbackHandler):
        def __init__(self):
            self.tokens_stream = ""
    
        def on_llm_new_token(self, token: str, **kwargs) -> None:
            self.tokens_stream += token
            anvil.server.task_state['response'] = str(self.tokens_stream)

Example on client (in the button click event):

with anvil.server.no_loading_indicator:
      # this is a server function that launches the background task and returns the task object
      self.task = anvil.server.call('send_msg_call', self.new_message_box.text, self.mchat_history)
      self.new_message_box.text = ""
      while not self.task.is_completed():
          # print(self.task.get_state())
          if 'response' in self.task.get_state():
              self.conversation[-1]['text'] = self.task.get_state()['response']
          self.refresh_conversation()
4 Likes

Ah, this works like a charm. Thanks very much for sharing your expertise!

1 Like

Is there a complete demo somewhere that you know of?
I am kinda new here (Anvil) and this kinda makes sense but not enough for implementation.
Thanks!

I don’t have something I can share but this would be a GREAT tutorial idea for the Anvil team! They already have a demo for a “Christmas Chatbot” which uses a Hugging Face model. Adding langchain on the server side would be a quick win. Streaming via a background task state would be the cherry on top.

1 Like

I don’t have something to share either. Am still experimenting. Anyway have Langchain running on the server side and using the code samples of @yahiakalabs. These work very well!
When I have some code to share available I will post it.

I will say that, although this is ‘resolved’, it takes about 10 seconds for my response data to start streaming in. So I went down a rabbit hole of testing…

Loading the server packages is expensive (4-6 seconds) and when you launch a background task, there’s no getting around that even if you have enabled ‘persistent server’. Background tasks always launch a new process (hopefully I’m saying that right). Running the same code without a background task and with a persistent server (just printing the stream to the server console) is very fast - the stream starts almost instantly once the server is running.

I suppose using the data table streaming method with persistent server (using call_async and avoiding background tasks) would have the best performance and keep your secret keys on the server side.

1 Like

Out of curiosity, how did you make it work with the db streaming method? My database query would not return a result until the result of call_async had returned (which defeats the purpose). Almost like the query was being blocked by the running server call. I am not using a background task in this experiment.