zhoujiaangyao committed on
Commit
aa08cd6
·
1 Parent(s): b98be44

feat(db): 配置与笔记迁入 Postgres,重启不丢

Browse files

同步后端迁移:app_config / notes 两张表 + DAO,配置与笔记从本地文件改为入库。
启动 init_db 自动建表,HF 重启不再丢飞书凭证/cookie/笔记。

.gitignore CHANGED
@@ -10,3 +10,4 @@ backend/app/db/*.db
10
  __pycache__/
11
  *.pyc
12
  .env
 
 
10
  __pycache__/
11
  *.pyc
12
  .env
13
+ backend/fonts/
backend/app/db/app_config_dao.py ADDED
@@ -0,0 +1,68 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """app_config 键值配置表的读写。
2
+
3
+ 取代原先各 *_config_manager 的 config/*.json 文件存储。各 manager 的公开方法
4
+ (get_config/update_config/get/set/list_all 等)签名与行为不变,只是底层 _read/_write
5
+ 改调这里,从而让配置随 Postgres 持久化、HF 容器重启不丢。
6
+
7
+ 兼容旧库:首次读某个 key 时若数据库里还没有、但旧的 config/<x>.json 文件存在,
8
+ 就把文件内容一次性导入数据库后返回(幂等)。这样桌面端 / 本地 SQLite 的既有配置不丢。
9
+ """
10
+ import json
11
+ from pathlib import Path
12
+ from typing import Any, Optional
13
+
14
+ from app.db.engine import SessionLocal
15
+ from app.db.models.app_config import AppConfig
16
+
17
+
18
def get_value(key: str) -> Optional[Any]:
    """Return the whole stored value (dict or list) of one config domain.

    Best-effort by design: any failure while talking to the database is
    reported as "no configuration" (``None``) instead of being raised. The
    original note on this function gives the reason: routers may be imported
    before ``init_db`` has created the tables, and some downloaders read their
    cookie config at import time, so raising here would break the startup
    import chain. This mirrors the old file-based managers, which returned
    ``{}`` when the file was missing or unreadable.

    Args:
        key: Config domain name, i.e. the primary key of the ``app_config`` row.

    Returns:
        The stored value, or ``None`` when the row does not exist or the
        database could not be read.
    """
    try:
        with SessionLocal() as session:
            record = session.get(AppConfig, key)
            if record is None:
                return None
            return record.value
    except Exception:
        # Deliberate catch-all: callers cannot tell "missing" from "failed".
        return None
31
+
32
+
33
def set_value(key: str, value: Any) -> None:
    """Replace the whole value of one config domain (insert or update).

    Args:
        key: Config domain name, i.e. the primary key of the ``app_config`` row.
        value: The complete new value for the domain; it overwrites whatever
            was stored before rather than being merged into it.

    Unlike ``get_value`` this does not swallow database errors; they
    propagate to the caller.
    """
    with SessionLocal() as session:
        existing = session.get(AppConfig, key)
        if existing is not None:
            existing.value = value
        else:
            session.add(AppConfig(key=key, value=value))
        session.commit()
42
+
43
+
44
def _read_legacy_file(legacy_path: str) -> Optional[Any]:
    """Parse a legacy ``config/*.json`` file, tolerating absence and corruption.

    Args:
        legacy_path: Path of the old JSON config file.

    Returns:
        The decoded JSON document, or ``None`` when the file does not exist,
        cannot be read, or does not contain valid JSON.
    """
    source = Path(legacy_path)
    if source.exists():
        try:
            raw_text = source.read_text(encoding="utf-8")
            return json.loads(raw_text)
        except Exception:
            # Unreadable or malformed legacy file is treated as "no file".
            return None
    return None
52
+
53
+
54
def load_value(key: str, legacy_path: Optional[str] = None, default: Any = None) -> Any:
    """Read one config domain, importing the legacy JSON file on first use.

    Lookup order:

    1. The ``app_config`` row for ``key``, returned when it holds a non-null
       value.
    2. The legacy JSON file at ``legacy_path`` (when given and parseable). Its
       content is returned and, if the database read succeeded, also stored
       under ``key`` so later reads are served from the database.
    3. ``default``.

    The database is queried here directly instead of through ``get_value``
    because ``get_value`` reports "row missing" and "database read failed"
    identically (``None``). Treating a failed read as "missing" would let a
    transient database error trigger a legacy import that overwrites a newer
    stored value with stale file content. The import is therefore attempted
    only when the read succeeded and found nothing.

    Args:
        key: Config domain name (primary key of the ``app_config`` row).
        legacy_path: Optional path of the old JSON file to fall back to.
        default: Value returned when neither the database nor the legacy file
            provides one.

    Returns:
        The stored value, the legacy file content, or ``default``.
    """
    db_readable = True
    stored = None
    try:
        with SessionLocal() as db:
            row = db.get(AppConfig, key)
            if row is not None:
                stored = row.value
    except Exception:
        # Table not created yet (import before init_db) or database
        # unreachable: behave as "no stored config", but remember that the
        # database state is unknown so nothing gets written below.
        db_readable = False

    if stored is not None:
        return stored
    if not legacy_path:
        return default

    legacy = _read_legacy_file(legacy_path)
    if legacy is None:
        return default

    if db_readable:
        try:
            set_value(key, legacy)
        except Exception:
            # Best effort: still hand back the file content; a later read
            # retries the import once the database accepts writes.
            pass
    return legacy
backend/app/db/init_db.py CHANGED
@@ -1,5 +1,11 @@
 
 
 
 
 
1
  from app.db.models.articles import ArticleItem, ArticleSubscription, ArticleSubscriptionItem
2
  from app.db.models.models import Model
 
3
  from app.db.models.providers import Provider
4
  from app.db.models.trend_subscription import (
5
  NotificationChannel,
@@ -16,6 +22,7 @@ def init_db():
16
  Base.metadata.create_all(bind=engine)
17
  _ensure_article_content_text(engine)
18
  _ensure_model_provider_id_text(engine)
 
19
 
20
 
21
  # 注:原 _ensure_model_columns 为 models.supports_multimodal 做的迁移已删除——
@@ -46,6 +53,46 @@ def _ensure_model_provider_id_text(engine):
46
  break
47
 
48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
  def _ensure_article_content_text(engine):
50
  inspector = inspect(engine)
51
  if "article_items" not in inspector.get_table_names():
 
1
+ import glob
2
+ import json
3
+ import os
4
+
5
+ from app.db.models.app_config import AppConfig
6
  from app.db.models.articles import ArticleItem, ArticleSubscription, ArticleSubscriptionItem
7
  from app.db.models.models import Model
8
+ from app.db.models.note import Note
9
  from app.db.models.providers import Provider
10
  from app.db.models.trend_subscription import (
11
  NotificationChannel,
 
22
  Base.metadata.create_all(bind=engine)
23
  _ensure_article_content_text(engine)
24
  _ensure_model_provider_id_text(engine)
25
+ _import_legacy_notes(engine)
26
 
27
 
28
  # 注:原 _ensure_model_columns 为 models.supports_multimodal 做的迁移已删除——
 
53
  break
54
 
55
 
56
def _import_legacy_notes(engine):
    """One-off import of legacy note files into the ``notes`` table.

    Migrates ``{NOTE_OUTPUT_DIR}/{task_id}.json`` (plus the optional
    ``{task_id}.status.json``) into the database. The import runs only while
    the ``notes`` table is empty, so repeated startups do not re-import. It is
    a no-op when the note directory does not exist.

    Failures never abort startup: an unreadable file or a note that cannot be
    saved is skipped and the loop continues. Because the "table is empty"
    guard is the only idempotence check, notes skipped in a partially
    successful run are not retried on the next start.

    Args:
        engine: SQLAlchemy engine used for the emptiness check.
    """
    note_dir = os.getenv("NOTE_OUTPUT_DIR", "note_results")
    if not os.path.isdir(note_dir):
        return

    # Function-scope import, as in the original; deferred until there is
    # actually a directory to import from.
    from app.db.note_dao import save_note, set_status

    try:
        with engine.connect() as conn:
            count = conn.execute(text("SELECT COUNT(*) FROM notes")).scalar()
        if count and int(count) > 0:
            return
    except Exception:
        # Table missing or database unreachable: nothing can be imported now.
        return

    imported = 0
    for path in glob.glob(os.path.join(note_dir, "*.json")):
        stem = os.path.basename(path)[:-5]  # strip the ".json" suffix
        # Skip caches / status / checkpoints (transcript and audio caches,
        # {id}.status, {key}.gpt.checkpoint, ...). Real note task ids are
        # UUIDs without dots, so any dotted stem is a derived file.
        if stem.endswith(("_transcript", "_audio")) or "." in stem:
            continue
        try:
            with open(path, "r", encoding="utf-8") as fh:
                content = json.load(fh)
        except Exception:
            continue
        try:
            save_note(stem, content)
        except Exception as exc:
            # One bad note must not take the whole startup down.
            print(f"[init_db] failed to import legacy note {stem}: {exc}")
            continue
        status_path = os.path.join(note_dir, f"{stem}.status.json")
        if os.path.exists(status_path):
            try:
                with open(status_path, "r", encoding="utf-8") as fh:
                    set_status(stem, json.load(fh))
            except Exception:
                # Status is optional; the note body is already stored.
                pass
        imported += 1
    if imported:
        print(f"[init_db] 已从本地文件导入 {imported} 篇历史笔记到数据库")
94
+
95
+
96
  def _ensure_article_content_text(engine):
97
  inspector = inspect(engine)
98
  if "article_items" not in inspector.get_table_names():
backend/app/db/models/app_config.py ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy import Column, String, JSON, DateTime, func
2
+
3
+ from app.db.engine import Base
4
+
5
+
6
class AppConfig(Base):
    """Generic key/value configuration table.

    Moves the configuration that used to be scattered across ``config/*.json``
    files into the database, so Feishu credentials, platform cookies, proxy
    settings, transcriber settings and custom platforms survive a restart.

    - key: config domain name (feishu / proxy / downloader / transcriber /
      custom_platforms).
    - value: the whole config document of that domain, any JSON (dict or
      list). Stored with SQLAlchemy's generic ``JSON`` type, which each
      dialect renders in its own way.
      NOTE(review): the original comment said this becomes JSONB on Postgres;
      the generic ``JSON`` type renders as ``JSON`` there. JSONB would need
      ``sqlalchemy.dialects.postgresql.JSONB`` — confirm which is intended.
    """

    __tablename__ = "app_config"

    # Config domain name; one row per domain.
    key = Column(String, primary_key=True)
    # Whole config document of the domain; nullable.
    value = Column(JSON, nullable=True)
    # Defaults to the database's NOW() on insert and is set to NOW() again
    # when SQLAlchemy emits an UPDATE for the row.
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
backend/app/db/models/note.py ADDED
@@ -0,0 +1,24 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy import Column, String, JSON, DateTime, func
2
+
3
+ from app.db.engine import Base
4
+
5
+
6
class Note(Base):
    """Note body and task status, one row per task.

    Replaces the two local files ``note_results/{task_id}.json`` and
    ``{task_id}.status.json`` so that notes survive a restart.

    - content: the whole note result (markdown version array / transcript /
      audio_meta, etc.), formerly ``{task_id}.json``.
    - status: the task status dict ``{status, paused, message?, cache?}``,
      formerly ``{task_id}.status.json``.

    The two columns are upserted independently and do not overwrite each
    other (the state machine writes ``status`` first and ``content`` only once
    generation has finished). Rebuildable data such as transcript/audio
    caches, ``_markdown.md`` and screenshots stays on disk and is not stored
    here.
    """

    __tablename__ = "notes"

    # Task id of the note; primary key.
    task_id = Column(String, primary_key=True)
    # Whole note result document; null until the note has been generated.
    content = Column(JSON, nullable=True)
    # Task status dict; null when no status has been recorded.
    status = Column(JSON, nullable=True)
    # Defaults to the database's NOW() on insert.
    created_at = Column(DateTime, server_default=func.now())
    # Defaults to NOW() on insert and is set to NOW() again when SQLAlchemy
    # emits an UPDATE for the row.
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
backend/app/db/note_dao.py ADDED
@@ -0,0 +1,53 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """notes 表的读写:笔记正文(content)与任务状态(status)。
2
+
3
+ 取代原先 note_results/{task_id}.json 与 {task_id}.status.json 两个本地文件。
4
+ content 与 status 各自独立 upsert,互不覆盖。
5
+ """
6
+ from typing import Any, Dict, Optional
7
+
8
+ from app.db.engine import SessionLocal
9
+ from app.db.models.note import Note
10
+
11
+
12
def _upsert(task_id: str, **fields) -> None:
    """Create the ``notes`` row for ``task_id`` if needed, then set ``fields`` on it.

    Only the attributes named in ``fields`` are assigned, which is what lets
    ``content`` and ``status`` be written independently without clobbering
    each other.

    Args:
        task_id: Primary key of the note row.
        **fields: Attribute name -> new value pairs to assign on the row.
    """
    with SessionLocal() as session:
        existing = session.get(Note, task_id)
        note = Note(task_id=task_id) if existing is None else existing
        if existing is None:
            session.add(note)
        for attr_name, new_value in fields.items():
            setattr(note, attr_name, new_value)
        session.commit()
21
+
22
+
23
def save_note(task_id: str, content: Dict[str, Any]) -> None:
    """Write the note body for ``task_id``, leaving ``status`` untouched.

    Args:
        task_id: Primary key of the note row; the row is created if missing.
        content: Whole note result document to store in the ``content`` column.
    """
    _upsert(task_id, content=content)
26
+
27
+
28
def load_note(task_id: str) -> Optional[Dict[str, Any]]:
    """Return the stored note body for ``task_id``.

    Returns:
        The ``content`` value, or ``None`` when the row does not exist or its
        ``content`` is null (e.g. only a status has been written so far).
    """
    with SessionLocal() as session:
        note = session.get(Note, task_id)
        if note is None:
            return None
        # A null column already reads back as None, so no extra check needed.
        return note.content
33
+
34
+
35
def set_status(task_id: str, status_data: Dict[str, Any]) -> None:
    """Write the task status dict for ``task_id``, leaving ``content`` untouched.

    Args:
        task_id: Primary key of the note row; the row is created if missing.
        status_data: Status dict of the form
            ``{status, paused, message?, cache?}``.
    """
    _upsert(task_id, status=status_data)
38
+
39
+
40
def get_status(task_id: str) -> Optional[Dict[str, Any]]:
    """Return the stored task status dict for ``task_id``.

    Returns:
        The ``status`` value, or ``None`` when the row does not exist or no
        status has been recorded for it.
    """
    with SessionLocal() as session:
        note = session.get(Note, task_id)
        if note is None:
            return None
        # A null column already reads back as None, so no extra check needed.
        return note.status
45
+
46
+
47
def delete_note(task_id: str) -> None:
    """Delete the note row (body and status together); no-op when it is absent.

    Args:
        task_id: Primary key of the note row to remove.
    """
    with SessionLocal() as session:
        note = session.get(Note, task_id)
        if note is None:
            # Nothing stored for this task; nothing to commit either.
            return
        session.delete(note)
        session.commit()
backend/app/routers/feishu.py CHANGED
@@ -46,19 +46,16 @@ def push_task_to_feishu(task_id: str, version_id: Optional[str] = None) -> dict:
46
  被「手动推送」接口与「生成后自动推送」共用。任何失败都抛 FeishuError,
47
  由调用方决定是返回错误还是仅记日志(自动推送场景不应中断主流程)。
48
 
49
- 直接读写原始 JSON(不走 _read/_write_note_json 的版本归一化),避免把
50
  markdown 字符串就地改写成版本数组、抹掉单版本笔记的 model/style 元信息。
51
  """
52
- import json
53
-
54
  # 延迟导入避免与 note 路由的循环依赖;note 路由不在模块级 import 本模块
55
- from app.routers.note import NOTE_OUTPUT_DIR, _pick_markdown_version
 
56
 
57
- path = os.path.join(NOTE_OUTPUT_DIR, f"{task_id}.json")
58
- if not os.path.exists(path):
59
  raise FeishuError(f"笔记不存在或尚未生成完成:{task_id}")
60
- with open(path, "r", encoding="utf-8") as f:
61
- data = json.load(f)
62
 
63
  # _pick_markdown_version 兼容旧 str 与多版本 list 两种格式,仅读取不改写
64
  content = _pick_markdown_version(data.get("markdown"), version_id)
@@ -82,8 +79,7 @@ def push_task_to_feishu(task_id: str, version_id: Optional[str] = None) -> dict:
82
  "pushed_at": datetime.now().isoformat(timespec="seconds"),
83
  }
84
  data["feishu"] = feishu_info
85
- with open(path, "w", encoding="utf-8") as f:
86
- json.dump(data, f, ensure_ascii=False, indent=2)
87
  return feishu_info
88
 
89
 
 
46
  被「手动推送」接口与「生成后自动推送」共用。任何失败都抛 FeishuError,
47
  由调用方决定是返回错误还是仅记日志(自动推送场景不应中断主流程)。
48
 
49
+ 直接读写原始笔记内容(不走 _read/_write_note_json 的版本归一化),避免把
50
  markdown 字符串就地改写成版本数组、抹掉单版本笔记的 model/style 元信息。
51
  """
 
 
52
  # 延迟导入避免与 note 路由的循环依赖;note 路由不在模块级 import 本模块
53
+ from app.routers.note import _pick_markdown_version
54
+ from app.db.note_dao import load_note, save_note
55
 
56
+ data = load_note(task_id)
57
+ if data is None:
58
  raise FeishuError(f"笔记不存在或尚未生成完成:{task_id}")
 
 
59
 
60
  # _pick_markdown_version 兼容旧 str 与多版本 list 两种格式,仅读取不改写
61
  content = _pick_markdown_version(data.get("markdown"), version_id)
 
79
  "pushed_at": datetime.now().isoformat(timespec="seconds"),
80
  }
81
  data["feishu"] = feishu_info
82
+ save_note(task_id, data)
 
83
  return feishu_info
84
 
85
 
backend/app/routers/note.py CHANGED
@@ -10,6 +10,7 @@ from fastapi import APIRouter, HTTPException, BackgroundTasks, UploadFile, File
10
  from pydantic import BaseModel, validator, field_validator
11
  from dataclasses import asdict
12
 
 
13
  from app.db.video_task_dao import get_task_by_video
14
  from app.enmus.exception import NoteErrorEnum
15
  from app.enmus.note_enums import DownloadQuality
@@ -74,9 +75,8 @@ UPLOAD_DIR = "uploads"
74
 
75
 
76
  def save_note_to_file(task_id: str, note):
77
- os.makedirs(NOTE_OUTPUT_DIR, exist_ok=True)
78
- with open(os.path.join(NOTE_OUTPUT_DIR, f"{task_id}.json"), "w", encoding="utf-8") as f:
79
- json.dump(asdict(note), f, ensure_ascii=False, indent=2)
80
 
81
 
82
  def _persist_prefetched_transcript(task_id: str, transcript: dict) -> None:
@@ -227,22 +227,17 @@ def _pick_markdown_version(markdown_field, version_id: Optional[str]) -> str:
227
 
228
 
229
  def _read_note_json(task_id: str) -> dict:
230
- """读笔记 JSON 文件,把 markdown 字段就地归一化成版本数组。文件不存在抛 HTTPException。"""
231
- path = os.path.join(NOTE_OUTPUT_DIR, f"{task_id}.json")
232
- if not os.path.exists(path):
233
  raise HTTPException(status_code=404, detail="笔记不存在")
234
- with open(path, "r", encoding="utf-8") as f:
235
- data = json.load(f)
236
  data["markdown"] = _normalize_versions(data.get("markdown"))
237
  return data
238
 
239
 
240
  def _write_note_json(task_id: str, data: dict) -> None:
241
- """写回笔记 JSON 文件。markdown 一定是 list 形式。"""
242
- path = os.path.join(NOTE_OUTPUT_DIR, f"{task_id}.json")
243
- os.makedirs(NOTE_OUTPUT_DIR, exist_ok=True)
244
- with open(path, "w", encoding="utf-8") as f:
245
- json.dump(data, f, ensure_ascii=False, indent=2)
246
 
247
 
248
  def _append_version(
@@ -284,13 +279,10 @@ def export_note(task_id: str, format: str = "markdown", version_id: Optional[str
284
  - format: markdown / pdf / html / word / docx(image / png 暂不支持)
285
  - version_id: 多版本时指定某一版;不传取最新版(v1 单版兼容旧 str 格式)
286
  """
287
- note_path = os.path.join(NOTE_OUTPUT_DIR, f"{task_id}.json")
288
- if not os.path.exists(note_path):
289
  raise HTTPException(status_code=404, detail="笔记不存在")
290
 
291
- with open(note_path, "r", encoding="utf-8") as f:
292
- note = json.load(f)
293
-
294
  content = _pick_markdown_version(note.get("markdown"), version_id)
295
  if not content.strip():
296
  raise HTTPException(status_code=404, detail="笔记内容为空")
@@ -516,14 +508,12 @@ def generate_note(data: VideoRequest, background_tasks: BackgroundTasks):
516
 
517
  @router.get("/task_status/{task_id}")
518
  def get_task_status(task_id: str):
519
- status_path = os.path.join(NOTE_OUTPUT_DIR, f"{task_id}.status.json")
520
- result_path = os.path.join(NOTE_OUTPUT_DIR, f"{task_id}.json")
521
-
522
- # 优先读状态文件
523
- if os.path.exists(status_path):
524
- with open(status_path, "r", encoding="utf-8") as f:
525
- status_content = json.load(f)
526
 
 
 
527
  status = status_content.get("status")
528
  message = status_content.get("message", "")
529
  paused = bool(status_content.get("paused", False))
@@ -531,9 +521,7 @@ def get_task_status(task_id: str):
531
 
532
  if status == TaskStatus.SUCCESS.value:
533
  # 成功状态的话,继续读取最终笔记内容
534
- if os.path.exists(result_path):
535
- with open(result_path, "r", encoding="utf-8") as rf:
536
- result_content = json.load(rf)
537
  return R.success({
538
  "status": status,
539
  "result": result_content,
@@ -562,10 +550,8 @@ def get_task_status(task_id: str):
562
  "task_id": task_id
563
  })
564
 
565
- # 没有状态文件,但有结果
566
- if os.path.exists(result_path):
567
- with open(result_path, "r", encoding="utf-8") as f:
568
- result_content = json.load(f)
569
  return R.success({
570
  "status": TaskStatus.SUCCESS.value,
571
  "result": result_content,
 
10
  from pydantic import BaseModel, validator, field_validator
11
  from dataclasses import asdict
12
 
13
+ from app.db.note_dao import save_note, load_note, get_status
14
  from app.db.video_task_dao import get_task_by_video
15
  from app.enmus.exception import NoteErrorEnum
16
  from app.enmus.note_enums import DownloadQuality
 
75
 
76
 
77
  def save_note_to_file(task_id: str, note):
78
+ # 笔记正文持久化到数据库 notes 表(原先写 note_results/{task_id}.json)。
79
+ save_note(task_id, asdict(note))
 
80
 
81
 
82
  def _persist_prefetched_transcript(task_id: str, transcript: dict) -> None:
 
227
 
228
 
229
  def _read_note_json(task_id: str) -> dict:
230
+ """读笔记,把 markdown 字段就地归一化成版本数组。不存在抛 HTTPException。"""
231
+ data = load_note(task_id)
232
+ if data is None:
233
  raise HTTPException(status_code=404, detail="笔记不存在")
 
 
234
  data["markdown"] = _normalize_versions(data.get("markdown"))
235
  return data
236
 
237
 
238
  def _write_note_json(task_id: str, data: dict) -> None:
239
+ """写回笔记到数据库 notes 。markdown 一定是 list 形式。"""
240
+ save_note(task_id, data)
 
 
 
241
 
242
 
243
  def _append_version(
 
279
  - format: markdown / pdf / html / word / docx(image / png 暂不支持)
280
  - version_id: 多版本时指定某一版;不传取最新版(v1 单版兼容旧 str 格式)
281
  """
282
+ note = load_note(task_id)
283
+ if note is None:
284
  raise HTTPException(status_code=404, detail="笔记不存在")
285
 
 
 
 
286
  content = _pick_markdown_version(note.get("markdown"), version_id)
287
  if not content.strip():
288
  raise HTTPException(status_code=404, detail="笔记内容为空")
 
508
 
509
  @router.get("/task_status/{task_id}")
510
  def get_task_status(task_id: str):
511
+ # 状态与结果都从数据库 notes 表读(原先读 {task_id}.status.json / {task_id}.json)
512
+ status_content = get_status(task_id)
513
+ result_content = load_note(task_id)
 
 
 
 
514
 
515
+ # 优先读状态
516
+ if status_content:
517
  status = status_content.get("status")
518
  message = status_content.get("message", "")
519
  paused = bool(status_content.get("paused", False))
 
521
 
522
  if status == TaskStatus.SUCCESS.value:
523
  # 成功状态的话,继续读取最终笔记内容
524
+ if result_content is not None:
 
 
525
  return R.success({
526
  "status": status,
527
  "result": result_content,
 
550
  "task_id": task_id
551
  })
552
 
553
+ # 没有状态,但有结果
554
+ if result_content is not None:
 
 
555
  return R.success({
556
  "status": TaskStatus.SUCCESS.value,
557
  "result": result_content,
backend/app/services/article.py CHANGED
@@ -10,6 +10,7 @@ from app.article_fetchers.base import ArticleContent, ArticleFetcher
10
  from app.article_fetchers.generic import GenericArticleFetcher
11
  from app.article_fetchers.wechat import WechatArticleFetcher
12
  from app.article_fetchers.xiaohongshu import XiaohongshuArticleFetcher
 
13
  from app.db.article_dao import (
14
  create_subscription,
15
  get_article_item,
@@ -284,12 +285,8 @@ class ArticleService:
284
  def _content_from_note_result(self, task_id: str) -> str:
285
  if not task_id:
286
  return ""
287
- result_path = _note_output_dir() / f"{task_id}.json"
288
- if not result_path.exists():
289
- return ""
290
- try:
291
- payload = json.loads(result_path.read_text(encoding="utf-8"))
292
- except Exception:
293
  return ""
294
  transcript = payload.get("transcript") or {}
295
  return str(transcript.get("full_text") or "").strip()
@@ -366,17 +363,10 @@ class ArticleService:
366
  },
367
  "total_tokens": total_tokens,
368
  }
369
- (_note_output_dir() / f"{task_id}.json").write_text(
370
- json.dumps(payload, ensure_ascii=False, indent=2),
371
- encoding="utf-8",
372
- )
373
 
374
  def _update_status(self, task_id: str, status: TaskStatus) -> None:
375
- payload = {"status": status.value, "paused": False}
376
- (_note_output_dir() / f"{task_id}.status.json").write_text(
377
- json.dumps(payload, ensure_ascii=False, indent=2),
378
- encoding="utf-8",
379
- )
380
 
381
  def _index_task(self, task_id: str) -> None:
382
  try:
 
10
  from app.article_fetchers.generic import GenericArticleFetcher
11
  from app.article_fetchers.wechat import WechatArticleFetcher
12
  from app.article_fetchers.xiaohongshu import XiaohongshuArticleFetcher
13
+ from app.db.note_dao import load_note, save_note, set_status
14
  from app.db.article_dao import (
15
  create_subscription,
16
  get_article_item,
 
285
  def _content_from_note_result(self, task_id: str) -> str:
286
  if not task_id:
287
  return ""
288
+ payload = load_note(task_id)
289
+ if not payload:
 
 
 
 
290
  return ""
291
  transcript = payload.get("transcript") or {}
292
  return str(transcript.get("full_text") or "").strip()
 
363
  },
364
  "total_tokens": total_tokens,
365
  }
366
+ save_note(task_id, payload)
 
 
 
367
 
368
  def _update_status(self, task_id: str, status: TaskStatus) -> None:
369
+ set_status(task_id, {"status": status.value, "paused": False})
 
 
 
 
370
 
371
  def _index_task(self, task_id: str) -> None:
372
  try:
backend/app/services/chat_service.py CHANGED
@@ -8,6 +8,7 @@ from app.models.model_config import ModelConfig
8
  from app.services.provider import ProviderService
9
  from app.services.vector_store import VectorStoreManager, NOTE_OUTPUT_DIR
10
  from app.services.chat_tools import TOOLS, execute_tool
 
11
  from app.utils.logger import get_logger
12
 
13
  logger = get_logger(__name__)
@@ -15,13 +16,8 @@ logger = get_logger(__name__)
15
 
16
  def _load_task_brief(task_id: str) -> dict:
17
  """读出某篇笔记的标题/平台/URL,用于源卡片展示。失败返回空 dict。"""
18
- path = os.path.join(NOTE_OUTPUT_DIR, f"{task_id}.json")
19
- if not os.path.exists(path):
20
- return {}
21
- try:
22
- with open(path, "r", encoding="utf-8") as f:
23
- data = json.load(f)
24
- except Exception:
25
  return {}
26
  am = data.get("audio_meta", {}) or {}
27
  raw = am.get("raw_info", {}) or {}
 
8
  from app.services.provider import ProviderService
9
  from app.services.vector_store import VectorStoreManager, NOTE_OUTPUT_DIR
10
  from app.services.chat_tools import TOOLS, execute_tool
11
+ from app.db.note_dao import load_note
12
  from app.utils.logger import get_logger
13
 
14
  logger = get_logger(__name__)
 
16
 
17
  def _load_task_brief(task_id: str) -> dict:
18
  """读出某篇笔记的标题/平台/URL,用于源卡片展示。失败返回空 dict。"""
19
+ data = load_note(task_id)
20
+ if not data:
 
 
 
 
 
21
  return {}
22
  am = data.get("audio_meta", {}) or {}
23
  raw = am.get("raw_info", {}) or {}
backend/app/services/chat_tools.py CHANGED
@@ -15,11 +15,8 @@ NOTE_OUTPUT_DIR = os.getenv("NOTE_OUTPUT_DIR", "note_results")
15
 
16
 
17
  def _load_note_data(task_id: str) -> Optional[dict]:
18
- path = os.path.join(NOTE_OUTPUT_DIR, f"{task_id}.json")
19
- if not os.path.exists(path):
20
- return None
21
- with open(path, "r", encoding="utf-8") as f:
22
- return json.load(f)
23
 
24
 
25
  # ── 工具定义(OpenAI function calling 格式)──────────────────────
 
15
 
16
 
17
  def _load_note_data(task_id: str) -> Optional[dict]:
18
+ from app.db.note_dao import load_note
19
+ return load_note(task_id)
 
 
 
20
 
21
 
22
  # ── 工具定义(OpenAI function calling 格式)──────────────────────
backend/app/services/cookie_manager.py CHANGED
@@ -1,25 +1,21 @@
1
- import json
2
- from pathlib import Path
3
  from typing import Optional, Dict
4
 
 
 
5
 
6
  class CookieConfigManager:
 
 
 
 
7
  def __init__(self, filepath: str = "config/downloader.json"):
8
- self.path = Path(filepath)
9
- self.path.parent.mkdir(parents=True, exist_ok=True)
10
- if not self.path.exists():
11
- self._write({})
12
 
13
  def _read(self) -> Dict[str, Dict[str, str]]:
14
- try:
15
- with self.path.open("r", encoding="utf-8") as f:
16
- return json.load(f)
17
- except Exception:
18
- return {}
19
 
20
  def _write(self, data: Dict[str, Dict[str, str]]):
21
- with self.path.open("w", encoding="utf-8") as f:
22
- json.dump(data, f, ensure_ascii=False, indent=2)
23
 
24
  def get(self, platform: str) -> Optional[str]:
25
  data = self._read()
 
 
 
1
  from typing import Optional, Dict
2
 
3
+ from app.db.app_config_dao import load_value, set_value
4
+
5
 
6
  class CookieConfigManager:
7
+ # 平台 cookie 持久化在数据库 app_config 表(key="downloader");
8
+ # filepath 仅用于把旧的 config/downloader.json 一次性导入。
9
+ _KEY = "downloader"
10
+
11
  def __init__(self, filepath: str = "config/downloader.json"):
12
+ self._legacy_path = filepath
 
 
 
13
 
14
  def _read(self) -> Dict[str, Dict[str, str]]:
15
+ return load_value(self._KEY, self._legacy_path, {}) or {}
 
 
 
 
16
 
17
  def _write(self, data: Dict[str, Dict[str, str]]):
18
+ set_value(self._KEY, data)
 
19
 
20
  def get(self, platform: str) -> Optional[str]:
21
  data = self._read()
backend/app/services/custom_platform_manager.py CHANGED
@@ -6,34 +6,29 @@
6
  - name: 展示名。
7
  - match: URL 子串匹配。如 "vimeo.com"。命中即视为该平台。
8
  """
9
- import json
10
  import re
11
- from pathlib import Path
12
  from typing import Optional
13
 
 
14
 
15
- _PATH = Path("config/custom_platforms.json")
 
 
 
 
16
  _KEY_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{1,31}$")
17
 
18
 
19
  def _read() -> list[dict]:
20
- if not _PATH.exists():
21
- return []
22
- try:
23
- data = json.loads(_PATH.read_text(encoding="utf-8"))
24
- if isinstance(data, dict):
25
- data = data.get("platforms", [])
26
- return [p for p in (data or []) if isinstance(p, dict) and p.get("key")]
27
- except Exception:
28
- return []
29
 
30
 
31
  def _write(items: list[dict]) -> None:
32
- _PATH.parent.mkdir(parents=True, exist_ok=True)
33
- _PATH.write_text(
34
- json.dumps({"platforms": items}, ensure_ascii=False, indent=2),
35
- encoding="utf-8",
36
- )
37
 
38
 
39
  def list_all() -> list[dict]:
 
6
  - name: 展示名。
7
  - match: URL 子串匹配。如 "vimeo.com"。命中即视为该平台。
8
  """
 
9
  import re
 
10
  from typing import Optional
11
 
12
+ from app.db.app_config_dao import load_value, set_value
13
 
14
+
15
+ # 自定义平台持久化在数据库 app_config 表(key="custom_platforms",value 是列表)。
16
+ # _LEGACY_PATH 仅用于把旧的 config/custom_platforms.json 一次性导入。
17
+ _KEY = "custom_platforms"
18
+ _LEGACY_PATH = "config/custom_platforms.json"
19
  _KEY_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{1,31}$")
20
 
21
 
22
  def _read() -> list[dict]:
23
+ data = load_value(_KEY, _LEGACY_PATH, [])
24
+ # 兼容旧文件 {"platforms": [...]} 的形态(导入后首次读到的就是这个 dict)。
25
+ if isinstance(data, dict):
26
+ data = data.get("platforms", [])
27
+ return [p for p in (data or []) if isinstance(p, dict) and p.get("key")]
 
 
 
 
28
 
29
 
30
  def _write(items: list[dict]) -> None:
31
+ set_value(_KEY, items)
 
 
 
 
32
 
33
 
34
  def list_all() -> list[dict]:
backend/app/services/feishu_config_manager.py CHANGED
@@ -1,8 +1,8 @@
1
- import json
2
  import os
3
- from pathlib import Path
4
  from typing import Any, Dict, Optional
5
 
 
 
6
  # 飞书 / Lark 开放平台默认域名。海外租户用 open.larksuite.com,
7
  # 国内租户用 open.feishu.cn(默认)。用户可在设置页切换。
8
  DEFAULT_FEISHU_BASE_URL = "https://open.feishu.cn"
@@ -16,22 +16,18 @@ class FeishuConfigManager:
16
  返回给前端时通过 get_public_config() 隐去明文。
17
  """
18
 
 
 
 
 
19
  def __init__(self, filepath: str = "config/feishu.json"):
20
- self.path = Path(filepath)
21
- self.path.parent.mkdir(parents=True, exist_ok=True)
22
 
23
  def _read(self) -> Dict[str, Any]:
24
- if not self.path.exists():
25
- return {}
26
- try:
27
- with self.path.open("r", encoding="utf-8") as f:
28
- return json.load(f)
29
- except Exception:
30
- return {}
31
 
32
  def _write(self, data: Dict[str, Any]):
33
- with self.path.open("w", encoding="utf-8") as f:
34
- json.dump(data, f, ensure_ascii=False, indent=2)
35
 
36
  def get_config(self) -> Dict[str, Any]:
37
  """内部使用:含 app_secret 明文。"""
 
 
1
  import os
 
2
  from typing import Any, Dict, Optional
3
 
4
+ from app.db.app_config_dao import load_value, set_value
5
+
6
  # 飞书 / Lark 开放平台默认域名。海外租户用 open.larksuite.com,
7
  # 国内租户用 open.feishu.cn(默认)。用户可在设置页切换。
8
  DEFAULT_FEISHU_BASE_URL = "https://open.feishu.cn"
 
16
  返回给前端时通过 get_public_config() 隐去明文。
17
  """
18
 
19
+ # 配置现在持久化在数据库 app_config 表(key="feishu")。filepath 仅用于把旧的
20
+ # config/feishu.json 一次性导入数据库,保证桌面端/本地库的既有配置不丢。
21
+ _KEY = "feishu"
22
+
23
  def __init__(self, filepath: str = "config/feishu.json"):
24
+ self._legacy_path = filepath
 
25
 
26
  def _read(self) -> Dict[str, Any]:
27
+ return load_value(self._KEY, self._legacy_path, {}) or {}
 
 
 
 
 
 
28
 
29
  def _write(self, data: Dict[str, Any]):
30
+ set_value(self._KEY, data)
 
31
 
32
  def get_config(self) -> Dict[str, Any]:
33
  """内部使用:含 app_secret 明文。"""
backend/app/services/note.py CHANGED
@@ -15,6 +15,7 @@ from app.downloaders.bilibili_downloader import BilibiliDownloader
15
  from app.downloaders.douyin_downloader import DouyinDownloader
16
  from app.downloaders.local_downloader import LocalDownloader
17
  from app.downloaders.youtube_downloader import YoutubeDownloader
 
18
  from app.db.video_task_dao import delete_task_by_video, insert_video_task
19
  from app.enmus.exception import NoteErrorEnum, ProviderErrorEnum
20
  from app.enmus.task_status_enums import TaskStatus
@@ -511,9 +512,7 @@ class NoteGenerator:
511
  if not task_id:
512
  return
513
 
514
- NOTE_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
515
- status_file = NOTE_OUTPUT_DIR / f"{task_id}.status.json"
516
- print(f"写入状态文件: {status_file} 当前状态: {status} paused={paused} cache={cache}")
517
  data = {"status": status.value if isinstance(status, TaskStatus) else status, "paused": paused}
518
  if message:
519
  data["message"] = message
@@ -521,25 +520,9 @@ class NoteGenerator:
521
  data["cache"] = cache
522
 
523
  try:
524
- # First create a temporary file
525
- temp_file = status_file.with_suffix('.tmp')
526
-
527
- # Write to temporary file
528
- with temp_file.open('w', encoding='utf-8') as f:
529
- json.dump(data, f, ensure_ascii=False, indent=2)
530
-
531
- # Atomic rename operation
532
- temp_file.replace(status_file)
533
-
534
- print(f"状态文件写入成功: {status_file}")
535
  except Exception as e:
536
- logger.error(f"写入状态文件失败 (task_id={task_id}):{e}")
537
- # Try to write error to file directly as fallback
538
- try:
539
- with status_file.open('w', encoding='utf-8') as f:
540
- f.write(f"Error writing status: {str(e)}")
541
- except:
542
- logger.error(f"写入错误 {e}")
543
 
544
  def _handle_exception(self, task_id, exc):
545
  logger.error(f"任务异常 (task_id={task_id})", exc_info=True)
@@ -896,13 +879,12 @@ class NoteGenerator:
896
  - 跳过下载、转写、截图替换等重型环节
897
  - 只调 LLM 拿一段新 markdown 文本返回(由路由层负责追加到版本列表)
898
  """
899
- note_path = NOTE_OUTPUT_DIR / f"{task_id}.json"
900
- if not note_path.exists():
901
  raise NoteError(
902
  code=NoteErrorEnum.PLATFORM_NOT_SUPPORTED.code,
903
  message=f"笔记不存在:{task_id}",
904
  )
905
- data = json.loads(note_path.read_text(encoding="utf-8"))
906
 
907
  audio_meta_d = data.get("audio_meta") or {}
908
  transcript_d = data.get("transcript") or {}
 
15
  from app.downloaders.douyin_downloader import DouyinDownloader
16
  from app.downloaders.local_downloader import LocalDownloader
17
  from app.downloaders.youtube_downloader import YoutubeDownloader
18
+ from app.db.note_dao import load_note, set_status
19
  from app.db.video_task_dao import delete_task_by_video, insert_video_task
20
  from app.enmus.exception import NoteErrorEnum, ProviderErrorEnum
21
  from app.enmus.task_status_enums import TaskStatus
 
512
  if not task_id:
513
  return
514
 
515
+ # 任务状态持久化到数据库 notes 表的 status 字段(原先写 {task_id}.status.json)。
 
 
516
  data = {"status": status.value if isinstance(status, TaskStatus) else status, "paused": paused}
517
  if message:
518
  data["message"] = message
 
520
  data["cache"] = cache
521
 
522
  try:
523
+ set_status(task_id, data)
 
 
 
 
 
 
 
 
 
 
524
  except Exception as e:
525
+ logger.error(f"写入任务状态失败 (task_id={task_id}):{e}")
 
 
 
 
 
 
526
 
527
  def _handle_exception(self, task_id, exc):
528
  logger.error(f"任务异常 (task_id={task_id})", exc_info=True)
 
879
  - 跳过下载、转写、截图替换等重型环节
880
  - 只调 LLM 拿一段新 markdown 文本返回(由路由层负责追加到版本列表)
881
  """
882
+ data = load_note(task_id)
883
+ if data is None:
884
  raise NoteError(
885
  code=NoteErrorEnum.PLATFORM_NOT_SUPPORTED.code,
886
  message=f"笔记不存在:{task_id}",
887
  )
 
888
 
889
  audio_meta_d = data.get("audio_meta") or {}
890
  transcript_d = data.get("transcript") or {}
backend/app/services/proxy_config_manager.py CHANGED
@@ -1,8 +1,8 @@
1
- import json
2
  import os
3
- from pathlib import Path
4
  from typing import Any, Dict, Optional
5
 
 
 
6
 
7
  class ProxyConfigManager:
8
  """全局代理配置,存 JSON 文件,支持前端动态修改。
@@ -12,22 +12,17 @@ class ProxyConfigManager:
12
  这样桌面端/web 用户在设置页填,docker/服务器部署用环境变量兜底。
13
  """
14
 
 
 
 
15
  def __init__(self, filepath: str = "config/proxy.json"):
16
- self.path = Path(filepath)
17
- self.path.parent.mkdir(parents=True, exist_ok=True)
18
 
19
  def _read(self) -> Dict[str, Any]:
20
- if not self.path.exists():
21
- return {}
22
- try:
23
- with self.path.open("r", encoding="utf-8") as f:
24
- return json.load(f)
25
- except Exception:
26
- return {}
27
 
28
  def _write(self, data: Dict[str, Any]):
29
- with self.path.open("w", encoding="utf-8") as f:
30
- json.dump(data, f, ensure_ascii=False, indent=2)
31
 
32
  def get_config(self) -> Dict[str, Any]:
33
  data = self._read()
 
 
1
  import os
 
2
  from typing import Any, Dict, Optional
3
 
4
+ from app.db.app_config_dao import load_value, set_value
5
+
6
 
7
  class ProxyConfigManager:
8
  """全局代理配置,存 JSON 文件,支持前端动态修改。
 
12
  这样桌面端/web 用户在设置页填,docker/服务器部署用环境变量兜底。
13
  """
14
 
15
+ # 配置持久化在数据库 app_config 表(key="proxy");filepath 仅用于旧文件一次性导入。
16
+ _KEY = "proxy"
17
+
18
  def __init__(self, filepath: str = "config/proxy.json"):
19
+ self._legacy_path = filepath
 
20
 
21
  def _read(self) -> Dict[str, Any]:
22
+ return load_value(self._KEY, self._legacy_path, {}) or {}
 
 
 
 
 
 
23
 
24
  def _write(self, data: Dict[str, Any]):
25
+ set_value(self._KEY, data)
 
26
 
27
  def get_config(self) -> Dict[str, Any]:
28
  data = self._read()
backend/app/services/transcriber_config_manager.py CHANGED
@@ -1,28 +1,23 @@
1
- import json
2
  import os
3
- from pathlib import Path
4
  from typing import Optional, Dict, Any
5
 
 
 
6
 
7
  class TranscriberConfigManager:
8
- """管理转写器配置,存储JSON 文件中,支持前端动态修改。"""
 
 
 
9
 
10
  def __init__(self, filepath: str = "config/transcriber.json"):
11
- self.path = Path(filepath)
12
- self.path.parent.mkdir(parents=True, exist_ok=True)
13
 
14
  def _read(self) -> Dict[str, Any]:
15
- if not self.path.exists():
16
- return {}
17
- try:
18
- with self.path.open("r", encoding="utf-8") as f:
19
- return json.load(f)
20
- except Exception:
21
- return {}
22
 
23
  def _write(self, data: Dict[str, Any]):
24
- with self.path.open("w", encoding="utf-8") as f:
25
- json.dump(data, f, ensure_ascii=False, indent=2)
26
 
27
  def get_config(self) -> Dict[str, Any]:
28
  """获取当前转写器配置,fallback 到环境变量默认值。
 
 
1
  import os
 
2
  from typing import Optional, Dict, Any
3
 
4
+ from app.db.app_config_dao import load_value, set_value
5
+
6
 
7
  class TranscriberConfigManager:
8
+ """管理转写器配置,持久化到数据库 app_config 表(key="transcriber"),支持前端动态修改。"""
9
+
10
+ # filepath 仅用于把旧的 config/transcriber.json 一次性导入数据库。
11
+ _KEY = "transcriber"
12
 
13
  def __init__(self, filepath: str = "config/transcriber.json"):
14
+ self._legacy_path = filepath
 
15
 
16
  def _read(self) -> Dict[str, Any]:
17
+ return load_value(self._KEY, self._legacy_path, {}) or {}
 
 
 
 
 
 
18
 
19
  def _write(self, data: Dict[str, Any]):
20
+ set_value(self._KEY, data)
 
21
 
22
  def get_config(self) -> Dict[str, Any]:
23
  """获取当前转写器配置,fallback 到环境变量默认值。
backend/app/services/vector_store.py CHANGED
@@ -6,6 +6,7 @@ from typing import Optional
6
  import chromadb
7
  from chromadb.config import Settings
8
 
 
9
  from app.utils.logger import get_logger
10
 
11
  logger = get_logger(__name__)
@@ -117,14 +118,11 @@ class VectorStoreManager:
117
 
118
  def index_task(self, task_id: str) -> None:
119
  """读取笔记结果并建立向量索引。"""
120
- result_path = os.path.join(NOTE_OUTPUT_DIR, f"{task_id}.json")
121
- if not os.path.exists(result_path):
122
- logger.warning(f"笔记文件不存在,跳过索引: {result_path}")
123
  return
124
 
125
- with open(result_path, "r", encoding="utf-8") as f:
126
- note_data = json.load(f)
127
-
128
  markdown = note_data.get("markdown", "")
129
  transcript = note_data.get("transcript", {})
130
  segments = transcript.get("segments", [])
 
6
  import chromadb
7
  from chromadb.config import Settings
8
 
9
+ from app.db.note_dao import load_note
10
  from app.utils.logger import get_logger
11
 
12
  logger = get_logger(__name__)
 
118
 
119
  def index_task(self, task_id: str) -> None:
120
  """读取笔记结果并建立向量索引。"""
121
+ note_data = load_note(task_id)
122
+ if note_data is None:
123
+ logger.warning(f"笔记不存在,跳过索引: {task_id}")
124
  return
125
 
 
 
 
126
  markdown = note_data.get("markdown", "")
127
  transcript = note_data.get("transcript", {})
128
  segments = transcript.get("segments", [])
backend/tests/conftest.py ADDED
@@ -0,0 +1,16 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import tempfile
3
+
4
+ # 把数据库与笔记输出目录强制指向临时位置:
5
+ # - 避免测试污染真实的 video_memo.db / note_results(配置/笔记已迁入数据库后,
6
+ # 管理器与 note_dao 的读写都走全局 engine,不隔离会写进开发库)。
7
+ # - 顺带建好 app_config / notes 等新表,供依赖数据库的单测使用。
8
+ # 必须在任何 app 模块导入前设置:pytest 最先加载 conftest,此处 import app.db.* 时
9
+ # engine 才会按这里的 DATABASE_URL 初始化。load_dotenv(override=False) 不会覆盖已设的值。
10
+ _TEST_DIR = tempfile.mkdtemp(prefix="videomemo-test-")
11
+ os.environ["DATABASE_URL"] = "sqlite:///" + os.path.join(_TEST_DIR, "test.db")
12
+ os.environ.setdefault("NOTE_OUTPUT_DIR", os.path.join(_TEST_DIR, "note_results"))
13
+
14
+ from app.db.init_db import init_db # noqa: E402
15
+
16
+ init_db()
backend/tests/test_article_service.py CHANGED
@@ -99,9 +99,12 @@ def test_generate_from_url_saves_note_json(tmp_path, monkeypatch):
99
  task_id="task-1",
100
  )
101
 
102
- saved = json.loads((tmp_path / "notes" / "task-1.json").read_text(encoding="utf-8"))
103
- status = json.loads((tmp_path / "notes" / "task-1.status.json").read_text(encoding="utf-8"))
 
 
104
  assert result["task_id"] == "task-1"
 
105
  assert saved["markdown"] == "# 总结\n\n- 要点"
106
  assert saved["transcript"]["full_text"] == "正文内容"
107
  assert saved["audio_meta"]["title"] == "文章标题"
 
99
  task_id="task-1",
100
  )
101
 
102
+ # 笔记正文与状态现在存数据库 notes 表(原先是 note_results/{task_id}.json 文件)
103
+ import app.db.note_dao as note_dao
104
+ saved = note_dao.load_note("task-1")
105
+ status = note_dao.get_status("task-1")
106
  assert result["task_id"] == "task-1"
107
+ assert saved is not None
108
  assert saved["markdown"] == "# 总结\n\n- 要点"
109
  assert saved["transcript"]["full_text"] == "正文内容"
110
  assert saved["audio_meta"]["title"] == "文章标题"