A Gmail answering OpenAI Assistant built in Django
We have been using OpenAI Assistants for several months now for a lot of different tasks, including a Bot version. But in our ‘corporate’ setup it seems that people are not too keen on actually using the bot, probably because it is ‘yet another app to remember’. We also have a lot of tasks that originate from email, so it only made sense to implement an email-answering version of Vic, our OpenAI Assistant.
Since we are on Django and using Gmail, this is all pretty straightforward:
- A timed Celery Beat task is checking for new Emails every 5 minutes
- When a new email is found, we mark it as read and create an Assistant thread / run with the body of the email and any attachments automatically added to the thread.
- My OpenAI Email answering Assistant reads / processes the email and will call any of my functions depending on the tasks it discovers in the email. It will return the email answer to send.
Favorite tasks here at Valor include ‘add this company to Salesforce’ or ‘add this email as an Update to company Xyz’.
- Once the run is completed we create an email reply in the thread with the answer the Assistant wrote.
In my previous article I described the Django/Celery infrastructure I built to easily run concurrent OpenAI Assistants in Django. You can download it and use it in your Django app right away.
I made a few changes since then — mostly I decided to keep metadata in my OpenaiTask() model and not use the native metadata in the OpenAI thread/run because only single key-value pairs are allowed there while in the Django model I can store a ‘regular’ JSON object.
class OpenaiTask(models.Model):
    """Database record for one OpenAI Assistant run.

    The run id doubles as the primary key. ``meta`` stores an arbitrary JSON
    object alongside the run, instead of relying on the metadata attached to
    the OpenAI thread/run itself.
    """

    # Identifiers tying the record to the Assistant, its run and its thread.
    assistantId = models.CharField(max_length=64)
    runId = models.CharField(primary_key=True, max_length=64)
    threadId = models.CharField(max_length=64)

    # Lifecycle: starts out as 'created'; completed_at stays NULL until set.
    status = models.CharField(max_length=64, default='created')
    created_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True)

    # Free-form payloads; all of them are optional.
    response = models.TextField(null=True)
    completionCall = models.TextField(null=True)
    tools = models.TextField(null=True)
    meta = models.JSONField(null=True)

    def __str__(self):
        """Show the run id together with its current status."""
        return f'{self.runId}: {self.status}'
Using Thread/Run ID I can save metadata about the task (like the email message ID) and retrieve those at the end when the task is done.
Let’s take a look at some of the code:
The key function is called by the Beat timer and picks up exactly one unread email to process:
# NOTE(review): Celery documents the task-naming option as lowercase ``name``.
# ``Name`` is left untouched so the registered task name does not change
# under any existing schedule entry -- confirm which one is intended.
@shared_task(Name="Read Email", max_retries=0)
def getUnreadEmail():
    """Pick up one unread inbox email and start an Assistant run for it.

    Only a single message is handled per invocation to prevent double
    processing. The message is marked as read before the run is started.

    Returns:
        The result of ``task.createRun()``, or ``None`` when there is no
        unread message or the message could not be loaded.
    """
    user_id = "bot@yourdomain.vc"
    credits = _getcredentials().with_subject(user_id)
    service = build('gmail', 'v1', credentials=credits)
    unread_msgs = (
        service.users()
        .messages()
        .list(userId=user_id, labelIds=["INBOX", "UNREAD"])
        .execute()
    )

    # ``resultSizeEstimate`` is only an estimate, and the ``messages`` key is
    # left out of the response when nothing matches, so test the list itself.
    messages = unread_msgs.get('messages') or []
    if not messages:
        return None

    # we're doing only one message at a time to prevent double processing
    mssg = messages[0]
    email = Email(user_id, mssg['id'])
    if getattr(email, 'message', None) is None:
        # The message could not be fetched; leave it unread so that the next
        # scheduled run picks it up again.
        return None

    # a few metadata items to save now; they are needed after processing
    meta = {
        "msg_id": email.id,
        "thread_id": email.thread_id,
        "from": email.from_email,
        "subject": email.subject,
        "files": [],
    }

    # create a new task (thread)
    task = assistantTask(
        assistantName="Vicbot Email Responder",
        tools=[],  # ...your functions go here...
        metadata=meta,
        completionCall="chatbot.google:replyToEmail",
    )

    # process the email; meta is passed along so attachment info can be added
    email.process_parts(task, meta)
    task.metadata = meta  # update the metadata
    task.prompt = (
        f"Process this email. Subject:{email.subject}"
        f" From:{email.from_email}"
        f" Date:{email.msg_date}"
        f" Message body:{email.body}"
    )

    # Mark the message read in the SAME mailbox it was read from; relying on
    # markread()'s default user would target a different account.
    markread(meta["msg_id"], user_id=user_id)
    return task.createRun()  # this will start the task on the Celery worker
def markread(msg_id, user_id ="vic@valor.vc"):
    """Remove the ``UNREAD`` label from a Gmail message.

    Args:
        msg_id: Gmail id of the message to modify.
        user_id: Mailbox the message lives in; also used as the subject of
            the delegated credentials.

    Returns:
        The response of the ``messages.modify`` call, or ``None`` when the
        call failed.
    """
    credits = _getcredentials().with_subject(user_id)
    service = build('gmail', 'v1', credentials=credits)
    try:
        return (
            service.users()
            .messages()
            .modify(userId=user_id, id=msg_id, body={'removeLabelIds': ['UNREAD']})
            .execute()
        )
    except Exception as error:
        # Still best-effort, but report the failure instead of hiding it, and
        # no longer swallow KeyboardInterrupt / SystemExit.
        print('An error occurred: %s' % error)
        return None
Note: the assistantTask class is part of the Django Openai Assistant package I wrote.
Let’s look at the Email() class that reads a Gmail email, and its attachments to get it ready to be processed by the OpenAI Assistant
class Email:
    """Class for processing an email message from Gmail.

    The constructor fetches the message through the Gmail API and extracts
    the Subject, Date and From headers. ``process_parts()`` then walks the
    MIME structure to collect the body text and upload supported attachments.

    When the fetch fails, ``message`` is ``None`` and the attributes derived
    from the message (``thread_id``, ``payload``, ``subject``, ...) are not
    set; check ``message`` before using them.
    """

    # Attachment MIME types that get uploaded through ``task.uploadFile``.
    SUPPORTED_ATTACHMENT_TYPES = (
        "application/pdf",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/msword",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.ms-excel",
        "application/vnd.ms-powerpoint",
        "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "text/csv",
    )

    def __init__(self, user_id, msg_id):
        """Fetch message ``msg_id`` from the mailbox of ``user_id``."""
        self.user_id = user_id
        self.id = msg_id
        self.message = None  # stays None when the fetch fails
        try:
            self.credits = _getcredentials().with_subject(user_id)
            self.service = build('gmail', 'v1', credentials=self.credits)
            self.message = (
                self.service.users()
                .messages()
                .get(userId=user_id, id=msg_id)
                .execute()
            )
        except Exception as error:
            # Best-effort as before, but report the reason instead of hiding it.
            print('An error occurred: %s' % error)
            self.message = None
            return

        self.thread_id = self.message['threadId']
        self.payload = self.message['payload']

        # Defaults so a message lacking one of these headers still yields a
        # usable object instead of a later AttributeError.
        self.subject = ""
        self.msg_date = ""
        self.from_email = ""
        for header in self.payload.get('headers', []):
            # Header field names are case-insensitive.
            name = header.get('name', '').lower()
            if name == 'subject':
                self.subject = header['value']
            elif name == 'date':
                self.msg_date = header['value']
            elif name == 'from':
                self.from_email = header['value']
        self.snippet = self.message.get('snippet', '')

    @staticmethod
    def _b64decode(data):
        """Decode URL-safe base64 data, tolerating missing ``=`` padding."""
        if isinstance(data, str):
            data = data.encode('UTF-8')
        return base64.urlsafe_b64decode(data + b"=" * (-len(data) % 4))

    def process_parts(self, task, meta):
        """Collect the body and attachments of the email.

        Calls ``_process_parts`` recursively to handle nested parts.

        Args:
            task: Object whose ``uploadFile`` method receives each supported
                attachment.
            meta: Dict whose ``"files"`` list gets one entry per uploaded
                attachment; it is modified in place.

        Returns:
            Tuple ``(body, html, files)``: plain-text body, raw HTML body and
            the list of uploaded attachment entries.
        """
        self.body = ""
        self.html = ""
        self.files = []
        self._plain_text = ""

        parts = self.payload.get('parts')
        if not parts:
            # A message without sub-parts carries its content directly in the
            # payload, so treat the payload itself as the only part.
            parts = [self.payload]
        self._process_parts(parts, task, meta)

        # HTML stays the preferred source of the body; plain text is only
        # used when no HTML part produced any text.
        if not self.body:
            self.body = self._plain_text
        return self.body, self.html, self.files

    def _process_parts(self, pl, task, meta):
        """Handle every part in ``pl``, descending into multipart containers."""
        for part in pl:
            mime_type = part.get("mimeType", "")
            filename = part.get("filename")
            data = (part.get("body") or {}).get("data")

            if filename and mime_type in self.SUPPORTED_ATTACHMENT_TYPES:
                self._upload_attachment(part, task, meta)
            elif mime_type == "text/html":
                if data:
                    html = self._b64decode(data).decode("utf-8", errors="replace")
                    self.html += html
                    self.body += htmlToText(html)
            elif mime_type == "text/plain" and not filename:
                if data:
                    text = self._b64decode(data).decode("utf-8", errors="replace")
                    self._plain_text = getattr(self, "_plain_text", "") + text
            elif mime_type.startswith("multipart/"):
                # alternative, mixed, related, ... are all just containers
                self._process_parts(part.get("parts") or [], task, meta)

    def _upload_attachment(self, part, task, meta):
        """Download one attachment part and upload it through ``task``."""
        filename = part.get("filename")
        part_body = part.get("body") or {}
        att_id = part_body.get("attachmentId")
        try:
            if att_id:
                att = (
                    self.service.users()
                    .messages()
                    .attachments()
                    .get(userId=self.user_id, messageId=self.id, id=att_id)
                    .execute()
                )
                data = att.get('data')
            else:
                # Without an attachmentId the content sits inline in the part.
                data = part_body.get("data")
        except errors.HttpError as error:
            # Skip just this attachment; the remaining parts are still processed.
            print('An error occurred: %s' % error)
            return
        if not data:
            return

        file_data = self._b64decode(data)
        file_id = task.uploadFile(fileContent=file_data, addToAssistant=False, filename=filename)
        # link the openai file attachment to the google email attachment
        entry = {"id": att_id, "name": filename, "file_id": file_id}
        meta.setdefault("files", []).append(entry)
        self.files.append(entry)
From the header section we grab the email basics (Subject, Date etc.) and with process_parts() we take on the body of the email. It took a bit of Googling here and there — but this recursive iteration seems to be handling all incoming emails pretty well so far, with or without attachments, plain or HTML, straight from the Gmail web client or sent from iPhone email. Because emails can be ‘multipart’, which means they are essentially nested, we have a _process_parts() function that can call itself to handle the subparts.
I have opted to save any email attachment that OpenAI supports in the Assistant thread because now that Assistants support vision (v2, April 2024) it got so much more useful for my users. They can now take a picture of a business card, email it to Vic and say ‘Add the person on the business card to Salesforce’. Vision is also great at reading lists of attendees from a photo, creating a list and then doing a task like checking if they are already in Salesforce or finding their LinkedIn URL.
The prompt for the Email processing Assistant is fairly lengthy (650 words) because it can handle quite a few tasks and has a 15 function toolbox to work with. People internally have come to love it!
When the task is completed we grab the full response (often nested, and including new attachments generated by the Assistant) and create a reply email.
The Assistant reply can be a very lengthy, complex set of messages that includes annotations, images and other attachments. Annotations become hyperlinks, images we inline as part of the multipart message and other attachments are added as regular email attachments. It took some trial and error but works pretty well today!
from email.message import EmailMessage
# NOTE(review): Celery documents the task-naming option as lowercase ``name``.
# ``Name`` is left untouched so the registered task name does not change --
# confirm which one is intended.
@shared_task(Name="Email reply",max_retries=0 )
def replyToEmail(taskID):
    """Send the Assistant's answer as a reply to the email that started the run.

    Subject, Gmail thread id and sender are read from the task metadata. Only
    messages with role ``assistant`` are used: text content is converted with
    ``asmarkdown``, ``file_path`` annotations and ``image_file`` content are
    attached to the email, and images are referenced inline through ``cid:``.

    Args:
        taskID: Run id used to load the ``assistantTask``.

    Returns:
        None. Nothing is sent when the original sender is not internal.
    """
    from email.utils import parseaddr  # only needed for the sender check below

    task = assistantTask(run_id=taskID)
    subject = task.metadata.get('subject', '')
    threadId = task.metadata.get('thread_id', None)
    to_email = task.metadata.get('from', 'bot@yourdomain.com')
    fromName = "Your botname"
    fromEmail = "bot@yourdomain.com"

    # we only answer internal email for now!
    # The From header is attacker-controlled, so compare the parsed address
    # rather than searching the raw header: a substring test also accepts
    # '"x@yourdomain.com" <a@evil.com>' and 'a@yourdomain.com.evil.com'.
    sender_address = parseaddr(to_email)[1].lower()
    if not sender_address.endswith("@yourdomain.com"):
        return

    body = task.getallmessages()  # get the full Assistant thread messages
    message = EmailMessage()  # new empty email
    message_body = ''
    for m in reversed(body):  # loop through the thread messages
        if m.role != 'assistant':  # we only use the Assistant reply
            continue
        for c in m.content:
            if c.type == 'text':  # text is received in markdown with annotations
                body_text = c.text.value
                # NOTE(review): start/end indexes refer to the unmodified
                # text; whether several annotations in one text stay aligned
                # depends on how annotate() rewrites it -- verify.
                for a in c.text.annotations:
                    if a.type != 'file_path':
                        continue
                    content = task.retrieveFile(a.file_path.file_id)
                    maintype, subtype = get_mimetype(a.text)
                    filename = os.path.basename(a.text)
                    message.add_attachment(
                        content.content,
                        maintype=maintype,
                        subtype=subtype,
                        filename=filename,
                        headers=['Content-ID: <'+filename+'>'],
                    )
                    if maintype == 'image':
                        # widen the replaced span by one character on each side
                        start = a.start_index-1
                        end = a.end_index+1
                        url = "<img style='width: 600px;' src='cid:"+filename+"'/>"
                    else:
                        start = a.start_index
                        end = a.end_index
                        url = "cid:"+filename
                    body_text = annotate(body_text, start, end, url)
                message_body += asmarkdown(
                    body_text,
                    replaceThis='vc.vc/',
                    withThis='https://valorventures.lightning.force.com/lightning/r/',
                )
            elif c.type == 'image_file':  # images we inline in the multipart message
                content = task.retrieveFile(c.image_file.file_id)
                maintype, subtype = ("image", "png")
                filename = c.image_file.file_id+'.png'
                message.add_attachment(
                    content.content,
                    maintype=maintype,
                    subtype=subtype,
                    filename=filename,
                    headers=['Content-ID: <'+filename+'>'],
                )
                message_body += "<img style='width: 600px;' src='cid:"+filename+"'/>"

    # NOTE(review): the HTML body is added with add_attachment(), i.e. as one
    # more attachment part after the files; kept as-is to preserve the MIME
    # layout that is being sent today.
    message.add_attachment(message_body, 'html')
    message['to'] = to_email
    message['subject'] = subject
    message['from'] = formataddr((fromName, fromEmail))
    create_message = {'raw': base64.urlsafe_b64encode(message.as_bytes()).decode()}
    if threadId is not None:
        create_message['threadId'] = threadId

    # Build the Gmail client here, the same way the other functions in this
    # file do; the function has no ``service`` or ``user`` of its own.
    credits = _getcredentials().with_subject(fromEmail)
    service = build('gmail', 'v1', credentials=credits)
    try:
        sent = service.users().messages().send(userId=fromEmail, body=create_message).execute()
        print(F'sent message to {sent} Message Id: {sent["id"]}')
    except errors.HttpError as error:
        # Same exception class the rest of this file uses for Gmail API errors.
        print(F'An error occurred: {error}')
For clarity in case you want to do this with pure Assistant — this is what task.getallmessages() takes from the Assistant API
def getallmessages(self):
    """Get all messages from the assistant thread.

    Returns:
        A list with every message of the thread ``self.thread_id``, in the
        order the API returns them.
    """
    messages = self.client.beta.threads.messages.list(thread_id=self.thread_id)
    # The list call is paginated and ``.data`` holds the first page only.
    # Iterating the page object (assumes ``self.client`` is an openai-python
    # client) transparently fetches the following pages, so long threads are
    # no longer truncated.
    return list(messages)
Here are some actual samples of the type of messages that we process: