Commit c37b9383 authored by Benjamin Krause's avatar Benjamin Krause
Browse files

Modify and cleanup mail proxy

parent aa4ff70c
from email.message import EmailMessage
from pydantic import BaseModel
......@@ -8,3 +10,13 @@ class Mail(BaseModel):
mail_to: str
cc: str
body: str
def as_email_message(self) -> EmailMessage:
msg = EmailMessage()
msg.set_content(self.body)
msg["Date"] = self.date
msg["Subject"] = self.subject
msg["From"] = self.mail_from
msg["To"] = self.mail_to
msg["CC"] = self.cc
return msg
import email
import imaplib
import smtplib
import ssl
from email.message import EmailMessage
from enum import Enum
......@@ -8,42 +9,36 @@ from pydantic import BaseModel
from app.mail import Mail
from app.user import User
from app.utils import subject_identifier
class Encryption(Enum):
SSL_TLS = "ssl_tls"
STARTTLS = "starttls"
SSL_TLS = "SSL_TLS"
STARTTLS = "STARTTLS"
class MailProvider(BaseModel):
name: str
id: str
base_url: str
imap_url: str
imap_port: int = 993
imap_encryption: Encryption = Encryption.STARTTLS
imap_encryption: str = Encryption.STARTTLS
smtp_url: str
smtp_port: int = 587
smtp_encryption: Encryption = Encryption.STARTTLS
smtp_encryption: str = Encryption.STARTTLS
def send_mail(self, from_user: User, to_user: User, message: EmailMessage):
try:
with smtplib.SMTP(self.smtp_url, self.smtp_port) as server:
server.starttls()
server.ehlo()
server.login(from_user.mail, from_user.password)
server.login(from_user.login_name, from_user.password)
server.sendmail(from_user.mail, to_user.mail, message.as_string())
server.close()
return True
except Exception as e:
print(e)
return False
def get_mailboxes(self, user: User):
imap = imaplib.IMAP4_SSL(self.imap_url)
imap.login(user.mail, user.password)
mailboxes = imap.list()
print(mailboxes)
@staticmethod
def parse_mail(msg):
for response in msg:
......@@ -64,15 +59,19 @@ class MailProvider(BaseModel):
print(e)
def imap_login(self, user: User) -> imaplib.IMAP4_SSL:
imap = imaplib.IMAP4_SSL(self.imap_url, self.smtp_port)
imap.starttls()
imap.ehlo()
imap.login(user.mail, user.password)
if self.imap_encryption == Encryption.SSL_TLS.value:
context = ssl.create_default_context()
imap = imaplib.IMAP4_SSL(
host=self.imap_url, port=self.imap_port, ssl_context=context
)
else:
imap = imaplib.IMAP4_SSL(host=self.imap_url, port=self.imap_port)
imap.starttls()
imap.login(user.login_name, user.password)
return imap
@staticmethod
def imap_logout(imap: imaplib.IMAP4_SSL):
imap.close()
imap.logout()
def get_all_mails_from_mailbox(self, user: User, mailbox: str):
......@@ -84,6 +83,7 @@ class MailProvider(BaseModel):
for i in range(mail_count, 0, -1):
res, msg = imap.fetch(str(i), "(RFC822)")
raw_messages.append(msg)
imap.close()
self.imap_logout(imap)
messages = []
for message in raw_messages:
......@@ -91,12 +91,12 @@ class MailProvider(BaseModel):
messages.append(m) if m else None
return messages
def search_relevant_mails(self, user: User, mailbox: str):
def search_relevant_mails(self, user: User, mailbox: str, search_subject: str):
imap = self.imap_login(user)
imap.select(mailbox)
status, byte_message_position_ids = imap.search(None, f"SUBJECT {subject_identifier}")
status, byte_message_position_ids = imap.search(None, search_subject)
if byte_message_position_ids[0] == b"":
print(f"No messages with the subject {subject_identifier} found!")
print(f"No messages with the subject {search_subject} found!")
return []
message_position_ids = byte_message_position_ids[0].decode("utf-8").split(" ")
raw_messages = []
......@@ -104,9 +104,16 @@ class MailProvider(BaseModel):
for i in message_position_ids:
res, msg = imap.fetch(str(i), "(RFC822)")
raw_messages.append(msg)
imap.close()
self.imap_logout(imap)
messages = []
for message in raw_messages:
m = self.parse_mail(message)
messages.append(m) if m else None
return messages
def get_mailboxes(self, user: User):
imap = self.imap_login(user)
mailboxes = imap.list()
self.imap_logout(imap)
return mailboxes
......@@ -2,5 +2,6 @@ from pydantic import BaseModel
class User(BaseModel):
login_name: str
mail: str
password: str
from datetime import datetime
subject_identifier = "#ba_subject"
def get_mail_formatted_date():
x = datetime.now()
......
......@@ -7,17 +7,28 @@ from starlette.middleware.cors import CORSMiddleware
from starlette.responses import RedirectResponse
from app.mail import Mail
from app.mail_provider import MailProvider
from app.user import User
from app.utils import get_mail_formatted_date, subject_identifier
from main_programm import mail_providers, create_mail, create_mail_message
from app.utils import get_mail_formatted_date
class GetMail(BaseModel):
provider: MailProvider
user: User
mailbox: str
class SearchMails(GetMail):
search_subject: str
class GetMailboxes(BaseModel):
provider: MailProvider
user: User
class SendMail(BaseModel):
provider: MailProvider
from_user: User
to_user: User
subject: str
......@@ -44,46 +55,56 @@ def init() -> FastAPI:
def read_root():
return RedirectResponse("/web/dist/index.html")
@app.post("/mails/get")
def get_mails(data: GetMail):
mails = mail_providers.get("webde").search_relevant_mails(data.user, data.mailbox)
pprint(mails)
return mails
@app.post("/mails/get_all")
@app.get("/dong_long")
def ping():
return "Dongelong!"
@app.post("/provider/user/mailbox/search_mails")
def search_mails_with_subject(data: SearchMails):
try:
mails = data.provider.search_relevant_mails(
data.user, data.mailbox, data.search_subject
)
return mails
except Exception as e:
return e
@app.post("/provider/user/mailbox/get_all_mails")
def get_all_mails(data: GetMail):
"""
Returns all non multipart emails
:param data:
:return:
Returns all non multipart emails with '#ba_subject' in the subject
"""
mails = mail_providers.get("webde").get_all_mails_from_mailbox(data.user, data.mailbox)
pprint(mails)
return mails
try:
mails = data.provider.get_all_mails_from_mailbox(data.user, data.mailbox)
return mails
except Exception as e:
return e
@app.post("/mails/send")
@app.post("/provider/user/send_mail")
def send_mail(data: SendMail):
print("Send a mail.")
from_user = data.from_user
to = data.to_user
message = Mail(
date=get_mail_formatted_date(),
subject=f"{data.subject} - {subject_identifier}",
subject=f"{data.subject}",
mail_from=from_user.mail,
mail_to=to.mail,
cc="",
body=data.message,
)
if mail_providers.get("webde").send_mail(from_user, to, create_mail_message(message)):
print("The email was send successfully!")
if data.provider.send_mail(from_user, to, message.as_email_message()):
return "The email was send successfully!"
else:
print("Failed to send the email!")
return "Failed to send the email!"
@app.post("/provider/user/mailboxes")
def get_mailboxes(data: GetMailboxes):
return data.provider.get_mailboxes(data.user)
return app
fast_api = init()
if __name__ == "__main__":
uvicorn.run("main:fast_api", host="0.0.0.0", port=8000, reload=True)
from email.message import EmailMessage
from pprint import pprint
from app.mail import Mail
from app.mail_provider import MailProvider
from app.user import User
from app.utils import get_mail_formatted_date, subject_identifier
mail_providers = dict(
webde=MailProvider(
base_url="web.de", imap_url="imap.web.de", smtp_url="smtp.web.de", smtp_port=587
)
)
def create_user() -> User:
print("Create a login")
mail = input("Type here the mail address: ")
password = input("Type here the password to the mail address: ")
return User(mail=mail, password=password)
def create_receiver() -> User:
print("Create a user to receive the email")
mail = input("Type here the receiver mail address: ")
return User(mail=mail, password="")
def create_mail(from_user: User, to: User) -> Mail:
print("Create the a mail.")
date = get_mail_formatted_date()
subject = input("Type here your subject: ")
body = input("Type here your message: ")
return Mail(
date=date,
subject=f"{subject} - {subject_identifier}",
mail_from=from_user.mail,
mail_to=to.mail,
cc="",
body=body,
)
def create_mail_message(mail: Mail) -> EmailMessage:
msg = EmailMessage()
msg.set_content(mail.body)
msg["Date"] = mail.date
msg["Subject"] = mail.subject
msg["From"] = mail.mail_from
msg["To"] = mail.mail_to
msg["CC"] = mail.cc
return msg
def send_mail():
print("Send a mail.")
from_user = create_user()
to = create_receiver()
message = create_mail(from_user, to)
if mail_providers.get("webde").send_mail(from_user, to, create_mail_message(message)):
print("The email was send successfully!")
else:
print("Failed to send the email!")
def get_relevant_mails():
user = create_user()
mailbox = input("Type here the mailbox, in which you want to search: ")
mails = mail_providers.get("webde").search_relevant_mails(user, mailbox)
pprint(mails)
if __name__ == "__main__":
select = input("Type send to 'send' a mail or 'get' to show relevant mails: ")
if select == "send":
send_mail()
elif select == "get":
get_relevant_mails()
else:
print("You should type 'send' or 'get'! Try it again.")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment