66 lines
2.2 KiB
Python
66 lines
2.2 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import smtplib
|
||
|
|
import ssl
|
||
|
|
from email.header import Header
|
||
|
|
from email.mime.multipart import MIMEMultipart
|
||
|
|
from email.mime.text import MIMEText
|
||
|
|
from html import escape
|
||
|
|
|
||
|
|
|
||
|
|
class EmailNotificationService:
    """Send a plain-text notification as a multipart (text + HTML) e-mail over SMTP."""

    # Port on which the connection is TLS-wrapped from the start (SMTP_SSL);
    # any other port connects in plain SMTP and is upgraded with STARTTLS.
    _IMPLICIT_TLS_PORT = 465
    # Timeout, in seconds, passed to the SMTP client constructors.
    _TIMEOUT_SECONDS = 20

    def send(
        self,
        *,
        smtp_host: str,
        smtp_port: int,
        smtp_username: str,
        smtp_password: str,
        sender_email: str,
        recipients: list[str],
        subject: str,
        text_body: str,
    ) -> None:
        """Build the message and deliver it through the given SMTP server.

        The message carries two alternatives: ``text_body`` as plain text and
        an HTML rendering of the same text (HTML-escaped, newlines turned into
        ``<br>``).

        Args:
            smtp_host: SMTP server host name.
            smtp_port: SMTP server port; converted with ``int()``. Port 465
                uses ``smtplib.SMTP_SSL``; every other port uses
                ``smtplib.SMTP`` followed by ``starttls``.
            smtp_username: Login name passed to ``server.login``.
            smtp_password: Password passed to ``server.login``.
            sender_email: Address used for the ``From`` header and as the
                envelope sender.
            recipients: Destination addresses. Surrounding whitespace is
                stripped and blank entries are ignored; a bare string is
                treated as a single address.
            subject: Subject line.
            text_body: Plain-text body.

        Raises:
            ValueError: If ``smtp_host``, ``smtp_username``, ``smtp_password``
                or ``sender_email`` is empty, or if ``recipients`` contains no
                non-blank address. ``int(smtp_port)`` may also raise
                ``ValueError``/``TypeError`` for an unusable port value.

        Errors raised by ``smtplib``/``ssl`` while connecting, logging in or
        sending are not caught here and propagate to the caller.
        """
        if not smtp_host:
            raise ValueError("smtp_host 未配置")
        if not smtp_username:
            raise ValueError("smtp_username 未配置")
        if not smtp_password:
            raise ValueError("smtp_password 未配置")
        if not sender_email:
            raise ValueError("sender_email 未配置")

        # Validate the *usable* addresses, not just the container: [""] (e.g.
        # the result of "".split(",")) is truthy but holds no recipient.
        to_addrs = self._normalize_recipients(recipients)
        if not to_addrs:
            raise ValueError("recipients 未配置")

        message = self._build_message(sender_email, to_addrs, subject, text_body)
        # Serialize before connecting so a message that cannot be serialized
        # fails without opening a connection.
        payload = message.as_bytes()

        context = ssl.create_default_context()
        port = int(smtp_port)
        timeout = self._TIMEOUT_SECONDS

        if port == self._IMPLICIT_TLS_PORT:
            with smtplib.SMTP_SSL(smtp_host, port, timeout=timeout, context=context) as server:
                self._login_and_send(
                    server, smtp_username, smtp_password, sender_email, to_addrs, payload
                )
            return

        with smtplib.SMTP(smtp_host, port, timeout=timeout) as server:
            server.ehlo()
            server.starttls(context=context)
            # Second EHLO after the TLS upgrade, before logging in.
            server.ehlo()
            self._login_and_send(
                server, smtp_username, smtp_password, sender_email, to_addrs, payload
            )

    @staticmethod
    def _normalize_recipients(recipients: list[str]) -> list[str]:
        """Return the non-blank addresses from *recipients*, whitespace-stripped."""
        if not recipients:
            return []
        if isinstance(recipients, str):
            # A bare string would otherwise be joined character by character
            # into the To header.
            recipients = [recipients]
        return [addr.strip() for addr in recipients if addr and addr.strip()]

    @staticmethod
    def _build_message(
        sender_email: str,
        to_addrs: list[str],
        subject: str,
        text_body: str,
    ) -> MIMEMultipart:
        """Assemble the multipart/alternative message with plain and HTML parts."""
        message = MIMEMultipart("alternative")
        message["From"] = sender_email
        message["To"] = ", ".join(to_addrs)
        # Unchanged from the original: the Header is converted back to str
        # before being stored on the message.
        message["Subject"] = str(Header(subject, "utf-8"))

        # escape() neutralizes HTML metacharacters in the body; chr(10) is "\n".
        html_body = (
            "<html><head><meta charset=\"utf-8\"></head>"
            "<body style=\"font-family:Microsoft YaHei, PingFang SC, Arial, sans-serif; line-height:1.7; font-size:14px; color:#111;\">"
            f"{escape(text_body).replace(chr(10), '<br>')}"
            "</body></html>"
        )
        # Plain part is attached first, HTML part second.
        message.attach(MIMEText(text_body, "plain", "utf-8"))
        message.attach(MIMEText(html_body, "html", "utf-8"))
        return message

    @staticmethod
    def _login_and_send(
        server: smtplib.SMTP,
        smtp_username: str,
        smtp_password: str,
        sender_email: str,
        to_addrs: list[str],
        payload: bytes,
    ) -> None:
        """Authenticate on an already-connected *server* and send *payload*."""
        server.login(smtp_username, smtp_password)
        server.sendmail(sender_email, to_addrs, payload)
|
||
|
|
|
||
|
|
|
||
|
|
# Module-level EmailNotificationService instance, created when this module is
# imported. Presumably the shared object other modules import — verify
# against callers.
email_notification_service = EmailNotificationService()
|