A scalable, async OpenAI Assistant processor built with FastAPI (source code on GitHub)
(Full source code: https://github.com/jlvanhulst/fastapi-assistant)
After working with the OpenAI Assistants for 10 months and first building a Django/Celery based ‘processor’, I realized that the Django/Celery setup is in many ways ‘legacy’ and certainly more complicated to set up (and maintain). So I wrote a completely new version, inspired by some of the streaming chat/FastAPI implementations. It has all the features my Django processor had, but now lives in a single Python file that can be added to any Python project and plays (very) well with FastAPI projects. That also means it is super easy to deploy quickly on AWS Elastic Beanstalk.
This Assistant processor is meant for ‘task’ handling, not ‘chats’; it does not use any of the realtime features. You can read some of my other posts, like the one about using OpenAI to process tasks from incoming emails.
Internally we run hundreds of these tasks daily: reading and rating incoming pitches, summarizing meeting transcripts, etc.
Before getting into the more advanced parts, let’s start with the basics.
Set up a new project in the OpenAI backend (platform.openai.com) in your Organization settings. Use the dropdown at the top left and make sure you’re in the new, empty project. From the left sidebar pick API keys and create an API key for THIS project. You will need this key to access the Assistants, Threads and Runs that you will create in this project.
Now create a new Assistant, also from the left sidebar. Let’s call it ‘Joke’ and make it about telling jokes. Like this:
Try it out using the Playground button (top right menu!). In the Playground, make sure to pick the Assistant tab and the Assistant you just created. No other settings on the OpenAI end are needed at this point. :)
Time to install the Python project. Go to GitHub and clone the app. If you’re using Visual Studio (or better: Cursor!) you can run and debug right away; just make sure to create your venv environment first (from the Palette → ‘Create environment’ → venv). Under ‘Run and debug’ you should have the FastAPI app up and running!
(Make sure you have OPENAI_API_KEY defined somewhere.)
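If you want the app to fail fast when the key is missing, a small startup check can help. This is a minimal sketch and not code from the repository: the helper name `require_api_key` is made up for illustration. The only thing taken as given is the `OPENAI_API_KEY` variable name, which the OpenAI Python client reads from the environment by default.

```python
import os


def require_api_key(env=None) -> str:
    """Return the OpenAI API key from the environment or raise a clear error.

    `require_api_key` is a hypothetical helper, not part of the repository.
    Pass a dict as `env` to test it without touching the real environment.
    """
    env = os.environ if env is None else env
    key = env.get("OPENAI_API_KEY", "").strip()
    if not key:
        raise RuntimeError(
            "OPENAI_API_KEY is not set; export it or add it to your launch configuration"
        )
    return key
```

Calling `require_api_key()` once before the app starts turns a confusing authentication error on the first request into an immediate, readable one.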
The app should now be up on 127.0.0.1:8000, and if you open 127.0.0.1:8000/demo/joke this function should be triggered:
@router.get("/joke", response_class=JSONResponse)
async def assistant_test():
    """
    This is a test endpoint that can be used to test the assistant.
    This is the simplest way to 'run' an Assistant. Get the assistant object, provide the name of the Assistant
    ('Joke' in this case) and the prompt.
    """
    assistant = Assistant_call()
    response = await assistant.newthread_and_run(assistant_name="Joke",
                                                 content="tell me a joke about sales people")
    return response
This should return JSON that looks like this:
{
  "response": "Why did the salesperson only sell cows?\n\nBecause they were really good at moooo-ving inventory!",
  "status_code": 200,
  "thread_id": "thread_bSmhm1zotS3NXr6PpObxQMsS"
}
First task completed: getting a joke from an Assistant through a simple API call. If you did not create an Assistant with the name ‘Joke’ you will get the error ‘Assistant ‘Joke’ not found’.
A slightly more advanced example that is already pretty powerful: provide the Assistant name in the URL and the prompt in JSON.
@router.post("/assistant/{assistant_name}", response_class=JSONResponse)
async def run_assistant(assistant_name: str, data: AssistantRequest):
    """
    A simple example endpoint that can be used to call any Assistant with a prompt. Give it the name of the Assistant in the request and {"content": "your prompt here"}
    as the body of the request.
    What it returns depends on the settings for that particular Assistant. This can be text or some JSON if the Assistant is set to return JSON.
    """
    assistant = Assistant_call()
    return await assistant.newthread_and_run(assistant_name=assistant_name,
                                             content=data.content,
                                             tools=tools,
                                             files=data.file_ids,
                                             when_done=data.when_done,
                                             metadata=data.metadata)
Using Postman we can call the ‘Joke’ Assistant, but this time we’ll also use the ‘when_done’ attribute to show how this app would work in a production environment: it receives incoming Assistant requests, waits for the processing to be done and then calls a follow-up function to store those results or... send them to the next Assistant :). This is why we also add a metadata attribute, ‘my_id’.
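If you would rather script that call than click through Postman, the same request can be built with the standard library. Treat this as a sketch under a few assumptions: the `/demo` prefix is inferred from the `/demo/joke` URL, `file_ids` is left out on the guess that it is optional, the `my_id` value is invented, and `build_assistant_request` is my own illustrative helper, not something from the repository.

```python
import json
from urllib import request
from urllib.parse import quote


def build_assistant_request(base_url, assistant_name, prompt, when_done=None, metadata=None):
    """Build (but do not send) a POST request for the /assistant/{assistant_name} route."""
    body = {"content": prompt}
    if when_done is not None:
        body["when_done"] = when_done      # name of the async callback, e.g. "run_after"
    if metadata is not None:
        body["metadata"] = metadata        # stored on the thread, readable in the callback
    return request.Request(
        # quote() keeps Assistant names with spaces valid in the URL path
        url=f"{base_url}/assistant/{quote(assistant_name)}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_assistant_request(
    "http://127.0.0.1:8000/demo",          # assumed router prefix
    "Joke",
    "tell me a joke",
    when_done="run_after",
    metadata={"my_id": "12345"},           # made-up id for illustration
)
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))
# To actually send it while the app is running: request.urlopen(req)
```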
The request triggers the POST /assistant route, which receives the Assistant name as the path parameter. The JSON body has the content attribute with the prompt (“tell me a joke”) and when_done, the name of the run_after function that can be found in demo.py. Note: the when_done function MUST be defined with async def! Because we added when_done, the response contains the thread_id but not the Assistant response. You pick up the response from the Assistant in the when_done function:
async def run_after(thread_id: str = None):
    '''
    Demo function to show how to pick up after a thread is done, and then use the results to be stored or further processed.
    '''
    print('Completion call')
    assistant = Assistant_call()
    thread = await assistant.get_thread(thread_id=thread_id)
    print(thread.metadata)
    response = await assistant.getfullresponse(thread_id=thread_id)
    print(response)
You can see the metadata in the system console, or when you look up the thread in the OpenAI backend. (BTW, this is a huge advantage, IMO, over using chat completions!) Threads are persistent for 60 days and you can find them in the backend, and the built-in metadata makes a callback function easy. This demo run_after function just prints the result (another bad joke) and the metadata. A real application would use my_id to save the result in a database or make another API call.
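To make the ‘must be async def’ rule concrete, here is one way a processor could turn a when_done name into a callable and reject plain functions up front. This is only a sketch of the idea, not the repository's implementation; `resolve_when_done` and the inline stand-in for demo.py are invented for the example.

```python
import asyncio
import inspect
import types


def resolve_when_done(module, name):
    """Look up a callback by name and insist that it is a coroutine function."""
    func = getattr(module, name, None)
    if func is None:
        raise LookupError(f"when_done function '{name}' not found")
    if not inspect.iscoroutinefunction(func):
        raise TypeError(f"when_done function '{name}' must be defined with 'async def'")
    return func


# Stand-in for demo.py, built inline so the sketch is self-contained.
demo = types.ModuleType("demo")


async def run_after(thread_id=None):
    return f"picked up thread {thread_id}"


def not_async(thread_id=None):
    return "never awaited"


demo.run_after = run_after
demo.not_async = not_async

callback = resolve_when_done(demo, "run_after")
result = asyncio.run(callback(thread_id="thread_123"))
print(result)
```

Checking with `inspect.iscoroutinefunction` at lookup time means a misdeclared callback fails when the request comes in, rather than minutes later when the run finishes.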
So this little FastAPI application is constantly ready to run Assistant tasks. They run in parallel, so you can fire a lot of requests at once, slightly depending on your billing tier with OpenAI.
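If you fan out many tasks from your own code, capping how many run at once is a simple way to stay inside whatever your tier allows. The sketch below shows the pattern with `asyncio.Semaphore` and `asyncio.gather`; `run_many` and `fake_assistant_task` are placeholders I made up, and the fake task only sleeps instead of calling OpenAI.

```python
import asyncio


async def run_many(task_fn, prompts, max_concurrent=5):
    """Run task_fn(prompt) for every prompt, at most max_concurrent at a time."""
    limiter = asyncio.Semaphore(max_concurrent)

    async def guarded(prompt):
        # Only max_concurrent coroutines get past this point at once.
        async with limiter:
            return await task_fn(prompt)

    # gather returns results in the same order as the input prompts.
    return await asyncio.gather(*(guarded(p) for p in prompts))


async def fake_assistant_task(prompt):
    """Placeholder for a real Assistant run; sleeps instead of calling OpenAI."""
    await asyncio.sleep(0.01)
    return f"done: {prompt}"


results = asyncio.run(run_many(fake_assistant_task, ["a", "b", "c"], max_concurrent=2))
print(results)
```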
This version also supports file uploading in the demo.
You can add the file_id(s) that you get that way to your next call in the files=.. parameter:
assistant = Assistant_call()
return await assistant.newthread_and_run(assistant_name=assistant_name,
                                         content=data.content,
                                         tools=tools,
                                         files=data.file_ids,
                                         when_done=data.when_done,
                                         metadata=data.metadata)
Lastly, function calling is also supported, of course, and I have included a ‘tools.py’ example file that shows how easy it can be to handle (a lot of) function calling. My webscrape function example takes these parameters:
from typing import Optional
from pydantic import BaseModel, Field, HttpUrl

class WebScrapeParameters(BaseModel):
    url: HttpUrl = Field(..., description="The URL of the website to scrape")
    ignore_links: bool = Field(False, description="Ignore links in the text. Use 'False' to receive the URLs of nested pages to scrape.")
    # Optional, because the default is None (no truncation)
    max_length: Optional[int] = Field(None, description="Maximum length of the text to return")
BTW, ChatGPT (or Cursor) is really convenient for quickly creating a Pydantic class from a function schema and vice versa. (Just saying, this kind of stuff saves so much time.)
So now let’s create a new, more useful Assistant that can do a simple webscrape to research companies and return JSON. Here’s what the Assistant looks like:
Note the FUNCTION that has been added with the following template (as generated by ChatGPT):
{
  "name": "webscrape",
  "description": "Get the text content of a webpage. If 'ignore_links' is true, links will be removed from the text.",
  "strict": false,
  "parameters": {
    "type": "object",
    "properties": {
      "url": {
        "type": "string",
        "description": "The URL of the website to scrape"
      },
      "ignore_links": {
        "type": "boolean",
        "description": "Ignore links in the text. Use 'False' to receive the URLs of nested pages to scrape."
      },
      "max_length": {
        "type": "integer",
        "description": "Maximum length of the text to return"
      }
    },
    "required": [
      "url",
      "ignore_links"
    ]
  }
}
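Generated templates are worth a quick sanity check before you paste them into the Assistant settings, because a misspelled entry in `required` (a space instead of an underscore, say) or a `strict` value given as a string is easy to miss by eye. The checker below is a small sketch of my own, not part of the repository, and it only covers those two mistakes.

```python
def check_function_schema(schema):
    """Return a list of problems found in an Assistant function definition."""
    problems = []
    params = schema.get("parameters", {})
    properties = params.get("properties", {})
    # Every name listed as required must also be declared under properties.
    for name in params.get("required", []):
        if name not in properties:
            problems.append(f"required entry '{name}' is not a defined property")
    # 'strict' belongs at the top level and must be a real boolean.
    if not isinstance(schema.get("strict", False), bool):
        problems.append("'strict' should be a JSON boolean, not a string")
    return problems


good = {
    "name": "webscrape",
    "strict": False,
    "parameters": {
        "type": "object",
        "properties": {"url": {"type": "string"}, "ignore_links": {"type": "boolean"}},
        "required": ["url", "ignore_links"],
    },
}
# Same schema with two deliberate mistakes.
bad = {
    **good,
    "strict": "true",
    "parameters": {**good["parameters"], "required": ["url", "ignore links"]},
}

print(check_function_schema(good))
print(check_function_schema(bad))
```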
To keep things simple, when we use Assistants that have function calling we provide a single Python MODULE (i.e. ‘import tools’) that has EVERY function that might be called by THIS Assistant. In this case I provide it with ‘tools’ (tools.py), and since this Assistant has only one function it knows to call (‘webscrape’), the (async) function webscrape MUST be found in that module. It looks like this:
import logging

import httpx

# WebScrapeParameters is the Pydantic class shown earlier; html_to_text is a helper defined elsewhere in the project.
async def webscrape(plain_json):
    '''
    This function is used to scrape a webpage.
    It converts the html to text and returns the text.
    Args:
        plain_json (dict): The JSON data containing the URL to scrape. It is meant to be called as a tool call from an assistant.
        The JSON should be in the format of {"url": "https://www.example.com", "ignore_links": False, "max_length": 1000}
    Returns:
        str: The text content of the webpage. If max_length is provided, the text will be truncated to the specified length.
    '''
    info = WebScrapeParameters(**plain_json)
    header = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36'}
    try:
        async with httpx.AsyncClient(follow_redirects=True) as client:
            response = await client.get(str(info.url), headers=header, timeout=5)
    except Exception as e:
        # Return an empty string so the Assistant run can continue
        logging.error(f"Failed to fetch URL {info.url}: {e}")
        return ""
    logging.info('successful webscrape '+str(info.url)+' '+str(response.status_code))
    out = html_to_text(response.text, ignore_links=info.ignore_links)
    if info.max_length:
        return out[0:info.max_length]
    else:
        return out
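The ‘one module holds every function’ convention makes the dispatch step small. When a run asks for a tool call, the Assistants API supplies the function name and its arguments as a JSON string, so the processor only needs to find the function in the module and await it. The sketch below shows that idea under my own assumptions: `dispatch_tool_call` is a made-up name, the real processor may handle errors differently, and the inline `tools` stand-in does not fetch anything.

```python
import asyncio
import inspect
import json
import types


async def dispatch_tool_call(tools_module, function_name, arguments_json):
    """Find function_name in tools_module, call it with the parsed arguments, return a string."""
    func = getattr(tools_module, function_name, None)
    if func is None or not inspect.iscoroutinefunction(func):
        # Report the problem back to the Assistant instead of crashing the run.
        return f"Error: async function '{function_name}' not found in tools module"
    arguments = json.loads(arguments_json or "{}")
    result = await func(arguments)
    # Tool outputs are submitted as strings, so serialize anything else.
    return result if isinstance(result, str) else json.dumps(result)


# Stand-in for tools.py; the real webscrape fetches the page over HTTP.
tools = types.ModuleType("tools")


async def webscrape(plain_json):
    return f"text of {plain_json['url']}"


tools.webscrape = webscrape

output = asyncio.run(
    dispatch_tool_call(tools, "webscrape", '{"url": "https://www.example.com"}')
)
print(output)
```

Returning an error string for an unknown function, rather than raising, lets the Assistant see what went wrong and decide what to do next.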
And to run it from Postman it looks like this:
Notice that here I am asking to run the Assistant ‘Company Research Agent’. I don’t have to wait for the results because I have ‘when_done’ defined, and I made up a metadata field that would help me store the information back.
(BTW I always LOVE looking at the threads and seeing what it decided to do! Notice that because I told it to call the scrape more than once if needed, it did look at sailes.com/company and sailes.com/about-us to find the team info!)
And when it was done with all that, it called the run_after function, where we can see it compiled a beautiful JSON about Sailes.com.
If you want to have an Assistant call another Assistant you can easily create a wrapper; there is an example in demo.py that wraps the Company Research assistant. You can create a new generic Assistant and give it the Company Research function as a tool. One thing to keep in mind when doing that is that a Run will expire in 10 minutes, so that is the overall maximum for nested Assistants at the moment (at least in this implementation).
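One way to live with that 10 minute ceiling is to put your own, slightly shorter, timeout around the nested call so the outer Assistant gets a usable answer instead of an expired Run. This is a sketch of the pattern only: `call_nested`, the one minute safety margin and the two fake inner tasks are assumptions of mine, not code from demo.py.

```python
import asyncio

RUN_EXPIRY_SECONDS = 10 * 60  # Run lifetime mentioned above


async def call_nested(awaitable, timeout=RUN_EXPIRY_SECONDS - 60):
    """Await a nested Assistant task, giving up before the outer Run expires."""
    try:
        return await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        # Hand the outer Assistant a message it can act on instead of hanging.
        return "Error: nested assistant did not finish in time"


async def fast_inner():
    """Placeholder for a nested Assistant that finishes quickly."""
    return "inner result"


async def slow_inner():
    """Placeholder for a nested Assistant that takes too long."""
    await asyncio.sleep(1)
    return "inner result"


print(asyncio.run(call_nested(fast_inner())))
print(asyncio.run(call_nested(slow_inner(), timeout=0.05)))
```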
(Note: This code was inspired by the Streaming Chat FastAPI implementation written by Meeran Malik and my own earlier Django OpenAI Assistant.)
Full source code: https://github.com/jlvanhulst/fastapi-assistant