"""Back up and restore OpenClaw state via a Hugging Face dataset repository.

Run with the single argument ``backup`` to archive the data directories and
files under ``STATE_DIR`` and upload them; run with anything else (or no
argument) to download the latest archive and restore it.

The dataset repo and access token are read from the ``HF_DATASET`` and
``HF_TOKEN`` environment variables; both operations are skipped when either
is unset.
"""

import os
import sys
import tarfile
import tempfile

from huggingface_hub import HfApi, hf_hub_download

api = HfApi()
repo_id = os.getenv("HF_DATASET")
token = os.getenv("HF_TOKEN")
FILENAME = "latest_backup.tar.gz"
STATE_DIR = "/root/.openclaw"

# Only these data directories are backed up / restored. openclaw.json is
# deliberately left out so that config-generator regenerates it.
DATA_DIRS = [
    "agents",
    "workspace",
    "sessions",
    "credentials",
    "backups",
    "skills",
    "logs",
    "openclaw-weixin",
    "identity",
    "media",
    "plugins",
    "plugin-skills",
    "tasks",
    "npm",
    ".cc-switch",
]

# Individual files that must be preserved as well.
DATA_FILES = [
    "agent_bindings.json",
    "update-check.json",
]


def _is_backup_member(name):
    """Return True if archive path *name* is a data dir/file or lies inside a data dir."""
    if name in DATA_DIRS or name in DATA_FILES:
        return True
    return name.startswith(tuple(d + "/" for d in DATA_DIRS))


def _is_within(root, path):
    """Return True if absolute *path* is *root* itself or located below it."""
    return os.path.commonpath([root, path]) == root


def _extract_member(tar, member, dest):
    """Extract *member* below *dest*, refusing entries that could escape it.

    A plain prefix check on the member name is not enough: a name such as
    ``agents/../../etc/x`` or a symlink pointing outside *dest* would still
    be written outside the state directory.

    Returns True when the member was extracted, False when it was rejected.
    """
    if hasattr(tarfile, "data_filter"):
        # Interpreters with extraction filters: let the stdlib "data" filter
        # reject traversal, absolute paths, escaping links and device nodes.
        # Note that this filter does not apply archived ownership or
        # directory modes.
        try:
            tar.extract(member, path=dest, filter="data")
        except tarfile.FilterError as exc:
            print(f"[Sync] Skip unsafe archive member {member.name}: {exc}")
            return False
        return True

    # Older interpreters without extraction filters: validate by hand.
    root = os.path.realpath(dest)
    target = os.path.realpath(os.path.join(root, member.name))
    safe = not member.isdev() and _is_within(root, target)
    if safe and (member.issym() or member.islnk()):
        # Symlink targets are relative to the link's directory, hardlink
        # targets are relative to the archive root.
        base = os.path.dirname(target) if member.issym() else root
        link_target = os.path.realpath(os.path.join(base, member.linkname))
        safe = not os.path.isabs(member.linkname) and _is_within(root, link_target)
    if not safe:
        print(f"[Sync] Skip unsafe archive member {member.name}")
        return False
    tar.extract(member, path=dest)
    return True


def restore():
    """Download the latest backup archive and restore the data dirs/files.

    Only members belonging to ``DATA_DIRS`` / ``DATA_FILES`` are extracted
    into ``STATE_DIR``; everything else (notably the root ``openclaw.json``)
    is skipped, as are members that would be written outside ``STATE_DIR``.
    After extraction a short summary of a few key locations is printed.

    Returns:
        None when ``HF_DATASET`` or ``HF_TOKEN`` is unset (restore skipped),
        True when the restore completed, False when any exception occurred
        (including the case where no backup exists yet).
    """
    try:
        if not repo_id or not token:
            print("[Sync] Skip Restore: HF_DATASET or HF_TOKEN not set")
            return

        print(f"[Sync] Downloading {FILENAME} from {repo_id}...")
        path = hf_hub_download(
            repo_id=repo_id,
            filename=FILENAME,
            repo_type="dataset",
            token=token,
            force_download=True,
        )

        print("[Sync] Selective restore: only data dirs, skip openclaw.json")
        extracted = 0
        skipped = 0
        with tarfile.open(path, "r:gz") as tar:
            for member in tar:
                # The root-level openclaw.json is not in the allow-list, so it
                # is skipped here and regenerated by config-generator.
                if _is_backup_member(member.name) and _extract_member(
                    tar, member, STATE_DIR
                ):
                    extracted += 1
                else:
                    skipped += 1

        print(f"[Sync] Restore done: extracted {extracted}, skipped {skipped}")

        # Report on a few key locations so a bad restore is easy to spot.
        for check_dir, label in [
            ("agents/agent_1/sessions", "agent_1 sessions"),
            ("sessions", "root sessions"),
            ("workspace", "workspace"),
            ("openclaw-weixin", "weixin data"),
        ]:
            full = os.path.join(STATE_DIR, check_dir)
            # isdir rather than exists: listing a non-directory would raise
            # and make a successful restore report failure.
            if os.path.isdir(full):
                count = sum(
                    1
                    for entry in os.listdir(full)
                    if os.path.isfile(os.path.join(full, entry))
                )
                print(f"[Sync] {label}: {count} files")
            else:
                print(f"[Sync] {label}: NOT FOUND")

        memory_file = os.path.join(STATE_DIR, "workspace", "MEMORY.md")
        if os.path.exists(memory_file):
            size = os.path.getsize(memory_file)
            print(f"[Sync] MEMORY.md: {size} bytes")
        else:
            print("[Sync] MEMORY.md: NOT FOUND")

        return True
    except Exception as e:
        # Best effort: a missing backup (first run) is reported, not raised.
        print(f"[Sync] Restore note: No existing backup or error: {e}")
        return False


def backup():
    """Archive the existing data dirs/files under ``STATE_DIR`` and upload them.

    The archive is uploaded to the dataset repo as ``FILENAME``. It is built
    in a temporary directory that is removed afterwards, so no stale archive
    is left in the working directory and the archive can never end up inside
    one of the directories it is archiving.

    Returns:
        None. Errors are printed, not raised.
    """
    try:
        if not repo_id or not token:
            print("[Sync] Skip Backup: HF_DATASET or HF_TOKEN not set")
            return

        print(f"[Sync] Backing up data from {STATE_DIR}...")
        file_count = 0
        with tempfile.TemporaryDirectory() as tmp_dir:
            archive_path = os.path.join(tmp_dir, FILENAME)
            with tarfile.open(archive_path, "w:gz") as tar:
                # Data directories first, then the individual data files;
                # entries missing on disk are silently left out.
                for name in DATA_DIRS + DATA_FILES:
                    full = os.path.join(STATE_DIR, name)
                    if os.path.exists(full):
                        tar.add(full, arcname=name)
                        file_count += 1  # counts top-level items, not files

            size = os.path.getsize(archive_path)
            print(f"[Sync] Archive: {file_count} items, {size // 1024} KB")

            api.upload_file(
                path_or_fileobj=archive_path,
                path_in_repo=FILENAME,
                repo_id=repo_id,
                repo_type="dataset",
                token=token,
            )
        print(f"[Sync] Backup uploaded to {repo_id}/{FILENAME}")
    except Exception as e:
        print(f"[Sync] Backup error: {e}")


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "backup":
        backup()
    else:
        restore()