mail_provider.py 3.62 KB
Newer Older
1
2
3
4
import email
import imaplib
import smtplib
from email.message import EmailMessage
Benjamin Krause's avatar
Benjamin Krause committed
5
from enum import Enum
6
7
8
9
10
11
12
13

from pydantic import BaseModel

from app.mail import Mail
from app.user import User
from app.utils import subject_identifier


Benjamin Krause's avatar
Benjamin Krause committed
14
15
16
17
18
class Encryption(Enum):
    """Transport-encryption modes a mail server connection can be configured with.

    Used as the type of ``MailProvider.imap_encryption`` and
    ``MailProvider.smtp_encryption``. The member values are the lowercase
    strings that identify each mode.
    """

    # NOTE(review): presumably "implicit TLS", i.e. the TLS handshake happens
    # immediately on connect (conventional ports 993 / 465) -- confirm.
    SSL_TLS = "ssl_tls"
    # NOTE(review): presumably a plaintext connect that is upgraded with the
    # protocol's STARTTLS command (conventional ports 143 / 587) -- confirm.
    STARTTLS = "starttls"


19
class MailProvider(BaseModel):
    """Connection settings plus SMTP/IMAP helpers for one mail provider.

    Attributes:
        name: Name of the provider.
        base_url: Base URL of the provider (not read by any method here).
        imap_url: Host passed to ``imaplib`` when opening a connection.
        imap_port: Port passed to ``imaplib`` when opening a connection.
        imap_encryption: How the IMAP connection is secured.
        smtp_url: Host passed to ``smtplib`` when opening a connection.
        smtp_port: Port passed to ``smtplib`` when opening a connection.
        smtp_encryption: How the SMTP connection is secured.
    """

    name: str
    base_url: str
    imap_url: str
    # NOTE(review): 993 is conventionally the implicit-TLS IMAP port, while the
    # default encryption below is STARTTLS (conventionally port 143). Providers
    # relying on both defaults should be checked; confirm the intended pairing.
    imap_port: int = 993
    imap_encryption: Encryption = Encryption.STARTTLS
    smtp_url: str
    smtp_port: int = 587
    smtp_encryption: Encryption = Encryption.STARTTLS

    def _smtp_connect(self) -> smtplib.SMTP:
        """Open an SMTP connection secured according to ``smtp_encryption``."""
        if self.smtp_encryption is Encryption.SSL_TLS:
            # Implicit TLS: the socket is wrapped before any SMTP traffic.
            return smtplib.SMTP_SSL(self.smtp_url, self.smtp_port)
        server = smtplib.SMTP(self.smtp_url, self.smtp_port)
        try:
            # starttls() issues EHLO itself when needed; the explicit EHLO
            # afterwards re-reads the capabilities over the encrypted channel.
            server.starttls()
            server.ehlo()
        except Exception:
            # Do not leave the socket open when the TLS upgrade fails.
            server.close()
            raise
        return server

    def send_mail(self, from_user: User, to_user: User, message: EmailMessage):
        """Send ``message`` from ``from_user`` to ``to_user`` via SMTP.

        Args:
            from_user: Sender; its ``mail`` and ``password`` are used to log in.
            to_user: Recipient; its ``mail`` is the envelope recipient.
            message: The message to transmit.

        Returns:
            ``True`` when the message was handed to the server, ``False`` when
            any step failed (the error is printed).
        """
        # Best-effort boundary: callers get a bool, so every failure
        # (connect, TLS, login, send) is reported as False.
        try:
            with self._smtp_connect() as server:
                server.login(from_user.mail, from_user.password)
                server.sendmail(from_user.mail, to_user.mail, message.as_string())
                return True
        except Exception as e:
            print(e)
            return False

    def get_mailboxes(self, user: User):
        """List the mailboxes of ``user``.

        The raw result of ``IMAP4.list()`` is printed and also returned.

        Returns:
            The ``(status, data)`` tuple produced by ``IMAP4.list()``.
        """
        imap = self.imap_login(user)
        try:
            mailboxes = imap.list()
        finally:
            # Always release the connection, even when LIST fails.
            self.imap_logout(imap)
        print(mailboxes)
        return mailboxes

    @staticmethod
    def parse_mail(msg):
        """Convert one raw ``IMAP4.fetch`` result into a ``Mail``.

        Args:
            msg: The data part of a fetch response, i.e. an iterable whose
                tuple items carry the RFC822 bytes in position 1.

        Returns:
            A ``Mail`` for the first tuple item that parses successfully, or
            ``None`` when the message is multipart, when building the ``Mail``
            fails (the error is printed), or when no tuple item is present.
        """
        for response in msg:
            if not isinstance(response, tuple):
                continue
            parsed = email.message_from_bytes(response[1])
            if parsed.is_multipart():
                # Multipart messages are not supported and are dropped.
                return None
            try:
                # Decode with the charset the message declares; fall back to
                # UTF-8 and never fail on single undecodable bytes.
                charset = parsed.get_content_charset() or "utf-8"
                body = parsed.get_payload(decode=True).decode(charset, errors="replace")
                return Mail(
                    date=parsed.get("date"),
                    subject=parsed.get("subject"),
                    mail_from=parsed.get("from"),
                    mail_to=parsed.get("to"),
                    # NOTE(review): the Cc header is not read; confirm that an
                    # empty value is intended.
                    cc="",
                    body=body,
                )
            except Exception as e:
                print(e)
        return None

    def imap_login(self, user: User) -> imaplib.IMAP4:
        """Open an authenticated IMAP connection for ``user``.

        The connection is secured according to ``imap_encryption``: implicit
        TLS uses ``IMAP4_SSL``; STARTTLS connects in plaintext and upgrades
        before logging in.

        Returns:
            The logged-in connection. Pass it to ``imap_logout`` when done.

        Raises:
            imaplib.IMAP4.error: If the TLS upgrade or the login is rejected.
            OSError: If the connection cannot be established.
        """
        if self.imap_encryption is Encryption.SSL_TLS:
            imap = imaplib.IMAP4_SSL(self.imap_url, self.imap_port)
        else:
            imap = imaplib.IMAP4(self.imap_url, self.imap_port)
        try:
            if self.imap_encryption is Encryption.STARTTLS:
                # Upgrade before sending credentials.
                imap.starttls()
            imap.login(user.mail, user.password)
        except Exception:
            # Close the socket instead of leaking it on a failed handshake/login.
            imap.shutdown()
            raise
        return imap

    @staticmethod
    def imap_logout(imap: imaplib.IMAP4):
        """Close the selected mailbox (if any) and log out of ``imap``."""
        try:
            imap.close()
        except imaplib.IMAP4.error:
            # CLOSE is only legal while a mailbox is selected; not having one
            # selected must not prevent the logout below.
            pass
        finally:
            imap.logout()

    @staticmethod
    def _select_mailbox(imap: imaplib.IMAP4, mailbox: str) -> int:
        """Select ``mailbox`` and return its message count.

        Raises:
            imaplib.IMAP4.error: If the server refuses the SELECT.
        """
        status, data = imap.select(mailbox)
        if status != "OK":
            raise imaplib.IMAP4.error(f"Could not select mailbox {mailbox!r}: {data!r}")
        return int(data[0])

    def _fetch_mails(self, imap: imaplib.IMAP4, message_ids) -> list:
        """Fetch every id in ``message_ids`` and return the parsable mails."""
        mails = []
        for message_id in message_ids:
            _status, data = imap.fetch(str(message_id), "(RFC822)")
            mail = self.parse_mail(data)
            if mail:
                mails.append(mail)
        return mails

    def get_all_mails_from_mailbox(self, user: User, mailbox: str):
        """Return all parsable mails of ``mailbox``, newest position first.

        Args:
            user: Account whose ``mail`` and ``password`` are used to log in.
            mailbox: Name of the mailbox to read.

        Returns:
            A list of ``Mail`` objects; messages ``parse_mail`` cannot handle
            are left out.

        Raises:
            imaplib.IMAP4.error: If the mailbox cannot be selected.
        """
        imap = self.imap_login(user)
        try:
            mail_count = self._select_mailbox(imap, mailbox)
            # Sequence numbers run from 1 to mail_count; walk them backwards.
            return self._fetch_mails(imap, range(mail_count, 0, -1))
        finally:
            self.imap_logout(imap)

    def search_relevant_mails(self, user: User, mailbox: str):
        """Return the parsable mails in ``mailbox`` whose subject matches
        ``subject_identifier``.

        Args:
            user: Account whose ``mail`` and ``password`` are used to log in.
            mailbox: Name of the mailbox to search.

        Returns:
            A list of ``Mail`` objects, empty when nothing matches.

        Raises:
            imaplib.IMAP4.error: If the mailbox cannot be selected.
        """
        imap = self.imap_login(user)
        try:
            self._select_mailbox(imap, mailbox)
            _status, data = imap.search(None, f"SUBJECT {subject_identifier}")
            # An empty result is reported as [b""] (or [None] without data).
            if not data or not data[0]:
                print(f"No messages with the subject {subject_identifier} found!")
                return []
            message_position_ids = data[0].decode("utf-8").split()
            return self._fetch_mails(imap, message_position_ids)
        finally:
            # Runs on the early return above as well, so nothing leaks.
            self.imap_logout(imap)