Commit ef03be82 authored by Benjamin Krause's avatar Benjamin Krause
Browse files

Set up a simple api

parent 9058e27b
......@@ -3,3 +3,9 @@ install:
black:
black ./
start:
uvicorn main:fast_api --reload
increase_file_watches:
sudo sysctl fs.inotify.max_user_watches=500000
......@@ -45,7 +45,7 @@ class MailProvider(BaseModel):
imap.login(user.mail, user.password)
imap.select(mailbox)
status, messages = imap.search(None, f"Subject {subject_identifier}")
status, messages = imap.search(None, f"SUBJECT {subject_identifier}")
if messages[0] == b"":
print(f"No messages with the subject {subject_identifier} found!")
return
......@@ -58,16 +58,19 @@ class MailProvider(BaseModel):
for response in msg:
if isinstance(response, tuple):
msg = email.message_from_bytes(response[1])
mails.append(
Mail(
date=msg.get("date"),
subject=msg.get("subject"),
mail_from=msg.get("from"),
mail_to=msg.get("to"),
cc=msg.get("cc"),
body=msg.get_payload(decode=True).decode(),
try:
mails.append(
Mail(
date=msg.get("date"),
subject=msg.get("subject"),
mail_from=msg.get("from"),
mail_to=msg.get("to"),
cc=msg.get("cc"),
body=msg.get_payload(decode=True).decode(),
)
)
)
except Exception as e:
print(e)
imap.close()
imap.logout()
return mails
from email.message import EmailMessage
from pprint import pprint
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import RedirectResponse
from app.mail import Mail
from app.mail_provider import MailProvider
from app.user import User
from app.utils import get_mail_formatted_date, subject_identifier
from main_programm import mail_providers, create_mail, create_mail_message
mail_providers = dict(
webde=MailProvider(
base_url="web.de", imap_url="imap.web.de", smtp_url="smtp.web.de", smtp_port=587
)
)
def create_user() -> User:
print("Create a login")
mail = input("Type here the mail address: ")
password = input("Type here the password to the mail address: ")
return User(mail=mail, password=password)
def create_receiver() -> User:
print("Create a user to receive the email")
mail = input("Type here the receiver mail address: ")
return User(mail=mail, password="")
def create_mail(from_user: User, to: User) -> Mail:
print("Create the a mail.")
date = get_mail_formatted_date()
subject = input("Type here your subject: ")
body = input("Type here your message: ")
return Mail(
date=date,
subject=f"{subject} - {subject_identifier}",
mail_from=from_user.mail,
mail_to=to.mail,
cc="",
body=body,
class GetMail(BaseModel):
user: User
mailbox: str
class SendMail(BaseModel):
from_user: User
to_user: User
subject: str
message: str
def init() -> FastAPI:
app = FastAPI()
origins = [
"http://localhost",
"http://localhost:8080",
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
def read_root():
return RedirectResponse("/web/dist/index.html")
def create_mail_message(mail: Mail) -> EmailMessage:
msg = EmailMessage()
msg.set_content(mail.body)
msg["Date"] = mail.date
msg["Subject"] = mail.subject
msg["From"] = mail.mail_from
msg["To"] = mail.mail_to
msg["CC"] = mail.cc
return msg
@app.post("/mails/get")
def get_mails(data: GetMail):
mails = mail_providers.get("webde").search_relevant_mails(data.user, data.mailbox)
pprint(mails)
return mails
@app.post("/mails/send")
def send_mail(data: SendMail):
print("Send a mail.")
from_user = data.from_user
to = data.to_user
message = Mail(
date=get_mail_formatted_date(),
subject=f"{data.subject} - {subject_identifier}",
mail_from=from_user.mail,
mail_to=to.mail,
cc="",
body=data.message,
)
if mail_providers.get("webde").send_mail(from_user, to, create_mail_message(message)):
print("The email was send successfully!")
else:
print("Failed to send the email!")
def send_mail():
print("Send a mail.")
from_user = create_user()
to = create_receiver()
message = create_mail(from_user, to)
if mail_providers.get("webde").send_mail(from_user, to, create_mail_message(message)):
print("The email was send successfully!")
else:
print("Failed to send the email!")
return app
def get_relevant_mails():
user = create_user()
mailbox = input("Type here the mailbox, in which you want to search: ")
mails = mail_providers.get("webde").search_relevant_mails(user, mailbox)
pprint(mails)
fast_api = init()
if __name__ == "__main__":
select = input("Type send to 'send' a mail or 'get' to show relevant mails: ")
if select == "send":
send_mail()
elif select == "get":
get_relevant_mails()
else:
print("You should type 'send' or 'get'! Try it again.")
uvicorn.run("main:fast_api", host="0.0.0.0", port=8000, reload=True)
from email.message import EmailMessage
from pprint import pprint
from app.mail import Mail
from app.mail_provider import MailProvider
from app.user import User
from app.utils import get_mail_formatted_date, subject_identifier
mail_providers = dict(
webde=MailProvider(
base_url="web.de", imap_url="imap.web.de", smtp_url="smtp.web.de", smtp_port=587
)
)
def create_user() -> User:
print("Create a login")
mail = input("Type here the mail address: ")
password = input("Type here the password to the mail address: ")
return User(mail=mail, password=password)
def create_receiver() -> User:
print("Create a user to receive the email")
mail = input("Type here the receiver mail address: ")
return User(mail=mail, password="")
def create_mail(from_user: User, to: User) -> Mail:
print("Create the a mail.")
date = get_mail_formatted_date()
subject = input("Type here your subject: ")
body = input("Type here your message: ")
return Mail(
date=date,
subject=f"{subject} - {subject_identifier}",
mail_from=from_user.mail,
mail_to=to.mail,
cc="",
body=body,
)
def create_mail_message(mail: Mail) -> EmailMessage:
msg = EmailMessage()
msg.set_content(mail.body)
msg["Date"] = mail.date
msg["Subject"] = mail.subject
msg["From"] = mail.mail_from
msg["To"] = mail.mail_to
msg["CC"] = mail.cc
return msg
def send_mail():
print("Send a mail.")
from_user = create_user()
to = create_receiver()
message = create_mail(from_user, to)
if mail_providers.get("webde").send_mail(from_user, to, create_mail_message(message)):
print("The email was send successfully!")
else:
print("Failed to send the email!")
def get_relevant_mails():
user = create_user()
mailbox = input("Type here the mailbox, in which you want to search: ")
mails = mail_providers.get("webde").search_relevant_mails(user, mailbox)
pprint(mails)
if __name__ == "__main__":
select = input("Type send to 'send' a mail or 'get' to show relevant mails: ")
if select == "send":
send_mail()
elif select == "get":
get_relevant_mails()
else:
print("You should type 'send' or 'get'! Try it again.")
pydantic
black
\ No newline at end of file
black
fastapi
starlette
uvicorn
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment