| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| |
|
| | import os |
| | import shutil |
| | import subprocess |
| | import time |
| | import unittest |
| |
|
| | from requests.exceptions import HTTPError |
| | from transformers.hf_api import HfApi, HfFolder, ModelInfo, RepoObj |
| | from transformers.testing_utils import ENDPOINT_STAGING, PASS, USER, is_staging_test, require_git_lfs |
| |
|
| |
|
| | ENDPOINT_STAGING_BASIC_AUTH = f"https://{USER}:{PASS}@moon-staging.huggingface.co" |
| |
|
| | REPO_NAME = f"my-model-{int(time.time())}" |
| | REPO_NAME_LARGE_FILE = f"my-model-largefiles-{int(time.time())}" |
| | WORKING_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "fixtures/working_repo") |
| | LARGE_FILE_14MB = "https://cdn-media.huggingface.co/lfs-largefiles/progit.epub" |
| | LARGE_FILE_18MB = "https://cdn-media.huggingface.co/lfs-largefiles/progit.pdf" |
| |
|
| |
|
| | class HfApiCommonTest(unittest.TestCase): |
| | _api = HfApi(endpoint=ENDPOINT_STAGING) |
| |
|
| |
|
| | class HfApiLoginTest(HfApiCommonTest): |
| | def test_login_invalid(self): |
| | with self.assertRaises(HTTPError): |
| | self._api.login(username=USER, password="fake") |
| |
|
| | def test_login_valid(self): |
| | token = self._api.login(username=USER, password=PASS) |
| | self.assertIsInstance(token, str) |
| |
|
| |
|
| | class HfApiEndpointsTest(HfApiCommonTest): |
| | @classmethod |
| | def setUpClass(cls): |
| | """ |
| | Share this valid token in all tests below. |
| | """ |
| | cls._token = cls._api.login(username=USER, password=PASS) |
| |
|
| | def test_whoami(self): |
| | user, orgs = self._api.whoami(token=self._token) |
| | self.assertEqual(user, USER) |
| | self.assertIsInstance(orgs, list) |
| |
|
| | def test_list_repos_objs(self): |
| | objs = self._api.list_repos_objs(token=self._token) |
| | self.assertIsInstance(objs, list) |
| | if len(objs) > 0: |
| | o = objs[-1] |
| | self.assertIsInstance(o, RepoObj) |
| |
|
| | def test_create_and_delete_repo(self): |
| | self._api.create_repo(token=self._token, name=REPO_NAME) |
| | self._api.delete_repo(token=self._token, name=REPO_NAME) |
| |
|
| |
|
| | class HfApiPublicTest(unittest.TestCase): |
| | def test_staging_model_list(self): |
| | _api = HfApi(endpoint=ENDPOINT_STAGING) |
| | _ = _api.model_list() |
| |
|
| | def test_model_list(self): |
| | _api = HfApi() |
| | models = _api.model_list() |
| | self.assertGreater(len(models), 100) |
| | self.assertIsInstance(models[0], ModelInfo) |
| |
|
| |
|
| | class HfFolderTest(unittest.TestCase): |
| | def test_token_workflow(self): |
| | """ |
| | Test the whole token save/get/delete workflow, |
| | with the desired behavior with respect to non-existent tokens. |
| | """ |
| | token = f"token-{int(time.time())}" |
| | HfFolder.save_token(token) |
| | self.assertEqual(HfFolder.get_token(), token) |
| | HfFolder.delete_token() |
| | HfFolder.delete_token() |
| | |
| | |
| | self.assertEqual(HfFolder.get_token(), None) |
| |
|
| |
|
| | @require_git_lfs |
| | @is_staging_test |
| | class HfLargefilesTest(HfApiCommonTest): |
| | @classmethod |
| | def setUpClass(cls): |
| | """ |
| | Share this valid token in all tests below. |
| | """ |
| | cls._token = cls._api.login(username=USER, password=PASS) |
| |
|
| | def setUp(self): |
| | try: |
| | shutil.rmtree(WORKING_REPO_DIR) |
| | except FileNotFoundError: |
| | pass |
| |
|
| | def tearDown(self): |
| | self._api.delete_repo(token=self._token, name=REPO_NAME_LARGE_FILE) |
| |
|
| | def setup_local_clone(self, REMOTE_URL): |
| | REMOTE_URL_AUTH = REMOTE_URL.replace(ENDPOINT_STAGING, ENDPOINT_STAGING_BASIC_AUTH) |
| | subprocess.run(["git", "clone", REMOTE_URL_AUTH, WORKING_REPO_DIR], check=True, capture_output=True) |
| | subprocess.run(["git", "lfs", "track", "*.pdf"], check=True, cwd=WORKING_REPO_DIR) |
| | subprocess.run(["git", "lfs", "track", "*.epub"], check=True, cwd=WORKING_REPO_DIR) |
| |
|
| | def test_end_to_end_thresh_6M(self): |
| | REMOTE_URL = self._api.create_repo( |
| | token=self._token, name=REPO_NAME_LARGE_FILE, lfsmultipartthresh=6 * 10 ** 6 |
| | ) |
| | self.setup_local_clone(REMOTE_URL) |
| |
|
| | subprocess.run(["wget", LARGE_FILE_18MB], check=True, capture_output=True, cwd=WORKING_REPO_DIR) |
| | subprocess.run(["git", "add", "*"], check=True, cwd=WORKING_REPO_DIR) |
| | subprocess.run(["git", "commit", "-m", "commit message"], check=True, cwd=WORKING_REPO_DIR) |
| |
|
| | |
| | failed_process = subprocess.run(["git", "push"], capture_output=True, cwd=WORKING_REPO_DIR) |
| | self.assertEqual(failed_process.returncode, 1) |
| | self.assertIn("transformers-cli lfs-enable-largefiles", failed_process.stderr.decode()) |
| | |
| |
|
| | subprocess.run(["transformers-cli", "lfs-enable-largefiles", WORKING_REPO_DIR], check=True) |
| |
|
| | start_time = time.time() |
| | subprocess.run(["git", "push"], check=True, cwd=WORKING_REPO_DIR) |
| | print("took", time.time() - start_time) |
| |
|
| | |
| | pdf_url = f"{REMOTE_URL}/resolve/main/progit.pdf" |
| | DEST_FILENAME = "uploaded.pdf" |
| | subprocess.run(["wget", pdf_url, "-O", DEST_FILENAME], check=True, capture_output=True, cwd=WORKING_REPO_DIR) |
| | dest_filesize = os.stat(os.path.join(WORKING_REPO_DIR, DEST_FILENAME)).st_size |
| | self.assertEqual(dest_filesize, 18685041) |
| |
|
| | def test_end_to_end_thresh_16M(self): |
| | |
| | REMOTE_URL = self._api.create_repo( |
| | token=self._token, name=REPO_NAME_LARGE_FILE, lfsmultipartthresh=16 * 10 ** 6 |
| | ) |
| | self.setup_local_clone(REMOTE_URL) |
| |
|
| | subprocess.run(["wget", LARGE_FILE_18MB], check=True, capture_output=True, cwd=WORKING_REPO_DIR) |
| | subprocess.run(["wget", LARGE_FILE_14MB], check=True, capture_output=True, cwd=WORKING_REPO_DIR) |
| | subprocess.run(["git", "add", "*"], check=True, cwd=WORKING_REPO_DIR) |
| | subprocess.run(["git", "commit", "-m", "both files in same commit"], check=True, cwd=WORKING_REPO_DIR) |
| |
|
| | subprocess.run(["transformers-cli", "lfs-enable-largefiles", WORKING_REPO_DIR], check=True) |
| |
|
| | start_time = time.time() |
| | subprocess.run(["git", "push"], check=True, cwd=WORKING_REPO_DIR) |
| | print("took", time.time() - start_time) |
| |
|