soarmb / test_sync.py
iyougame's picture
Sync from GitHub: ab4503b6302c95df375b305e464c4e12ec6a9f3c
8c9921f verified
import requests
import json
import sys
# === 配置 ===
URL = "http://127.0.0.1:7860/v1/videos/sync"
KEY = "han1234"
def test_sync():
print(f"🚀 发送同步请求到: {URL}")
try:
response = requests.post(
URL,
headers={"Authorization": f"Bearer {KEY}"},
json={
"limit": 1, # 只同步最新的一条
"stream": True
},
stream=True
)
if response.status_code != 200:
print(f"❌ 请求失败: {response.status_code}\n{response.text}")
return
print("✅ 连接成功,正在同步...\n" + "-"*30)
# 处理流式输出
for line in response.iter_lines():
if not line: continue
line_text = line.decode('utf-8').replace('data: ', '').strip()
if line_text == '' or line_text == '[DONE]':
print("-" * 30 + "\n🏁 同步结束")
break
try:
data = json.loads(line_text)
delta = data['choices'][0]['delta']
if 'reasoning_content' in delta and delta['reasoning_content']:
print(f"🔄 {delta['reasoning_content'].strip()}")
if 'content' in delta and delta['content']:
print(f"\n🎉 结果:\n{delta['content']}")
except json.JSONDecodeError:
pass
except Exception as e:
print(f"❌ 发生错误: {e}")
if __name__ == "__main__":
test_sync()