Files
zjjk/backend/app/repositories/mysql_repository.py

96 lines
3.5 KiB
Python
Raw Permalink Normal View History

2026-03-20 21:47:30 +08:00
import json
from datetime import datetime
from typing import Any
import pymysql
class MySQLRepository:
def __init__(self, config: dict[str, Any]) -> None:
self.config = config
self._ensure_schema()
def _connect(self):
return pymysql.connect(
host=self.config["mysql_host"],
port=int(self.config.get("mysql_port", 3306)),
user=self.config["mysql_username"],
password=self.config["mysql_password"],
database=self.config["mysql_database"],
charset=self.config.get("mysql_charset", "utf8mb4"),
autocommit=True,
cursorclass=pymysql.cursors.DictCursor,
)
def _ensure_schema(self) -> None:
statements = [
"""
CREATE TABLE IF NOT EXISTS app_documents (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
category VARCHAR(64) NOT NULL,
doc_key VARCHAR(128) NOT NULL,
sort_value VARCHAR(64) DEFAULT NULL,
payload LONGTEXT NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
UNIQUE KEY uniq_category_key (category, doc_key),
KEY idx_category_sort (category, sort_value)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""
]
with self._connect() as connection:
with connection.cursor() as cursor:
for statement in statements:
cursor.execute(statement)
def read_document(self, category: str, doc_key: str, default: dict[str, Any] | None = None) -> dict[str, Any]:
sql = "SELECT payload FROM app_documents WHERE category=%s AND doc_key=%s LIMIT 1"
with self._connect() as connection:
with connection.cursor() as cursor:
cursor.execute(sql, (category, doc_key))
row = cursor.fetchone()
if not row:
return default or {}
return json.loads(row["payload"])
def write_document(
self,
category: str,
doc_key: str,
payload: dict[str, Any],
*,
sort_value: str | None = None,
) -> None:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
sql = """
INSERT INTO app_documents (category, doc_key, sort_value, payload, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
sort_value=VALUES(sort_value),
payload=VALUES(payload),
updated_at=VALUES(updated_at)
"""
serialized = json.dumps(payload, ensure_ascii=False)
with self._connect() as connection:
with connection.cursor() as cursor:
cursor.execute(sql, (category, doc_key, sort_value, serialized, now, now))
def list_documents(
self,
category: str,
*,
limit: int | None = None,
descending: bool = True,
) -> list[dict[str, Any]]:
direction = "DESC" if descending else "ASC"
sql = f"SELECT payload FROM app_documents WHERE category=%s ORDER BY sort_value {direction}, updated_at {direction}"
params: list[Any] = [category]
if limit is not None:
sql += " LIMIT %s"
params.append(limit)
with self._connect() as connection:
with connection.cursor() as cursor:
cursor.execute(sql, params)
rows = cursor.fetchall()
return [json.loads(row["payload"]) for row in rows]