import email import imaplib import smtplib from email.message import EmailMessage from enum import Enum from pydantic import BaseModel from app.mail import Mail from app.user import User from app.utils import subject_identifier class Encryption(Enum): SSL_TLS = "ssl_tls" STARTTLS = "starttls" class MailProvider(BaseModel): name: str base_url: str imap_url: str imap_port: int = 993 imap_encryption: Encryption = Encryption.STARTTLS smtp_url: str smtp_port: int = 587 smtp_encryption: Encryption = Encryption.STARTTLS def send_mail(self, from_user: User, to_user: User, message: EmailMessage): try: with smtplib.SMTP(self.smtp_url, self.smtp_port) as server: server.starttls() server.ehlo() server.login(from_user.mail, from_user.password) server.sendmail(from_user.mail, to_user.mail, message.as_string()) return True except Exception as e: print(e) return False def get_mailboxes(self, user: User): imap = imaplib.IMAP4_SSL(self.imap_url) imap.login(user.mail, user.password) mailboxes = imap.list() print(mailboxes) @staticmethod def parse_mail(msg): for response in msg: if isinstance(response, tuple): msg = email.message_from_bytes(response[1]) if msg.is_multipart(): return try: return Mail( date=msg.get("date"), subject=msg.get("subject"), mail_from=msg.get("from"), mail_to=msg.get("to"), cc="", body=msg.get_payload(decode=True).decode(), ) except Exception as e: print(e) def imap_login(self, user: User) -> imaplib.IMAP4_SSL: imap = imaplib.IMAP4_SSL(self.imap_url, self.smtp_port) imap.starttls() imap.ehlo() imap.login(user.mail, user.password) return imap @staticmethod def imap_logout(imap: imaplib.IMAP4_SSL): imap.close() imap.logout() def get_all_mails_from_mailbox(self, user: User, mailbox: str): imap = self.imap_login(user) status, byte_message_count = imap.select(mailbox) mail_count = int(byte_message_count[0]) raw_messages = [] # fetch raw mails for i in range(mail_count, 0, -1): res, msg = imap.fetch(str(i), "(RFC822)") raw_messages.append(msg) self.imap_logout(imap) messages = [] for message in raw_messages: m = self.parse_mail(message) messages.append(m) if m else None return messages def search_relevant_mails(self, user: User, mailbox: str): imap = self.imap_login(user) imap.select(mailbox) status, byte_message_position_ids = imap.search(None, f"SUBJECT {subject_identifier}") if byte_message_position_ids[0] == b"": print(f"No messages with the subject {subject_identifier} found!") return [] message_position_ids = byte_message_position_ids[0].decode("utf-8").split(" ") raw_messages = [] # fetch raw mails for i in message_position_ids: res, msg = imap.fetch(str(i), "(RFC822)") raw_messages.append(msg) self.imap_logout(imap) messages = [] for message in raw_messages: m = self.parse_mail(message) messages.append(m) if m else None return messages