Building a scalable OpenAI Assistant Processor in Django with Celery

Jean-Luc Vanhulst
10 min read · Dec 2, 2023


Two weeks ago I wrote about my first experiments with the new OpenAI Assistants. At that point I had created a basic implementation inside our Django application, and we now use two different assistants that evaluate incoming pitches here at Valor Ventures.

Update: now available on PyPI: https://pypi.org/project/django-openai-assistant/. Please let me know your feedback!

In that first setup, the steps of a single assistant run:

  • creating the message, thread and run
  • starting the run and checking if it needs a function call
  • waiting for completion

…were all done in a single (Celery) task. So as exciting as it was to have Celery/Redis up and running and to be able to dump these assistant tasks on a queue, it was clearly not the final solution, because 30 seconds is STILL the ultimate limit you're stuck with on Heroku for a single request. And the more I play with these Assistants, the more their potential becomes clear. These assistants can be 'running' for hours, or days, if only because they might be waiting for something to happen (like an answer to an email they sent out, or the result from another Assistant they called).

So this week I spent a lot of time reworking my OpenAI Assistant processor into a truly distributed, asynchronous task processor.

The solution uses one database table to store each run, to facilitate truly long-lived runs that can be restarted and have post-completion tasks kicked off.

Ultimately it came out surprisingly short :)

Mapping OpenAI Function Names to Python Functions

We use a lot of different Functions for our Assistants. I have a webScrape function, a getCompany that looks up data in Salesforce and Pitchbook, and savePerson and getPerson to add and look up Contacts. (So you can ask the assistant to look up a person on Pitchbook and then, when found, say 'add this person to Salesforce'.) I expect this list to keep growing.

Each OpenAI function only has a 'name', so we need some way of mapping that function name to an actual Python function. That is, unless you want to simply hardcode it along the lines of if functionName == 'webscrape':
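For contrast, that hardcoded dispatch would look something like this (the function names are just placeholders):

# the naive alternative: a hardcoded if/elif chain that has to grow
# with every new assistant function (hypothetical names)
if functionName == 'webscrape':
    result = webScrape(arguments)
elif functionName == 'companyFind':
    result = companyFind(arguments)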

In the first iteration I put all possible functions in a 'tools.py' so that calling was easy, but in this new iteration I went for a more generic mapping using importlib. When calling an assistant you provide a mapping array that holds the simple function name and the mapped function name as <module>:<function>. So my OpenAI webscrape function maps to a function webScrape in my local module webscrape.py: [".webscrape:webScrape"]. Adding my Pitchbook company-find function, the array becomes [".webscrape:webScrape", ".pitchbook:companyFind"].

(Note that the function name inside the module still needs to match the OpenAI function name exactly, but the module name can be anything inside your project; importlib will handle the loading.)
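To make that concrete, here is a minimal sketch of how a <module>:<function> string resolves to a callable with importlib (the package name 'myproject' is an assumption; use your own Django project package):

import importlib

def resolve(mapping, package='myproject'):
    # split ".webscrape:webScrape" into a relative module and a function name
    moduleName, functionName = mapping.split(':')
    module = importlib.import_module(moduleName, package=package)
    return getattr(module, functionName)

webScrape = resolve('.webscrape:webScrape')  # -> the webScrape function in webscrape.py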

Here's an example of how I let OpenAI scrape a company website, check if there is information on Pitchbook about the company, and combine that information with the pitch we received to write an assessment of how well the company matches Valor's investment criteria:

# sf is a record we get straight from Salesforce (using the simple-salesforce
# library). It will have more fields than shown here.
sf = { 'Id': 'xxxxx', 'Name': 'Test Company', 'Website': 'https://someurl.com',
       'Description': 'The elevator pitch of this company' }

# the OpenAI Vicbot Assistant is defined with the functions webScrape,
# personFind and companyFind. In this implementation, webScrape is a
# function inside webscrape.py and personFind/companyFind live inside pitchbook.py
task = assistantTask(
    assistantName='Vicbot',
    tools=[ ".webscrape:webScrape", ".pitchbook:personFind", ".pitchbook:companyFind" ],
    metadata={ "id": sf['Id'], "sobject": "account" },
    completionCall=".demo:updateApplication"
)
# note that the real instructions for this task are defined in the Assistant
# (more than a page long :). The prompt here just gives the data for the
# company to work on. We give it a JSON dump of all the available information,
# which includes the website that it will use to call the webscrape function.
task.prompt = ("Write an assessment for this company: " + sf['Name'] + " Submission: " + json.dumps(sf))[:32760]
ret = task.createRun()

So we can start and run an OpenAI assistant session, fully asynchronously, with three lines of code :)

Let’s see how this is achieved.

Note: I assume you already have Celery (and some backend; I use Redis) installed in Django. If not, you'll find plenty of resources for that, and you should probably practice a bit with delayed and async tasks first.
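For reference, a minimal Django/Celery wiring looks roughly like this (a sketch; 'myproject' and the Redis URL are assumptions):

# celery.py, next to settings.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject', broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()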

We need one database model to keep track of the tasks we execute. I realize that this could theoretically all be done with Celery tasks, but I like to have a little more control over what to store, and to keep a history to use later for learning/fine-tuning.

class OpenaiTask(models.Model):
    assistantId = models.CharField(max_length=64)
    runId = models.CharField(primary_key=True, max_length=64)
    threadId = models.CharField(max_length=64)
    status = models.CharField(max_length=64, default='created')
    created_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True)
    response = models.TextField(null=True)
    completionCall = models.TextField(null=True)
    tools = models.TextField(null=True)

    def __str__(self):
        return f'{self.runId}: {self.status}'

I use runId as the unique key; it is the run_id we get from OpenAI. I also apologize for introducing yet another term, 'Task', while OpenAI uses Run and Thread as its terminology: I consider an OpenAI run a 'Task', so each OpenAI run has a record in my OpenaiTask table.

Note: you will see that in two places in the code I use a 'combo id', the concatenation of run_id and thread_id, because for some reason OpenAI needs both to retrieve a run.
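In code that just means packing the two ids into one string when scheduling a task, and splitting them apart again inside it:

# build the combo id when scheduling...
comboId = run.id + ',' + thread.id
# ...and take it apart inside the task
run_id, thread_id = comboId.split(',')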

While OpenaiTask is the persistent database version of the task, assistantTask is the class that is used to get it all going:

class assistantTask():
    ''' A class to manage an OpenAI assistant task

    Basic use:

    task = assistantTask(assistantName='DemoAssistant',
                         prompt='Summarize the content on this website: https://valor.vc',
                         completionCall='demo:printResult',
                         metadata={'ie': 'vic123'})
    task.createRun()
    '''
    @property
    def prompt(self) -> str:
        return self._startPrompt

    @property
    def task_id(self) -> str:
        return self._task_id

    @property
    def assistant_id(self) -> str:
        return self.assistantObject.id

    @property
    def thread_id(self) -> str:
        return self.threadObject.id

    @property
    def metadata(self) -> dict:
        return self.runObject.metadata

    @prompt.setter
    def prompt(self, message):
        self._startPrompt = message
        self.threadObject = self.client.beta.threads.create(messages=[])
        self.client.beta.threads.messages.create(
            thread_id=self.thread_id,
            content=message,
            role="user"
        )

    def __init__(self, **kwargs):
        ''' Create an assistant task

        Two modes: a new task, or retrieve an existing (probably completed) task from the database.

        If run_id is provided it will retrieve the task from the database. If not, it will create a new task.
        '''
        self.client = getOpenaiClient()
        self.task = None
        self._startPrompt = None
        self.threadObject = None
        self.runObject = None
        self.messages = None
        self.status = None
        self.response = None
        self.error = None
        self.completionCall = None
        self._metadata = None
        self.tools = None
        for key, value in kwargs.items():
            if key == 'run_id':
                self.task = OpenaiTask.objects.get(runId=value)
                if self.task is not None:
                    self.runObject = self.client.beta.threads.runs.retrieve(run_id=self.task.runId, thread_id=self.task.threadId)
                    if self.runObject is None:
                        raise Exception('Run not found')
                    self.status = self.runObject.status
                    self.response = self.task.response
                    self.threadObject = self.client.beta.threads.retrieve(thread_id=self.task.threadId)
                    self._metadata = self.runObject.metadata
            elif key == 'assistantName':
                self.assistantName = value
                self.assistantObject = getAssistant(name=self.assistantName)
                if self.assistantObject is None:
                    raise Exception('Assistant ' + self.assistantName + ' not found in OpenAI call')
            elif key == 'prompt':
                self.prompt = value
            elif key == 'completionCall':
                self.completionCall = value
            elif key == 'metadata':
                self._metadata = value
            elif key == 'tools':
                self.tools = value

    def createRun(self) -> str:
        ''' Create an OpenAI run and start checking the status

        This persists the task in the database - the run id is the primary key. Please note that
        OpenAI needs both thread id and run id to retrieve a run. We handle that here so that you
        will only need the run id.
        '''
        run = self.client.beta.threads.runs.create(
            thread_id=self.thread_id,
            assistant_id=self.assistant_id,
            metadata=self._metadata
        )
        self.task = OpenaiTask.objects.create(assistantId=self.assistant_id, runId=run.id, threadId=self.thread_id, completionCall=self.completionCall, tools=",".join(self.tools))
        self.task.save()
        # start the delayed status checking
        getStatus.delay(self.task.runId + ',' + self.task.threadId)
        return run.id

    def jsonResponse(self):
        ''' Try to convert the OpenAI response to JSON. When we ask OpenAI explicitly for JSON it
        will return a string with JSON in it. This function tries to convert that string to JSON;
        if that fails it returns None. OpenAI often insists on adding 'json' at the beginning
        and wrapping the result in triple backticks.
        '''
        if self.response is not None:
            res = self.response.replace('```json', '').replace('```', '')
            try:
                return json.loads(res)
            except json.JSONDecodeError:
                return None

    def markdownResponse(self):
        ''' Returns the response with markdown formatting - convenient for rendering in chat-like responses
        '''
        if self.response is not None:
            extension_configs = {
                'markdown_link_attr_modifier': {
                    'new_tab': 'on',
                    'no_referrer': 'external_only',
                    'auto_title': 'on',
                }}
            return markdown.markdown(self.response, extensions=['tables', 'markdown_link_attr_modifier'], extension_configs=extension_configs)

    def uploadFile(self, fileContent):
        ''' Upload a file to the assistant
        '''
        uploadFile = self.client.files.create(file=fileContent.read(), purpose='assistants')
        af = self.client.beta.assistants.files.create(assistant_id=self.assistant_id, file_id=uploadFile.id)
        return af.id

There are two ways to create an instance:

  1. A new task is created, like the example above.
  2. An existing task is retrieved using the taskID (which is the same as the run_id from the OpenAI run). You will need this in the functions that are called after the task completes, to 'bring the task back to life'; remember, the function that creates the run cannot wait for the run to complete to do the things you want to do with the results. A retrieval looks like the sketch below.
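
A minimal sketch (taskID arrives as the argument of your completion callback):

task = assistantTask(run_id=taskID)   # revive the finished task from the database
print(task.status)                    # e.g. 'completed'
result = task.jsonResponse()          # the parsed JSON answer, or None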

Since this is a Celery/async implementation, you will notice that the class itself only contains synchronous methods; all the waiting is delegated to Celery tasks.

The real fun starts with the getStatus.delay() call inside createRun(). (Notice that this is one of those functions that receives the native run/thread ids from the OpenAI calls as a comma-separated string.)



def getOpenaiClient():
    return OpenAI(api_key=settings.OPENAI_API_KEY)


@shared_task(name="Get Run Status", retry_policy={'max_retries': 100, "interval_start": 2})
def getStatus(comboId):
    ''' Get the status of an OpenAI run and act on it

    parameters:
    comboId - a runId,threadId combo in a string
    '''
    run_id = comboId.split(',')[0]
    thread_id = comboId.split(',')[1]
    client = getOpenaiClient()
    run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
    if run.status == 'completed' or run.status == 'requires_action':
        # we only need the record info if we need to do something...
        task = OpenaiTask.objects.get(runId=run_id)
        if task is None:
            raise Exception('Run ' + run_id + ' not found in task table')
        if run.status == 'completed':
            # OpenAI is done. Let's get the response and save it.
            messages = client.beta.threads.messages.list(thread_id=task.threadId)
            task.response = messages.data[0].content[0].text.value.strip()
            task.status = run.status
            task.completed_at = timezone.now()
            task.save()
            if task.completionCall is not None:
                # did we define a function that needs to be called after completion?
                # in that case let's call it
                module = task.completionCall.split(':')[0]
                function = task.completionCall.split(':')[1]
                module = importlib.import_module(module, package="chatbot")
                function = getattr(module, function)
                function.delay(run_id)
        elif run.status == 'requires_action':
            tool_calls = run.required_action.submit_tool_outputs.tool_calls
            tools = task.tools.split(',')
            callToolsDelay(comboId, tool_calls, tools)
            task.status = run.status
            task.save()
            # no retry here - callToolsDelay will run all tools and then restart a getStatus()
    elif run.status == 'expired':
        task = OpenaiTask.objects.get(runId=run_id)
        task.status = run.status
        task.save()
    else:
        # OpenAI is still working on an answer - retry in one second
        getStatus.retry(countdown=1, max_retries=100)
    return task.response

Let's see what happens. createRun() puts getStatus() in the queue, so the first time it gets called we're probably still waiting for OpenAI to come back with an answer. That is the 'else' branch at the bottom: the run is not 'completed' and no function calls have been requested yet, so it queues a retry (of itself) in one second. (I have yet to decide what the best interval is here; I think it could be more than one second.)
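If one second turns out to be too aggressive, an exponential backoff is a small change; a hypothetical variation on that retry:

# back off 2s, 4s, 8s, ... capped at 30s, instead of a fixed 1 second
delay = min(2 ** getStatus.request.retries, 30)
getStatus.retry(countdown=delay, max_retries=100)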

Now in most cases OpenAI will decide that one or more of my functions need to be called. That is the scenario where run.status == 'requires_action'. callToolsDelay() is where we handle putting all the function calls in a Celery chord:

def _getf(functionName):
    # get a callable function from a string in the form module:function
    # note you will need to replace MY_DJANGO_PROJECT_NAME with the name
    # of the folder you put this file in (i.e. the project name)
    module = functionName.split(':')[0]
    function = functionName.split(':')[1]
    print('loading tool ' + functionName)
    module = importlib.import_module(module, package=MY_DJANGO_PROJECT_NAME)
    f = None
    try:
        f = getattr(module, function)
    except AttributeError:
        pass
    return f


def getTools(array, value=None):
    # array is the array used when the assistantTask() is created. It should look
    # like this: [".webscrape:webScrape", ".pitchbook:personFind", ".pitchbook:companyFind"]
    # returns a dict in a form that we can share with OpenAI
    tools = []
    for a in array:
        f = _getf(a)
        if f is not None:
            if value is not None and value in a:
                return a
            tools.append({"type": "function", "function": f()})
    return tools


def callToolsDelay(comboId, tool_calls, tools):
    # NOTE receives COMBO ID of thread/run
    # the trick here is that we don't have to make the individual functions
    # 'celery tasks' - we simply wrap them in a task called _callTool()
    # the chord combines the calls - and adds the next task, which is submitting
    # the results back to OpenAI
    tasks = []
    gr = None
    for t in tool_calls:
        functionCall = getTools(tools, t.function.name)
        tasks.append(_callTool.s({"tool_call_id": t.id, "function": t.function.name, "arguments": t.function.arguments, "function_call": functionCall}, comboId))
    if len(tasks) > 0:
        gr = chord(tasks, submitToolOutputs.s(comboId))
        gr.delay()
    return gr


@shared_task(name="call single tool")
def _callTool(tool_call, comboId=None):
    # an asynchronous wrapper for calling a single worker function
    functionName = tool_call['function']  # function to call
    try:
        attributes = json.loads(tool_call['arguments'])
        call = _getf(tool_call['function_call'])
        functionResponse = call(attributes)
    except Exception as e:
        functionResponse = {"status": 'Error in function call ' + functionName + '(' + tool_call['arguments'] + ')'}
    tool_output = {"tool_call_id": tool_call['tool_call_id'], "output": json.dumps(functionResponse)}
    return {"comboId": comboId, "output": tool_output}

The chord will first run all the function tasks, and when those are done it will call the submitToolOutputs function. Note that some of the parameter passing seems a bit burdensome, but that is one of the things you'll end up doing with Celery tasks. Any suggestions for improvement are welcome! The good thing is that the function calls are all handled asynchronously, and once they complete, the function that submits the results is called.
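If chords are new to you: the header tasks run in parallel, and the callback receives the list of their return values as its first argument, which is exactly why submitToolOutputs() takes fromTools before comboId. A minimal standalone illustration (add and collect are hypothetical tasks):

from celery import chord, shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def collect(results, label):
    # results == [3, 7]; label comes from the .s() partial, after the results
    return {label: results}

chord([add.s(1, 2), add.s(3, 4)])(collect.s('sums'))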

@shared_task(name="Submit Tool Outputs")
def submitToolOutputs(fromTools, comboId):
run_id = comboId.split(',')[0]
thread_id = comboId.split(',')[1]
client = getOpenaiClient()
output = []
for o in fromTools:
output.append( o["output"])
try:
run = client.beta.threads.runs.submit_tool_outputs(
thread_id=thread_id,
run_id=run_id,
tool_outputs=output
)
except Exception as e:
return comboId
getStatus.delay(comboId)
return comboId

This is again an async function: it collects the results from the grouped function calls and submits them with the submit_tool_outputs() call. And then… it schedules a task to run getStatus() again, to resume waiting for the run to complete!

if run.status == 'completed':
    # OpenAI is done. Let's get the response and save it.
    messages = client.beta.threads.messages.list(thread_id=task.threadId)
    task.response = messages.data[0].content[0].text.value.strip()
    task.status = run.status
    task.completed_at = timezone.now()
    task.save()
    if task.completionCall is not None:
        # did we define a function that needs to be called after completion?
        # in that case let's call it
        module = task.completionCall.split(':')[0]
        function = task.completionCall.split(':')[1]
        module = importlib.import_module(module, package="chatbot")
        function = getattr(module, function)
        function.delay(run_id)

This fragment of getStatus() shows how we handle the result. I mark the database record complete and save the response. The last thing to do is to start the completion task, if we defined one when the run was created. For the completion call we use the same <module>:<function> syntax to define what needs to be called; in the example at the beginning of the article we had completionCall = ".demo:updateApplication".
This function gets called when the Assistant is completely done. It receives the taskID (which, if you remember, is the same as the run_id). With that id it can store the results from this assistant back in the Salesforce record that triggered this task. Note that I saved the record ID in the OpenAI metadata. I could just as well have created a metadata field in the Django table, obviously, but for my future pursuits I think I will be using this metadata inside the tasks as well. My prompt asks OpenAI to return JSON with the elements Reasoning and Assessment; I check for those and then update the record in Salesforce with that data.

@shared_task(name="Update the Salesforce record with the results")
def updateRunwayApplication(taskID):
    task = assistantTask(run_id=taskID)
    sfID = task.metadata["id"]
    response = task.jsonResponse()
    if response is not None and "Assessment" in response and "Reasoning" in response:
        SF.__getattr__(task.metadata['sobject']).update(sfID, {"VicBot_Assessment__c": response['Assessment'], "VicBot_Reasoning__c": response['Reasoning']})

So there you have it. I'm curious to hear feedback on this and how others have solved it. I'm looking to publish this as a module when I have some spare cycles.


Jean-Luc Vanhulst

Valor.vc / Software company founder / Mostly Python these days with OpenAI and a little (Salesforce) Apex