Skip to content

Commit af5c7fc

Browse files
committed
Add email search processor
1 parent d5a1ef3 commit af5c7fc

File tree

1 file changed

+228
-0
lines changed

1 file changed

+228
-0
lines changed
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
import base64
2+
import imaplib
3+
import logging
4+
import smtplib
5+
from datetime import date
6+
from email.parser import BytesParser
7+
from email.policy import default
8+
from typing import List, Literal, Optional, Union
9+
10+
from asgiref.sync import async_to_sync
11+
from pydantic import BaseModel, Field
12+
13+
from llmstack.apps.schemas import OutputTemplate
14+
from llmstack.common.blocks.base.processor import Schema
15+
from llmstack.processors.providers.api_processor_interface import ApiProcessorInterface
16+
17+
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
18+
19+
20+
class GmailEmailProvider(BaseModel):
    # Marker model selecting Gmail; ``type`` is the literal tag that tells the
    # members of the ``EmailProvider`` union apart.
    type: Literal["gmail"] = "gmail"
22+
23+
24+
class OutlookEmailProvider(BaseModel):
    # Marker model selecting Outlook; ``type`` is the literal tag that tells the
    # members of the ``EmailProvider`` union apart.
    type: Literal["outlook"] = "outlook"
26+
27+
28+
class YahooEmailProvider(BaseModel):
    # Marker model selecting Yahoo Mail; ``type`` is the literal tag that tells
    # the members of the ``EmailProvider`` union apart.
    type: Literal["yahoo"] = "yahoo"
30+
31+
32+
# Any one of the supported provider marker models.
EmailProvider = Union[
    GmailEmailProvider,
    OutlookEmailProvider,
    YahooEmailProvider,
]
33+
34+
35+
class EmailSearchInput(Schema):
    # Per-request search filters. Every filter is optional; the ones that are
    # set are combined into a single IMAP SEARCH criteria string.
    sender: Optional[str] = Field(None, description="Filter by email sender")
    subject: Optional[str] = Field(None, description="Filter by email subject")
    since: Optional[date] = Field(None, description="Filter emails since a specific date")
    before: Optional[date] = Field(None, description="Filter emails before a specific date")
    unseen: bool = Field(False, description="Filter for unread emails")
    seen: bool = Field(False, description="Filter for read emails")

    def build_search_query(self) -> str:
        """Build the IMAP search query based on the provided filters.

        Returns:
            The space-separated search keys for every filter that is set, or
            ``"ALL"`` when no filter is set (an empty criteria string is not a
            valid IMAP SEARCH command).

        Note:
            ``seen`` and ``unseen`` are emitted independently; enabling both
            yields ``UNSEEN SEEN``.
        """

        def quoted(value: str) -> str:
            # Emit an IMAP quoted string: a line break cannot appear inside one,
            # and backslash / double quote must be escaped so that user input
            # cannot terminate the string early and alter the command.
            flattened = value.replace("\r", " ").replace("\n", " ")
            escaped = flattened.replace("\\", "\\\\").replace('"', '\\"')
            return f'"{escaped}"'

        def imap_date(value: date) -> str:
            # IMAP dates use English month abbreviations; strftime("%b") follows
            # the process locale, so the month name is looked up explicitly.
            months = ("Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
            return f"{value.day:02d}-{months[value.month - 1]}-{value.year:04d}"

        query_parts = []

        if self.sender:
            query_parts.append(f"FROM {quoted(self.sender)}")

        if self.subject:
            query_parts.append(f"SUBJECT {quoted(self.subject)}")

        if self.since:
            query_parts.append(f'SINCE "{imap_date(self.since)}"')

        if self.before:
            query_parts.append(f'BEFORE "{imap_date(self.before)}"')

        if self.unseen:
            query_parts.append("UNSEEN")

        if self.seen:
            query_parts.append("SEEN")

        # Join all parts with a space to form the final search query; fall back
        # to ALL so that "no filters" means "every message".
        return " ".join(query_parts) or "ALL"
67+
68+
69+
class EmailSearchConfigurations(Schema):
    # Processor-level settings: which provider to talk to, which stored
    # connection supplies the credentials, and which mailbox to search.
    email_provider: EmailProvider = Field(
        description="Email provider to use",
        default=GmailEmailProvider(),
        json_schema_extra={"advanced_parameter": False},
    )
    connection_id: Optional[str] = Field(
        description="Use your authenticated connection to make the request",
        default=None,
        json_schema_extra={"widget": "connection"},
    )
    mailbox: str = Field(
        description="Mailbox to search in",
        default="INBOX",
    )
84+
85+
86+
class EmailSearchResult(Schema):
    # One matched email. ``subject`` and ``sender`` are required; every other
    # field has a default.
    id: str = Field(description="Email ID", default="")
    subject: str = Field(description="Subject of the email")
    sender: str = Field(description="Sender of the email")

    # Body variants.
    body: Optional[str] = Field(description="Body of the email", default=None)
    text_body: Optional[str] = Field(description="Text body of the email", default=None)
    html_body: Optional[str] = Field(description="HTML body of the email", default=None)
    html_body_text: Optional[str] = Field(description="HTML body of the email as text", default=None)

    # Attachments and additional recipients.
    attachments: Optional[List[str]] = Field(description="Email Attachments", default=[])
    cc: Optional[List[str]] = Field(description="CC of the email", default=[])
    bcc: Optional[List[str]] = Field(description="BCC of the email", default=[])
97+
98+
99+
class EmailSearchOutput(Schema):
    # Output payload: the list of emails that matched the search.
    results: List[EmailSearchResult] = Field(
        description="Email search results",
        default=[],
    )
104+
105+
106+
def search_email_via_gmail(recipients, msg, smtp_username, smtp_password):
    """Send ``msg`` to ``recipients`` through Gmail's SMTP server.

    NOTE(review): despite the ``search_`` prefix this function sends mail over
    SMTP and performs no search — confirm whether it is still needed.

    Args:
        recipients: Destination address(es) handed to ``SMTP.sendmail``.
        msg: Message object; its ``From`` header is set to ``smtp_username``
            and it is serialized with ``as_string()``.
        smtp_username: Login name, also used as the envelope sender.
        smtp_password: Login password.
    """
    msg["From"] = smtp_username

    # Port 587 with STARTTLS: connect in plain text, then upgrade to TLS before
    # authenticating.
    with smtplib.SMTP("smtp.gmail.com", 587) as smtp:
        smtp.starttls()
        smtp.login(smtp_username, smtp_password)
        smtp.sendmail(smtp_username, recipients, msg.as_string())
115+
116+
117+
def search_email_via_outlook(recipients, msg, smtp_username, smtp_password):
    """Send ``msg`` to ``recipients`` through Outlook's SMTP server.

    NOTE(review): despite the ``search_`` prefix this function sends mail over
    SMTP and performs no search — confirm whether it is still needed.

    Args:
        recipients: Destination address(es) handed to ``SMTP.sendmail``.
        msg: Message object, serialized with ``as_string()``.
        smtp_username: Login name, also used as the envelope sender.
        smtp_password: Login password.
    """
    # Port 587 with STARTTLS: connect in plain text, then upgrade to TLS before
    # authenticating.
    with smtplib.SMTP("smtp.outlook.com", 587) as smtp:
        smtp.starttls()
        smtp.login(smtp_username, smtp_password)
        smtp.sendmail(smtp_username, recipients, msg.as_string())
124+
125+
126+
def search_email_via_yahoo(recipients, msg, smtp_username, smtp_password):
    """Send ``msg`` to ``recipients`` through Yahoo Mail's SMTP server.

    NOTE(review): despite the ``search_`` prefix this function sends mail over
    SMTP and performs no search — confirm whether it is still needed.

    Args:
        recipients: Destination address(es) handed to ``SMTP.sendmail``.
        msg: Message object, serialized with ``as_string()``.
        smtp_username: Login name, also used as the envelope sender.
        smtp_password: Login password.
    """
    # Port 587 with STARTTLS: connect in plain text, then upgrade to TLS before
    # authenticating.
    with smtplib.SMTP("smtp.mail.yahoo.com", 587) as smtp:
        smtp.starttls()
        smtp.login(smtp_username, smtp_password)
        smtp.sendmail(smtp_username, recipients, msg.as_string())
133+
134+
135+
class EmailSenderProcessor(ApiProcessorInterface[EmailSearchInput, EmailSearchOutput, EmailSearchConfigurations]):
    """Search a mailbox over IMAP and return the matching emails.

    NOTE(review): the class is called ``EmailSenderProcessor`` although its
    name/slug/description all say "email search". The class name is kept
    unchanged because other code may import it.
    """

    @staticmethod
    def name() -> str:
        """Human-readable processor name."""
        return "Email Search"

    @staticmethod
    def slug() -> str:
        """Identifier of this processor."""
        return "email_search"

    @staticmethod
    def description() -> str:
        """Short description of what the processor does."""
        return "Search for emails"

    @staticmethod
    def provider_slug() -> str:
        """Identifier of the provider this processor is registered under."""
        return "promptly"

    @classmethod
    def get_output_template(cls) -> OutputTemplate | None:
        """Return the markdown template listing each result's subject and body."""
        return OutputTemplate(
            markdown="""{% for result in results %}
{{result.subject}}
{{result.body}}

{% endfor %}"""
        )

    def _connect_imap(self) -> imaplib.IMAP4_SSL:
        """Open an authenticated IMAP-over-SSL session for the configured provider.

        Raises:
            ValueError: If the configured provider is not a supported type.
        """
        connection = self._env["connections"][self._config.connection_id]["configuration"]

        provider = self._config.email_provider
        if isinstance(provider, GmailEmailProvider):
            host = "imap.gmail.com"
        elif isinstance(provider, OutlookEmailProvider):
            host = "imap-mail.outlook.com"
        elif isinstance(provider, YahooEmailProvider):
            host = "imap.mail.yahoo.com"
        else:
            raise ValueError("Invalid email provider")

        mail = imaplib.IMAP4_SSL(host)
        try:
            mail.login(connection["username"], connection["password"])
        except Exception:
            # Do not leak the socket when authentication fails.
            mail.shutdown()
            raise
        return mail

    @staticmethod
    def _quote_mailbox(mailbox: str) -> str:
        """Return ``mailbox`` as an IMAP quoted string.

        imaplib sends the mailbox argument verbatim, so a name containing a
        space (e.g. ``[Gmail]/All Mail``) must be quoted by the caller. A name
        that is already wrapped in double quotes is returned unchanged.
        """
        if len(mailbox) >= 2 and mailbox.startswith('"') and mailbox.endswith('"'):
            return mailbox
        escaped = mailbox.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    @staticmethod
    def _address_list(msg, header_name: str) -> list:
        """Return the addresses found in ``header_name`` as a list of strings."""
        collected = []
        for header in msg.get_all(header_name, []):
            # With the ``default`` policy, address headers expose the parsed
            # addresses; fall back to the raw header text otherwise.
            parsed = getattr(header, "addresses", None)
            if parsed:
                collected.extend(str(address) for address in parsed)
            elif str(header).strip():
                collected.append(str(header).strip())
        return collected

    @staticmethod
    def _decode_text(part) -> str:
        """Decode the payload of a text part, tolerating a missing or unknown charset."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # The declared charset is not known to Python.
            return payload.decode("utf-8", errors="replace")

    def _build_result(self, message_id: str, msg) -> EmailSearchResult:
        """Convert a parsed message into an ``EmailSearchResult``.

        Attachments are uploaded through ``self._upload_asset_from_url`` and the
        returned references are stored on the result.
        """
        email_data = EmailSearchResult(
            id=message_id,
            # Headers may be absent; the schema requires plain strings.
            subject=str(msg["subject"] or ""),
            sender=str(msg["from"] or ""),
        )
        email_data.cc = self._address_list(msg, "cc")
        email_data.bcc = self._address_list(msg, "bcc")

        # walk() visits nested parts too (e.g. multipart/alternative inside
        # multipart/mixed) and yields the message itself when it has one part.
        for part in msg.walk():
            if part.is_multipart():
                continue

            content_type = part.get_content_type()
            # Check the disposition first so a text attachment is not mistaken
            # for the message body.
            if part.get_content_disposition() == "attachment":
                attachment_data = part.get_payload(decode=True)
                if attachment_data is None:
                    continue
                # Fallback name for attachments that carry no filename.
                attachment_name = part.get_filename() or "attachment"
                attachment_base64 = base64.b64encode(attachment_data).decode("utf-8")
                attachment_data_uri = f"data:{content_type};name={attachment_name};base64,{attachment_base64}"
                objref = self._upload_asset_from_url(attachment_data_uri, attachment_name, content_type)
                email_data.attachments.append(objref)
            elif content_type == "text/plain" and email_data.text_body is None:
                email_data.text_body = self._decode_text(part)
            elif content_type == "text/html" and email_data.html_body is None:
                email_data.html_body = self._decode_text(part)

        if email_data.html_body:
            from bs4 import BeautifulSoup

            soup = BeautifulSoup(email_data.html_body, "html.parser")
            email_data.html_body_text = soup.get_text()

        # Prefer the plain-text body; fall back to the text extracted from HTML.
        email_data.body = email_data.text_body or email_data.html_body_text
        return email_data

    def process(self) -> dict:
        """Run the search and write the results to the output stream.

        Returns:
            The finalized output of ``self._output_stream``.

        Raises:
            ValueError: If the configured provider is not supported.
            imaplib.IMAP4.error: If the mailbox cannot be selected or the
                search is rejected by the server.
        """
        mail = self._connect_imap()
        results = []
        try:
            status, _ = mail.select(mailbox=self._quote_mailbox(self._config.mailbox))
            if status != "OK":
                raise imaplib.IMAP4.error(f"Unable to select mailbox {self._config.mailbox!r}")

            # An empty criteria string is not a valid SEARCH command.
            query = self._input.build_search_query() or "ALL"
            status, messages = mail.search(None, query)
            if status != "OK":
                raise imaplib.IMAP4.error(f"Email search failed for query {query!r}")

            message_ids = messages[0].split() if messages and messages[0] else []
            for message_id in message_ids:
                # BODY.PEEK[] fetches the full message without setting \Seen.
                status, msg_data = mail.fetch(message_id, "(BODY.PEEK[])")
                if status != "OK":
                    logger.warning("Failed to fetch email %s: %s", message_id, msg_data)
                    continue
                for response_part in msg_data:
                    if isinstance(response_part, tuple):
                        msg = BytesParser(policy=default).parsebytes(response_part[1])
                        results.append(self._build_result(message_id.decode("utf-8"), msg))
        finally:
            # LOGOUT only: CLOSE would also expunge messages flagged \Deleted.
            try:
                mail.logout()
            except (imaplib.IMAP4.error, OSError):
                logger.debug("Error while closing the IMAP session", exc_info=True)

        async_to_sync(self._output_stream.write)(EmailSearchOutput(results=results))
        return self._output_stream.finalize()

0 commit comments

Comments
 (0)